Source code for tdw_catalog.lineage.dataset_relationship

from datetime import datetime
from typing import TYPE_CHECKING, Optional, Union, List
from tdw_catalog.entity import Entity, EntityBase, Property
from tdw_catalog.errors import _convert_error

if TYPE_CHECKING:
    import tdw_catalog.organization as organization
    import tdw_catalog.dataset as dataset
    import tdw_catalog.lineage.column_relationship as column_relationship


@Entity([
    Property("id", str, serialize=True),
    Property("organization_id",
             str,
             relation="tdw_catalog.organization.Organization"),
    Property("upstream_dataset_id",
             str,
             relation="tdw_catalog.dataset.Dataset"),
    Property("downstream_dataset_id",
             str,
             relation="tdw_catalog.dataset.Dataset"),
    Property("user_id", str),
    Property("label", str, writable=True),
    Property("description", Optional[str], writable=True),
    Property("created_at", datetime),
    Property("updated_at", datetime)
])
class DatasetLineageRelationship(EntityBase):
    """
    A :class:`.DatasetLineageRelationship` links an upstream :class:`.Dataset`
    to a downstream :class:`.Dataset`, and carries any
    :class:`.ColumnLineageRelationship`\\ s that were returned with it.

    Attributes
    ----------
    id : str
        :class:`.DatasetLineageRelationship`\\ 's unique id
    organization_id : str
        The unique ID of the :class:`.Organization` to which this :class:`.DatasetLineageRelationship` belongs
    organization : organization.Organization
        The :class:`.Organization` object that relates to the `organization_id` of this model
    upstream_dataset_id : str
        The unique ID of the :class:`.Dataset` on the upstream end of this :class:`.DatasetLineageRelationship`
    upstream_dataset : Union[dataset.Dataset, dataset.ConnectedDataset]
        The :class:`.Dataset` object that relates to the `upstream_dataset_id` of this model
    downstream_dataset_id : str
        The unique ID of the :class:`.Dataset` on the downstream end of this :class:`.DatasetLineageRelationship`
    downstream_dataset : Union[dataset.Dataset, dataset.ConnectedDataset]
        The :class:`.Dataset` object that relates to the `downstream_dataset_id` of this model
    user_id : str
        The unique :class:`.User` ID of the user who created this :class:`.DatasetLineageRelationship`
    label : str
        The descriptive label for this :class:`.DatasetLineageRelationship`
    description : Optional[str] = None
        An optional extended description for this :class:`.DatasetLineageRelationship`
    created_at : datetime
        The datetime at which this :class:`.DatasetLineageRelationship` was created
    updated_at : datetime
        The datetime at which this :class:`.DatasetLineageRelationship` was last updated
    """
    id: str
    organization_id: str
    organization: 'organization.Organization'
    upstream_dataset_id: str
    upstream_dataset: 'Union[dataset.Dataset, dataset.ConnectedDataset]'
    downstream_dataset_id: str
    downstream_dataset: 'Union[dataset.Dataset, dataset.ConnectedDataset]'
    user_id: str
    label: str
    description: Optional[str] = None
    created_at: datetime
    updated_at: datetime
    # Bare annotation only: no attribute exists until `get()` assigns one,
    # which it does only when the response carries "column_relationships".
    _column_lineage: 'List[column_relationship.ColumnLineageRelationship]'

    def __str__(self) -> str:
        # NOTE(review): reads `self._id` while `delete()` reads `self.id`;
        # presumably both resolve to the same value via `Entity` - confirm.
        return f'<DatasetLineageRelationship id={self._id} label={self.label} upstream_dataset_id={self.upstream_dataset_id} downstream_dataset_id={self.downstream_dataset_id}>'

    @classmethod
    def get(cls, client, id: str):
        """
        Retrieve a :class:`.DatasetLineageRelationship`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`.DatasetLineageRelationship`
        id : str
            The unique ID of the :class:`.DatasetLineageRelationship`

        Returns
        -------
        DatasetLineageRelationship
            The :class:`.DatasetLineageRelationship` associated with the given ID

        Raises
        ------
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        CatalogNotFoundException
            If the :class:`.DatasetLineageRelationship` with the supplied ID could not be found
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve this :class:`.DatasetLineageRelationship` because they do not have access to one or both datasets involved, or the :class:`.Organization` this relationship belongs to.
        """
        try:
            res = client._get_dataset_lineage(
                dataset_lineage_relationship_id=id)
            # The relationship fields are either nested under
            # "dataset_relationship" or are the response itself.
            dataset_relationship = res[
                "dataset_relationship"] if "dataset_relationship" in res else res
            d = DatasetLineageRelationship(client, **dataset_relationship)
            # If the response contained column lineage, deserialize it
            if "column_relationships" in res:
                # Imported here rather than at module level; the module-level
                # import of this name is guarded by TYPE_CHECKING.
                import tdw_catalog.lineage.column_relationship as column_relationship
                d._column_lineage = [
                    column_relationship.ColumnLineageRelationship(client, **c)
                    for c in res["column_relationships"]
                ]
            return d
        except Exception as e:
            raise _convert_error(e)

    def save(self) -> None:
        """
        Update this :class:`.DatasetLineageRelationship`, saving any changes to its fields

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`.DatasetLineageRelationship`
        CatalogNotFoundException
            If the :class:`.DatasetLineageRelationship` no longer exists
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            res = self._client._update_dataset_lineage(**self.serialize())
            # Refresh this object from the update response.
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def delete(self) -> None:
        """
        Delete this :class:`.DatasetLineageRelationship`. All :class:`.ColumnLineageRelationship`\\ s
        within this :class:`.DatasetLineageRelationship` will be deleted as well.
        This :class:`.DatasetLineageRelationship` object should not be used after `delete()` has successfully returned

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to delete this :class:`.DatasetLineageRelationship`
        CatalogNotFoundException
            If the :class:`.DatasetLineageRelationship` no longer exists
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._delete_dataset_lineage(id=self.id)
        except Exception as e:
            raise _convert_error(e)

    def list_column_lineage(
            self) -> 'List[column_relationship.ColumnLineageRelationship]':
        """
        List all column lineage within this :class:`.DatasetLineageRelationship`

        Returns
        -------
        List[ColumnLineageRelationship]
            The :class:`.ColumnLineageRelationship`\\ s contained within this :class:`.DatasetLineageRelationship`,
            or an empty list if no column lineage has been attached to this object
        """
        # `_column_lineage` is declared as a bare annotation and is assigned by
        # `get()` only when the response includes column relationships, so it
        # may not exist on this instance; treat a missing attribute like None.
        column_lineage = getattr(self, "_column_lineage", None)
        return column_lineage if column_lineage is not None else []

    # The three helpers below fetch the related objects by ID. Presumably they
    # are invoked by the `Entity` relation machinery to back the
    # `organization`, `upstream_dataset` and `downstream_dataset` attributes -
    # confirm against `tdw_catalog.entity`.

    def _get_organization(self):
        # Local import; the module-level import is guarded by TYPE_CHECKING.
        import tdw_catalog.organization as organization
        return organization.Organization.get(self._client,
                                             self.organization_id)

    def _get_upstream_dataset(self):
        import tdw_catalog.dataset as dataset
        return dataset.Dataset.get(self._client, self.upstream_dataset_id,
                                   self.organization_id)

    def _get_downstream_dataset(self):
        import tdw_catalog.dataset as dataset
        return dataset.Dataset.get(self._client, self.downstream_dataset_id,
                                   self.organization_id)