from datetime import datetime
from typing import TYPE_CHECKING, Optional, Union, List
from tdw_catalog.entity import Entity, EntityBase, Property
from tdw_catalog.errors import _convert_error
if TYPE_CHECKING:
import tdw_catalog.organization as organization
import tdw_catalog.dataset as dataset
import tdw_catalog.lineage.column_relationship as column_relationship
[docs]@Entity([
Property("id", str, serialize=True),
Property("organization_id",
str,
relation="tdw_catalog.organization.Organization"),
Property("upstream_dataset_id",
str,
relation="tdw_catalog.dataset.Dataset"),
Property("downstream_dataset_id",
str,
relation="tdw_catalog.dataset.Dataset"),
Property("user_id", str),
Property("label", str, writable=True),
Property("description", Optional[str], writable=True),
Property("created_at", datetime),
Property("updated_at", datetime)
])
class DatasetLineageRelationship(EntityBase):
"""
:class:`.DatasetLineageRelationship`
Attributes
----------
id : str
:class:`.DatasetLineageRelationship`\\ 's unique id
organization_id : str
The unique ID of the :class:`.Organization` to which this :class:`.DatasetLineageRelationship` belongs
organization : organization.Organization
The :class:`.Organization` object that relates to the `organization_id` of this model
upstream_dataset_id : str
The unique ID of the :class:`.Dataset` on the upstream end of this class:`.DatasetLineageRelationship`
upstream_dataset : Union[dataset.Dataset, dataset.ConnectedDataset]
The :class:`.Dataset` object that relates to the `upstream_dataset_id` of this model
downstream_dataset_id : str
The unique ID of the :class:`.Dataset` on the downstream end of this class:`.DatasetLineageRelationship`
downstream_dataset : Union[dataset.Dataset, dataset.ConnectedDataset]
The :class:`.Dataset` object that relates to the `downstream_dataset_id` of this model
user_id : str
The unique :class:`.User` ID of the user who created this :class:`.DatasetLineageRelationship`
label : str
The descriptive label for this :class:`.DatasetLineageRelationship`
description : Optional[str] = None
An optional extended description for this :class:`.DatasetLineageRelationship`
created_at : datetime
The datetime at which this :class:`.DatasetLineageRelationship` was created
updated_at : datetime
The datetime at which this :class:`.DatasetLineageRelationship` was last updated
"""
id: str
organization_id: str
organization: 'organization.Organization'
upstream_dataset_id: str
upstream_dataset: 'Union[dataset.Dataset, dataset.ConnectedDataset]'
downstream_dataset_id: str
downstream_dataset: 'Union[dataset.Dataset, dataset.ConnectedDataset]'
user_id: str
label: str
description: Optional[str] = None
created_at: datetime
updated_at: datetime
_column_lineage: 'List[column_relationship.ColumnLineageRelationship]'
def __str__(self) -> str:
return f'<DatasetLineageRelationship id={self._id} label={self.label} upstream_dataset_id={self.upstream_dataset_id} downstream_dataset_id={self.downstream_dataset_id}>'
[docs] @classmethod
def get(cls, client, id: str):
"""
Retrieve a :class:`.DatasetLineageRelationship`
Parameters
----------
client : Catalog
The :class:`.Catalog` client to use to get the :class:`.DatasetLineageRelationship`
id : str
The unique ID of the :class:`.DatasetLineageRelationship`
Returns
-------
DatasetLineageRelationship
The :class:`.DatasetLineageRelationship` associated with the given ID
Raises
------
CatalogInternalException
If call to the :class:`.Catalog` server fails
CatalogNotFoundException
If the :class:`.DatasetLineageRelationship` with the supplied ID could not be found
CatalogPermissionDeniedException
If the caller is not allowed to retrieve this :class:`.DatasetLineageRelationship` because
they do not have access to one or both datasets involved, or the :class:`.Organization`
this relationship belongs to.
"""
try:
res = client._get_dataset_lineage(
dataset_lineage_relationship_id=id)
dataset_relationship = res[
"dataset_relationship"] if "dataset_relationship" in res else res
d = DatasetLineageRelationship(client, **dataset_relationship)
# If the response contained column lineage, deserialize it
if "column_relationships" in res:
import tdw_catalog.lineage.column_relationship as column_relationship
d._column_lineage = [
column_relationship.ColumnLineageRelationship(client, **c)
for c in res["column_relationships"]
]
return d
except Exception as e:
raise _convert_error(e)
[docs] def save(self) -> None:
"""
Update this :class:`.DatasetLineageRelationship`, saving any changes to its fields
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to update this :class:`.DatasetLineageRelationship`
CatalogNotFoundException
If the :class:`.DatasetLineageRelationship` no longer exists
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
res = self._client._update_dataset_lineage(**self.serialize())
self.deserialize(res)
except Exception as e:
raise _convert_error(e)
[docs] def delete(self) -> None:
"""
Delete this :class:`.DatasetLineageRelationship`. All :class:`.ColumnLineageRelationship`\\ s
within this :class:`.DatasetLineageRelationship` will be deleted as well.
This :class:`.DatasetLineageRelationship` object should not be used after `delete()` has successfully returned
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to delete this :class:`.DatasetLineageRelationship`
CatalogNotFoundException
If the :class:`.DatasetLineageRelationship` no longer exists
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
self._client._delete_dataset_lineage(id=self.id)
except Exception as e:
raise _convert_error(e)
[docs] def list_column_lineage(
self) -> 'List[column_relationship.ColumnLineageRelationship]':
"""
List all column lineage within this :class:`.DatasetLineageRelationship`
Returns
-------
List[ColumnLineageRelationship]
The :class:`.ColumnLineageRelationship`\\ s contained within this :class:`.DatasetLineageRelationship`
"""
return self._column_lineage if self._column_lineage is not None else []
def _get_organization(self):
import tdw_catalog.organization as organization
return organization.Organization.get(self._client,
self.organization_id)
def _get_upstream_dataset(self):
import tdw_catalog.dataset as dataset
return dataset.Dataset.get(self._client, self.upstream_dataset_id,
self.organization_id)
def _get_downstream_dataset(self):
import tdw_catalog.dataset as dataset
return dataset.Dataset.get(self._client, self.downstream_dataset_id,
self.organization_id)