Source code for tdw_catalog.lineage.column_relationship

from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional, Union
import uuid
from tdw_catalog.data_dictionary import Column
from tdw_catalog.entity import Entity, EntityBase, Property
from tdw_catalog.errors import CatalogInvalidArgumentException, CatalogUnknownException, _convert_error
from tdw_catalog.relations import _DatasetLineageRelationshipRelation
from tdw_catalog.utils import ColumnType

if TYPE_CHECKING:
    import tdw_catalog.organization as organization
    import tdw_catalog.dataset as dataset
    import tdw_catalog.lineage.dataset_relationship as dataset_relationship


@dataclass
class LineageColumn:
    """
    A :class:`.Dataset` column involved in a :class:`.ColumnLineageRelationship`

    Attributes
    ----------
    key : str
        The column name for this :class:`.LineageColumn`, within the actual :class:`.Warehouse` where the data lives
    type : ColumnType
        The data type for this :class:`.LineageColumn`. Available types can be found in :class:`.ColumnType`.
    name: Optional[str]
        An optional friendly name for this :class:`.LineageColumn`, which is visually used in place of the ``key`` throughout the :class:`.Catalog`
    """
    _key: str
    _name: Optional[str]
    _type: ColumnType

    @property
    def key(self) -> str:
        return self._key

    @property
    def type(self) -> ColumnType:
        return self._type

    @property
    def name(self) -> Optional[str]:
        return self._name

    def serialize(self) -> dict:
        """
        Convert this :class:`.LineageColumn` into a plain ``dict``

        Returns
        -------
        dict
            A dictionary with the keys ``key``, ``title`` (the friendly name) and ``type``
        """
        return {"key": self._key, "title": self._name, "type": self._type}

    @classmethod
    def deserialize(cls, data: dict) -> 'LineageColumn':
        """
        Build a :class:`.LineageColumn` from its dictionary form

        Parameters
        ----------
        data : dict
            A dictionary holding ``key`` and ``type`` entries, and optionally a ``title`` entry

        Returns
        -------
        LineageColumn
            The column described by ``data``

        Raises
        ------
        CatalogUnknownException
            If ``key`` or ``type`` is absent from ``data``, or if ``type`` does not name a member of :class:`.ColumnType`
        """
        # Validate required fields up front so that a missing field is reported
        # as such, rather than being mistaken for an unknown column type
        missing = [field for field in ("key", "type") if field not in data]
        if missing:
            raise CatalogUnknownException(
                message=
                "Unable to parse column-level lineage. Missing field(s): {fields}"
                .format(fields=", ".join(missing)))
        # Only the ColumnType lookup is guarded: a KeyError here means the
        # upper-cased type is not a known ColumnType member
        try:
            column_type = ColumnType[data["type"].upper()]
        except KeyError:
            raise CatalogUnknownException(
                message=
                "Unable to parse column-level lineage. Unknown column type: {ctype}"
                .format(ctype=data["type"]))
        return cls(_key=data["key"],
                   _name=data.get("title"),
                   _type=column_type)
def _deserialize_column_lineage_columns(
        data: List[dict]) -> Optional[List[LineageColumn]]:
    """
    Deserialize a list of column dictionaries into :class:`.LineageColumn` objects

    Parameters
    ----------
    data : List[dict]
        The serialized columns, or ``None``

    Returns
    -------
    Optional[List[LineageColumn]]
        One :class:`.LineageColumn` per input dictionary, in the same order,
        or ``None`` when ``data`` is ``None``
    """
    if data is not None:
        return [LineageColumn.deserialize(entry) for entry in data]
    return None
@Entity([
    Property("id", str, serialize=True),
    Property(
        "dataset_lineage_relationship_id",
        str,
        serialize=True,
        relation=
        "tdw_catalog.lineage.dataset_relationship.DatasetLineageRelationship"),
    Property("upstream_columns",
             List[LineageColumn],
             writable=True,
             deserialize=_deserialize_column_lineage_columns),
    Property("downstream_columns",
             List[LineageColumn],
             writable=True,
             deserialize=_deserialize_column_lineage_columns),
    Property("user_id", str),
    Property("label", str, writable=True),
    Property("description", Optional[str], writable=True),
    Property("created_at", datetime),
    Property("updated_at", datetime)
])
class ColumnLineageRelationship(EntityBase,
                                _DatasetLineageRelationshipRelation):
    """
    A :class:`.ColumnLineageRelationship` links upstream (source) columns to
    downstream (destination) columns, and belongs to a :class:`.DatasetLineageRelationship`

    Attributes
    ----------
    id : str
        :class:`.ColumnLineageRelationship`\\ 's unique id
    dataset_lineage_relationship_id : str
        The unique ID of the :class:`.DatasetLineageRelationship` to which this :class:`.ColumnLineageRelationship` belongs
    dataset_lineage_relationship : dataset_relationship.DatasetLineageRelationship
        The :class:`.DatasetLineageRelationship` object that relates to the `dataset_lineage_relationship_id` of this model
    upstream_columns : List[LineageColumn]
        The source columns involved in this relationship
    downstream_columns : List[LineageColumn]
        The destination columns involved in this relationship
    user_id : str
        The unique :class:`.User` ID of the user who created this :class:`.ColumnLineageRelationship`
    label : str
        The descriptive label for this :class:`.ColumnLineageRelationship`
    description : Optional[str] = None
        An optional extended description for this :class:`.ColumnLineageRelationship`
    created_at : datetime
        The datetime at which this :class:`.ColumnLineageRelationship` was created
    updated_at : datetime
        The datetime at which this :class:`.ColumnLineageRelationship` was last updated
    """
    id: str
    dataset_lineage_relationship_id: str
    dataset_lineage_relationship: 'dataset_relationship.DatasetLineageRelationship'
    upstream_columns: List[LineageColumn]
    downstream_columns: List[LineageColumn]
    user_id: str
    label: str
    description: Optional[str] = None
    created_at: datetime
    updated_at: datetime

    def __str__(self) -> str:
        return f'<ColumnLineageRelationship id={self._id} label={self.label}>'

    @classmethod
    def _create(cls,
                upstream_columns: List[Column],
                downstream_columns: List[Column],
                label: Optional[str] = None,
                description: Optional[str] = None):
        """
        Helper method for constructing a :class:`.ColumnLineageRelationship` object
        which can be used in conjunction with ``organization.create_lineage``

        Parameters
        ----------
        upstream_columns : List[Column]
            The source columns involved in this relationship
        downstream_columns : List[Column]
            The destination columns involved in this relationship
        label : Optional[str] = None
            The descriptive label for this :class:`.ColumnLineageRelationship`.
            When omitted, a fresh random UUID string is generated for each call.
        description : Optional[str] = None
            An optional extended description for this :class:`.ColumnLineageRelationship`

        Returns
        -------
        ColumnLineageRelationship
            An unsaved :class:`.ColumnLineageRelationship`, for use with ``organization.create_lineage``
        """
        # Generate the fallback label here rather than in the signature: a
        # default of ``str(uuid.uuid4())`` is evaluated only once, when the
        # function is defined, which made every unlabelled relationship share
        # one and the same UUID
        if label is None:
            label = str(uuid.uuid4())
        r = ColumnLineageRelationship(None,
                                      upstream_columns=[],
                                      downstream_columns=[],
                                      label=label,
                                      description=description)
        # The entity is built with empty column lists, then populated with the
        # lineage form of each supplied data dictionary column
        r.upstream_columns = [u._to_lineage_column() for u in upstream_columns]
        r.downstream_columns = [
            u._to_lineage_column() for u in downstream_columns
        ]
        return r

    @classmethod
    def get(cls, client, id: str):
        """
        Retrieve a :class:`.ColumnLineageRelationship`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`.ColumnLineageRelationship`
        id : str
            The unique ID of the :class:`.ColumnLineageRelationship`

        Returns
        -------
        ColumnLineageRelationship
            The :class:`.ColumnLineageRelationship` associated with the given ID

        Raises
        ------
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        CatalogNotFoundException
            If the :class:`.ColumnLineageRelationship` with the supplied ID could not be found
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve this :class:`.ColumnLineageRelationship` because they do not have access to one or both datasets involved, or the :class:`.Organization` this relationship belongs to.
        """
        try:
            res = client._get_column_lineage(column_lineage_relationship_id=id)
            return ColumnLineageRelationship(client, **res)
        except Exception as e:
            # Any failure is passed through _convert_error before being raised
            raise _convert_error(e)
def _tuples_to_column_lineage(
    column_lineage: List[tuple[Union[str, List[str]], Union[str, List[str]]]],
    upstream_dataset: 'dataset.Dataset',
    downstream_dataset: 'dataset.Dataset'
) -> List[ColumnLineageRelationship]:
    """
    Convert ``(upstream, downstream)`` column name tuples into unsaved
    :class:`.ColumnLineageRelationship` objects

    Parameters
    ----------
    column_lineage : List[tuple[Union[str, List[str]], Union[str, List[str]]]]
        One tuple per relationship. Each side is either a single column name,
        or a list of column names
    upstream_dataset : dataset.Dataset
        The dataset whose data dictionary is used to resolve upstream column names
    downstream_dataset : dataset.Dataset
        The dataset whose data dictionary is used to resolve downstream column names

    Returns
    -------
    List[ColumnLineageRelationship]
        One relationship per supplied tuple, in the same order

    Raises
    ------
    CatalogInvalidArgumentException
        If a named column is not present in the data dictionary of the corresponding dataset
    """
    # imported locally; at module level this import is only made under TYPE_CHECKING
    import tdw_catalog.dataset as dataset

    def _resolve(column_name: str, owner: dataset.Dataset):
        # look the column up in the owning dataset's data dictionary
        if col_exists := (column_name in owner.data_dictionary):
            return owner.data_dictionary[column_name]
        raise CatalogInvalidArgumentException(
            message=
            "Column '{}' does not exist in data dictionary for dataset {}".
            format(column_name, owner.id))

    def _as_names(spec: Union[str, List[str]]):
        # a bare string stands for a one-element list of column names
        return [spec] if isinstance(spec, str) else spec

    relationships = []
    for upstream_spec, downstream_spec in column_lineage:
        # upstream columns are resolved before downstream ones
        sources = [
            _resolve(name, upstream_dataset)
            for name in _as_names(upstream_spec)
        ]
        targets = [
            _resolve(name, downstream_dataset)
            for name in _as_names(downstream_spec)
        ]
        relationships.append(
            ColumnLineageRelationship._create(
                upstream_columns=sources,
                downstream_columns=targets,
            ))
    return relationships