from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional, Union
import uuid
from tdw_catalog.data_dictionary import Column
from tdw_catalog.entity import Entity, EntityBase, Property
from tdw_catalog.errors import CatalogInvalidArgumentException, CatalogUnknownException, _convert_error
from tdw_catalog.relations import _DatasetLineageRelationshipRelation
from tdw_catalog.utils import ColumnType
if TYPE_CHECKING:
import tdw_catalog.organization as organization
import tdw_catalog.dataset as dataset
import tdw_catalog.lineage.dataset_relationship as dataset_relationship
@dataclass
class LineageColumn():
    """
    A :class:`.Dataset` column involved in a :class:`.ColumnLineageRelationship`

    Attributes
    ----------
    key : str
        The column name for this :class:`.LineageColumn`, within the actual :class:`.Warehouse` where the data lives
    type : ColumnType
        The data type for this :class:`.LineageColumn`. Available types can be found in :class:`.ColumnType`.
    name : Optional[str]
        An optional friendly name for this :class:`.LineageColumn`, which is visually used in place of the ``key`` throughout the :class:`.Catalog`
    """
    _key: str
    _name: Optional[str]
    _type: ColumnType

    @property
    def key(self) -> str:
        """The column name (read-only)."""
        return self._key

    @property
    def type(self) -> ColumnType:
        """The column's data type (read-only)."""
        return self._type

    @property
    def name(self) -> Optional[str]:
        """The optional friendly name (read-only)."""
        return self._name

    def serialize(self) -> dict:
        """
        Convert this :class:`.LineageColumn` into a plain dictionary

        Returns
        -------
        dict
            A dictionary with the entries ``key``, ``title`` (the friendly
            name) and ``type``. The ``type`` value is passed through as
            stored; no conversion is applied here.
        """
        return {"key": self._key, "title": self._name, "type": self._type}

    @classmethod
    def deserialize(cls, data: dict) -> 'LineageColumn':
        """
        Build a :class:`.LineageColumn` from its dictionary representation

        Parameters
        ----------
        data : dict
            A dictionary with a ``key`` entry, a ``type`` entry naming a
            :class:`.ColumnType` member (matched case-insensitively), and an
            optional ``title`` entry holding the friendly name

        Returns
        -------
        LineageColumn
            The parsed column

        Raises
        ------
        CatalogUnknownException
            If ``key`` is missing, or if ``type`` is missing, is not a string,
            or does not name a :class:`.ColumnType` member
        """
        raw_type = data.get("type")
        # Only the enum lookup is guarded here, so that a problem with a
        # different entry is never reported as an unknown column type.
        try:
            column_type = ColumnType[raw_type.upper()]
        except (KeyError, AttributeError) as err:
            # AttributeError covers a missing or non-string ``type``.
            raise CatalogUnknownException(
                message=
                "Unable to parse column-level lineage. Unknown column type: {ctype}"
                .format(ctype=raw_type)) from err
        if "key" not in data:
            raise CatalogUnknownException(
                message=
                "Unable to parse column-level lineage. Missing column key")
        return cls(_key=data["key"],
                   _name=data.get("title"),
                   _type=column_type)
def _deserialize_column_lineage_columns(
        data: List[dict]) -> Optional[List[LineageColumn]]:
    """
    Deserialize a list of column dictionaries into :class:`.LineageColumn` objects

    Parameters
    ----------
    data : List[dict]
        The dictionaries to parse, or ``None``

    Returns
    -------
    Optional[List[LineageColumn]]
        One :class:`.LineageColumn` per input dictionary, in the same order,
        or ``None`` when ``data`` is ``None``
    """
    if data is not None:
        return [LineageColumn.deserialize(entry) for entry in data]
    return None
@Entity([
    Property("id", str, serialize=True),
    Property(
        "dataset_lineage_relationship_id",
        str,
        serialize=True,
        relation=
        "tdw_catalog.lineage.dataset_relationship.DatasetLineageRelationship"),
    Property("upstream_columns",
             List[LineageColumn],
             writable=True,
             deserialize=_deserialize_column_lineage_columns),
    Property("downstream_columns",
             List[LineageColumn],
             writable=True,
             deserialize=_deserialize_column_lineage_columns),
    Property("user_id", str),
    Property("label", str, writable=True),
    Property("description", Optional[str], writable=True),
    Property("created_at", datetime),
    Property("updated_at", datetime)
])
class ColumnLineageRelationship(EntityBase,
                                _DatasetLineageRelationshipRelation):
    """
    :class:`.ColumnLineageRelationship`

    Attributes
    ----------
    id : str
        :class:`.ColumnLineageRelationship`\\ 's unique id
    dataset_lineage_relationship_id : str
        The unique ID of the :class:`.DatasetLineageRelationship` to which this :class:`.ColumnLineageRelationship` belongs
    dataset_lineage_relationship : dataset_relationship.DatasetLineageRelationship
        The :class:`.DatasetLineageRelationship` object that relates to the `dataset_lineage_relationship_id` of this model
    upstream_columns : List[LineageColumn]
        The source columns involved in this relationship
    downstream_columns : List[LineageColumn]
        The destination columns involved in this relationship
    user_id : str
        The unique :class:`.User` ID of the user who created this :class:`.ColumnLineageRelationship`
    label : str
        The descriptive label for this :class:`.ColumnLineageRelationship`
    description : Optional[str] = None
        An optional extended description for this :class:`.ColumnLineageRelationship`
    created_at : datetime
        The datetime at which this :class:`.ColumnLineageRelationship` was created
    updated_at : datetime
        The datetime at which this :class:`.ColumnLineageRelationship` was last updated
    """
    id: str
    dataset_lineage_relationship_id: str
    dataset_lineage_relationship: 'dataset_relationship.DatasetLineageRelationship'
    upstream_columns: List[LineageColumn]
    downstream_columns: List[LineageColumn]
    user_id: str
    label: str
    description: Optional[str] = None
    created_at: datetime
    updated_at: datetime

    def __str__(self) -> str:
        return f'<ColumnLineageRelationship id={self._id} label={self.label}>'

    @classmethod
    def _create(cls,
                upstream_columns: List[Column],
                downstream_columns: List[Column],
                label: Optional[str] = None,
                description: Optional[str] = None):
        """
        Helper method for constructing a :class:`.ColumnLineageRelationship` object
        which can be used in conjunction with ``organization.create_lineage``

        Parameters
        ----------
        upstream_columns : List[Column]
            The source columns involved in this relationship
        downstream_columns : List[Column]
            The destination columns involved in this relationship
        label : Optional[str] = None
            The descriptive label for this :class:`.ColumnLineageRelationship`.
            When omitted (or ``None``), a fresh random UUID string is generated for each call.
        description : Optional[str] = None
            An optional extended description for this :class:`.ColumnLineageRelationship`

        Returns
        -------
        ColumnLineageRelationship
            An unsaved :class:`.ColumnLineageRelationship`, for use with ``organization.create_lineage``
        """
        # The UUID must be generated inside the body: a default written in the
        # signature is evaluated only once, when the function is defined, so
        # every unlabeled relationship would end up sharing the same label.
        if label is None:
            label = str(uuid.uuid4())
        r = ColumnLineageRelationship(None,
                                      upstream_columns=[],
                                      downstream_columns=[],
                                      label=label,
                                      description=description)
        # The columns are assigned after construction rather than passed to the
        # constructor. NOTE(review): presumably so that the assignment goes
        # through the entity's writable-property handling - confirm.
        r.upstream_columns = [u._to_lineage_column() for u in upstream_columns]
        r.downstream_columns = [
            u._to_lineage_column() for u in downstream_columns
        ]
        return r

    @classmethod
    def get(cls, client, id: str):
        """
        Retrieve a :class:`.ColumnLineageRelationship`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`.ColumnLineageRelationship`
        id : str
            The unique ID of the :class:`.ColumnLineageRelationship`

        Returns
        -------
        ColumnLineageRelationship
            The :class:`.ColumnLineageRelationship` associated with the given ID

        Raises
        ------
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        CatalogNotFoundException
            If the :class:`.ColumnLineageRelationship` with the supplied ID could not be found
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve this :class:`.ColumnLineageRelationship` because
            they do not have access to one or both datasets involved, or the :class:`.Organization`
            this relationship belongs to.
        """
        try:
            res = client._get_column_lineage(column_lineage_relationship_id=id)
            return ColumnLineageRelationship(client, **res)
        except Exception as e:
            # Any failure is passed through _convert_error, and whatever it
            # returns is raised in place of the original exception.
            raise _convert_error(e)
def _tuples_to_column_lineage(
    column_lineage: List[tuple[Union[str, List[str]], Union[str,
                                                            List[str]]]],
    upstream_dataset: 'dataset.Dataset',
    downstream_dataset: 'dataset.Dataset'
) -> List[ColumnLineageRelationship]:
    """
    Convert ``(upstream, downstream)`` column-name pairs into unsaved
    :class:`.ColumnLineageRelationship` objects

    Parameters
    ----------
    column_lineage : List[tuple]
        Pairs of upstream and downstream column names. Each side of a pair
        may be a single column name or a list of column names.
    upstream_dataset : dataset.Dataset
        The dataset whose data dictionary is searched for the upstream names
    downstream_dataset : dataset.Dataset
        The dataset whose data dictionary is searched for the downstream names

    Returns
    -------
    List[ColumnLineageRelationship]
        One relationship per input pair, in input order

    Raises
    ------
    CatalogInvalidArgumentException
        If a named column is not present in the relevant dataset's data dictionary
    """
    # Imported here, at call time, because the module-level import of this
    # module only happens under TYPE_CHECKING; the annotation on the helper
    # below is evaluated when that helper is defined.
    import tdw_catalog.dataset as dataset

    def resolve(col_name: str, owner: dataset.Dataset):
        # Look the column up by name in the owning dataset's data dictionary.
        if col_name in owner.data_dictionary:
            return owner.data_dictionary[col_name]
        raise CatalogInvalidArgumentException(
            message=
            "Column '{}' does not exist in data dictionary for dataset {}".
            format(col_name, owner.id))

    def as_names(cols):
        # A bare string stands for a single column; wrap it so that both
        # accepted forms can be iterated in the same way.
        return [cols] if isinstance(cols, str) else cols

    relationships = []
    for upstream_cols, downstream_cols in column_lineage:
        # Upstream names are resolved before downstream names, so the first
        # missing column reported is always the earliest one in that order.
        sources = [
            resolve(name, upstream_dataset)
            for name in as_names(upstream_cols)
        ]
        targets = [
            resolve(name, downstream_dataset)
            for name in as_names(downstream_cols)
        ]
        relationships.append(
            ColumnLineageRelationship._create(upstream_columns=sources,
                                              downstream_columns=targets))
    return relationships