import json
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union

from tdw_catalog.errors import CatalogInvalidArgumentException, CatalogUnknownException, _convert_error
from tdw_catalog import Catalog
from tdw_catalog.utils import ColumnType
if TYPE_CHECKING:
from tdw_catalog import glossary_term
from tdw_catalog.lineage.column_relationship import LineageColumn
from tdw_catalog.dataset import Dataset
def _add_index(serializedColumn: dict, position: int) -> dict:
    """
    Build a copy of a serialized column with its ordinal position attached.

    Parameters
    ----------
    serializedColumn : dict
        The serialized form of a column. It is not modified.
    position : int
        The value to store under the ``"position"`` key of the copy.

    Returns
    -------
    dict
        A new shallow copy of ``serializedColumn`` whose ``"position"`` entry
        is set to ``position`` (replacing any existing ``"position"`` entry).
    """
    return {**serializedColumn, "position": position}
class Column():
    """
    A single :class:`.Column` within a :class:`.DataDictionary`

    Attributes
    ----------
    key : str
        The column name for this :class:`.Column`, within the actual :class:`.Warehouse` where the data lives
    type : ColumnType
        The data type for this :class:`.Column`. Available types can be found in :class:`.ColumnType`.
    name: Optional[str]
        An optional friendly name for this :class:`.Column`, which is visually used in place of the ``key`` throughout the :class:`.Catalog`
    description: Optional[str]
        An optional description for this :class:`.Column`
    """
    # NOTE(review): ``_dataset`` is assigned by ``_from_property`` but not by
    # ``__init__``; presumably the owning data dictionary attaches it to
    # hand-built columns - confirm against the caller.
    _dataset: 'Dataset'
    _id: str
    _key: str
    _type: ColumnType
    _name: Optional[str]
    _description: Optional[str]
    _glossary_term_ids: Optional[List[str]]

    @classmethod
    def _from_property(cls, dataset: 'Dataset', p: Dict[str, Any]):
        """
        Build a column object from a serialized property dictionary.

        Parameters
        ----------
        dataset : Dataset
            The :class:`.Dataset` the column is attached to. Its
            ``is_connected`` flag selects between the regular and the
            metadata-only column classes.
        p : Dict[str, Any]
            The serialized property. ``"id"``, ``"key"`` and ``"type"`` are
            required; ``"title"``, ``"description"``, ``"glossary_term_ids"``
            and (for currency columns) ``"meta"`` are read when present.

        Returns
        -------
        Column
            A populated column. Currency properties produce a currency
            column whose ``_symbol`` is taken from the JSON in ``"meta"``.

        Raises
        ------
        CatalogUnknownException
            If ``p["type"]`` does not name a member of :class:`.ColumnType`
        """
        is_currency = p["type"] == "currency"

        # create the correct Column class based on whether the dataset is
        # connected, and whether it's a currency column
        if is_currency:
            c = (CurrencyColumn()
                 if dataset.is_connected else MetadataOnlyCurrencyColumn())
        else:
            c = Column() if dataset.is_connected else MetadataOnlyColumn()

        # then, fill in the column fields
        c._dataset = dataset
        c._id = p["id"]
        c._key = p["key"]
        try:
            c._type = ColumnType[p["type"].upper()]
        except KeyError as err:
            raise CatalogUnknownException(
                message=
                "Unable to parse data dictionary. Unknown column type: {ctype}"
                .format(ctype=p["type"])) from err
        c._name = p.get("title")
        c._description = p.get("description")
        c._glossary_term_ids = p.get("glossary_term_ids", [])

        if is_currency:
            try:
                # json.loads raises TypeError for a missing/None "meta" and
                # JSONDecodeError for malformed text; both mean "no symbol".
                currency_json = json.loads(p.get("meta"))
            except (TypeError, json.JSONDecodeError):
                c._symbol = None
            else:
                # Valid JSON that is not an object (null, number, list, ...)
                # carries no symbol either; treat it like an empty object.
                c._symbol = (currency_json.get("symbol", "") if isinstance(
                    currency_json, dict) else "")
        return c

    def __init__(self,
                 key: str = None,
                 type: ColumnType = None,
                 name: Optional[str] = None,
                 description: Optional[str] = None):
        """
        Initializes a fresh :class:`.Column`, for inclusion in a :class:`.MetadataOnlyDataDictionary`

        Parameters
        ----------
        key : str
            The underlying column name for this :class:`.Column`, within the actual :class:`.Warehouse` where this data lives
        type : ColumnType
            The data type for this :class:`.Column`. Available types can be found in :class:`.ColumnType`.
        name: Optional[str]
            A friendly name for this :class:`.Column`, which is visually used in place of the ``key`` throughout the :class:`.Catalog`
        description: Optional[str]
            An optional description for this :class:`.Column`
        """
        self._id = None
        self._key = key
        self._type = type
        self._name = name
        self._description = description
        self._glossary_term_ids = None

    @property
    def key(self) -> str:
        """The column name of this :class:`.Column` (read-only)"""
        return self._key

    @property
    def type(self) -> ColumnType:
        """The data type of this :class:`.Column` (read-only)"""
        return self._type

    @property
    def name(self) -> Optional[str]:
        """The optional friendly name of this :class:`.Column`"""
        return self._name

    @name.setter
    def name(self, name: Optional[str]):
        self._name = name

    @property
    def description(self) -> Optional[str]:
        """The optional description of this :class:`.Column`"""
        return self._description

    @description.setter
    def description(self, description: Optional[str]):
        self._description = description

    def serialize(self) -> Dict:
        """
        Convert this :class:`.Column` into a plain dictionary

        Returns
        -------
        Dict
            A dictionary with the keys ``id``, ``key``, ``title``,
            ``description``, ``type`` and ``glossary_term_ids``. An unset
            id, name or description is emitted as an empty string.
        """
        return {
            "id": "" if self._id is None else self._id,
            "key": self._key,
            "title": "" if self._name is None else self._name,
            "description":
            "" if self._description is None else self._description,
            "type": self._type,
            "glossary_term_ids": self._glossary_term_ids,
        }

    def apply_glossary_term(
            self, glossary_term: 'glossary_term.GlossaryTerm') -> None:
        """
        Apply a :class:`.GlossaryTerm` to this :class:`.Column`. The
        containing :class:`.DataDictionary` must be saved for the change
        to take permanent effect. Applying a term that is already applied
        leaves the :class:`.Column` unchanged.

        Parameters
        ----------
        glossary_term : GlossaryTerm
            The :class:`.GlossaryTerm` to classify this :class:`.Column` with

        Returns
        -------
        None

        Raises
        ------
        CatalogInvalidArgumentException
            If the :class:`.Organization` of the :class:`.GlossaryTerm` does not
            match the :class:`.Organization` which the :class:`.Dataset` was retrieved
            from.
        """
        if glossary_term.organization_id != self._dataset._context_organization.id:
            raise CatalogInvalidArgumentException(
                message=
                "Organization ID of the supplied GlossaryTerm does not match the Organization which this Dataset was retrieved from."
            )
        current = ([] if self._glossary_term_ids is None else
                   self._glossary_term_ids)
        # Build a new list rather than appending in place, and skip ids that
        # are already present so repeated calls do not create duplicates.
        if glossary_term.id not in current:
            current = current + [glossary_term.id]
        self._glossary_term_ids = current

    def remove_glossary_term(
            self, glossary_term: 'glossary_term.GlossaryTerm') -> None:
        """
        Remove a :class:`.GlossaryTerm` from this :class:`.Column`. The
        containing :class:`.DataDictionary` must be saved for the change
        to take permanent effect.

        Parameters
        ----------
        glossary_term : GlossaryTerm
            The :class:`.GlossaryTerm` to be removed from this :class:`.Column`

        Returns
        -------
        None
        """
        if self._glossary_term_ids is None:
            return
        self._glossary_term_ids = [
            t for t in self._glossary_term_ids if t != glossary_term.id
        ]

    def list_glossary_terms(self) -> 'List[glossary_term.GlossaryTerm]':
        """
        Return a list of :class:`.GlossaryTerm`\\ s that have been applied to this :class:`.Column`

        Parameters
        ----------
        None

        Returns
        -------
        List[glossary_term.GlossaryTerm]
            The list of :class:`.GlossaryTerm`\\ s that have been applied to this :class:`.Column`

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller does not have permission to list :class:`.GlossaryTerm`\\ s on a :class:`.Dataset`\\ 's :class:`.Column`\\ s
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        """
        # Imported here rather than at module level; the module-level import
        # of ``glossary_term`` is only available to type checkers.
        from tdw_catalog import glossary_term
        if self._glossary_term_ids is None:
            return []
        # One ``GlossaryTerm.get`` call is made per applied term id.
        return [
            glossary_term.GlossaryTerm.get(client=self._dataset._client,
                                           id=term_id)
            for term_id in self._glossary_term_ids
        ]

    def _to_lineage_column(self) -> 'LineageColumn':
        """Build a ``LineageColumn`` from this column's key, name and type"""
        from tdw_catalog.lineage.column_relationship import LineageColumn
        return LineageColumn(self.key, self.name, self.type)
class CurrencyColumn(Column):
    """
    A :class:`.Column` specialised for currency values, which additionally
    carries a currency symbol (such as $)

    Attributes
    ----------
    symbol : Optional[str]
        An optional currency symbol (e.g. ``'$'``)
    """
    _symbol: Optional[str]

    def __init__(self,
                 key: str = None,
                 type: ColumnType = None,
                 name: Optional[str] = None,
                 description: Optional[str] = None,
                 symbol: Optional[str] = None):
        """
        Initializes a fresh :class:`.CurrencyColumn`, for inclusion in a :class:`.MetadataOnlyDataDictionary`

        Parameters
        ----------
        key : str
            The underlying column name for this :class:`.Column`, within the actual :class:`.Warehouse` where this data lives
        type : ColumnType
            The data type for this :class:`.Column`. Available types can be found in :class:`.ColumnType`.
        name: Optional[str]
            A friendly name for this :class:`.Column`, which is visually used in place of the ``key`` throughout the :class:`.Catalog`
        description: Optional[str]
            An optional description for this :class:`.Column`
        symbol : Optional[str]
            An optional currency symbol (e.g. ``'$'``)
        """
        # The shared fields are handled by the base class; only the symbol
        # is specific to currency columns.
        super().__init__(key, type, name, description)
        self._symbol = symbol

    @property
    def symbol(self) -> Optional[str]:
        """The optional currency symbol of this column"""
        return self._symbol

    @symbol.setter
    def symbol(self, symbol: str):
        self._symbol = symbol

    def serialize(self) -> Dict:
        """
        Convert this column into a plain dictionary

        Returns
        -------
        Dict
            The base :class:`.Column` serialization plus a ``meta`` entry
            holding a JSON string of the form ``{"symbol": ...}``. An unset
            symbol is written as an empty string.
        """
        payload = super().serialize()
        current_symbol = self.symbol
        if current_symbol is None:
            current_symbol = ""
        payload["meta"] = json.dumps({"symbol": current_symbol})
        return payload
class DataDictionary():
    """
    A :class:`.DataDictionary` describes the schema of data represented
    by a :class:`.Dataset` as a sequence of :class:`.Column`\\ s, each with
    a ``key``, ``title``, ``type``, and optional ``description``.

    A :class:`.DataDictionary` behaves as a ``dict`` - columns can be accessed
    via their key as follows: ``data_dictionary["column_name"]``. Unlike a
    ``dict``, looking up a key that does not exist returns ``None`` instead
    of raising ``KeyError``. Iterating yields the column keys in order.

    Attributes
    ----------
    last_updated_at: datetime
        The last time this :class:`.DataDictionary` was updated, either by hand
        (for :class:`.Dataset`\\ s which are not connected) or
        via a scheduled metrics collection (for :class:`.ConnectedDataset`\\ s which are)
    columns: List[Column]
        The list of :class:`.Column`\\ s which make up this :class:`.DataDictionary`
        (retrieved by calling ``columns()``)
    """
    _dataset: 'Dataset'
    _last_updated_at: datetime
    _version_id: Optional[str]
    _columns: List['Column']

    def __init__(self, dataset: 'Dataset', last_updated_at: datetime,
                 version_id: Optional[str], columns: List['Column']):
        """
        Wrap a list of :class:`.Column`\\ s belonging to a :class:`.Dataset`

        Parameters
        ----------
        dataset : Dataset
            The :class:`.Dataset` this :class:`.DataDictionary` describes
        last_updated_at : datetime
            The last time this :class:`.DataDictionary` was updated
        version_id : Optional[str]
            The version identifier passed along when saving a connected :class:`.Dataset`
        columns : List[Column]
            The :class:`.Column`\\ s, in order. The list is stored as given, not copied.
        """
        self._dataset = dataset
        self._last_updated_at = last_updated_at
        self._version_id = version_id
        self._columns = columns

    @property
    def last_updated_at(self) -> datetime:
        """
        Returns the last time this :class:`.DataDictionary` was modified
        """
        return self._last_updated_at

    def columns(self) -> List['Column']:
        """
        Returns all :class:`.Column`\\ s in this :class:`.DataDictionary`
        """
        return self._columns

    def __getitem__(self,
                    key: str) -> Optional[Union['Column', 'CurrencyColumn']]:
        """
        Access a column in this dictionary using its key

        Returns the first :class:`.Column` whose ``key`` equals ``key``, or
        ``None`` if there is no such :class:`.Column`.
        """
        return next((c for c in self._columns if c.key == key), None)

    def __iter__(self) -> Iterator[str]:
        """
        Iterate over the keys of the :class:`.Column`\\ s in this :class:`.DataDictionary`, in order
        """
        # Without an explicit __iter__, Python falls back to calling
        # __getitem__(0), __getitem__(1), ... until IndexError is raised.
        # __getitem__ never raises, so that fallback would loop forever.
        return (c.key for c in self._columns)

    def __len__(self) -> int:
        """
        Returns the number of :class:`.Column`\\ s in this :class:`.DataDictionary`
        """
        return len(self._columns)

    def has_key(self, key: str) -> bool:
        """
        Returns ``True`` if and only if a :class:`.Column` with the given ``key`` exists in this :class:`.DataDictionary`
        """
        return any(c.key == key for c in self._columns)

    def __contains__(self, key: str) -> bool:
        """
        Returns ``True`` if and only if a :class:`.Column` with the given ``key`` exists in this :class:`.DataDictionary`
        """
        return self.has_key(key)

    def save(self):
        """
        Update this :class:`.DataDictionary`, saving all changes to its schema

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`.DataDictionary`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        dataset = self._dataset
        # Each column is serialized together with its index in the list.
        properties = [
            _add_index(column.serialize(), index)
            for index, column in enumerate(self._columns)
        ]
        # The context organization may be absent; send None in that case.
        context_organization = dataset._context_organization
        organization_id = (context_organization.id
                           if context_organization is not None else None)

        if dataset.is_connected:
            # Connected datasets update properties against a specific version.
            dataset._client._update_properties(
                dataset_id=dataset.id,
                version_id=self._version_id,
                properties=properties,
                organization_id=organization_id)
        else:
            # Datasets which are not connected replace their properties.
            dataset._client._replace_properties(
                dataset_id=dataset.id,
                properties=properties,
                organization_id=organization_id)