from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Union
from tdw_catalog import source
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, _convert_error, _raise_error
from datetime import datetime
from tdw_catalog import Catalog
import tdw_catalog.credential as credential
from tdw_catalog.relations import _CredentialRelation, _SourceRelation
from tdw_catalog.utils import ConnectionPortalType
if TYPE_CHECKING:
import tdw_catalog.organization as organization
@dataclass
class HourlyInterval:
    """
    Schedule interval that fires once per hour.

    Used by a :class:`.ConnectionSchedule` to run at a fixed minute past
    every hour.

    Attributes
    ----------
    minute : int
        Minute past the hour at which to run, between 0 and 59
    """
    minute: int

    def serialize(self) -> dict:
        """
        Serialize this interval for the Catalog API.

        Returns
        -------
        dict
            ``{"hourly": {"minute": ...}}``
        """
        hourly_fields = {"minute": self.minute}
        return {"hourly": hourly_fields}
@dataclass
class DailyInterval(HourlyInterval):
    """
    Schedule interval that fires once per day.

    Used by a :class:`.ConnectionSchedule` to run at a fixed hour and minute
    every day.

    Attributes
    ----------
    minute : int
        Minute past the hour at which to run, between 0 and 59
    hour : int
        Hour of the day at which to run, between 0 and 23
    """
    hour: int

    def serialize(self) -> dict:
        """
        Serialize this interval for the Catalog API.

        Returns
        -------
        dict
            ``{"daily": {"minute": ..., "hour": ...}}``
        """
        # Start from the parent's fields (minute) and add the hour.
        inherited = super().serialize()["hourly"]
        return {"daily": {**inherited, "hour": self.hour}}
@dataclass
class WeeklyInterval(DailyInterval):
    """
    A WeeklyInterval interval causes a :class:`.ConnectionSchedule`
    to execute on a specific day of the week, at a specific minute+hour,
    every week.

    Attributes
    ----------
    minute : int
        The minute of the hour to execute at, between 0 and 59
    hour : int
        The hour of the day to execute at, between 0 and 23
    dayOfWeek : int
        The day of the week to execute at, beginning on Sunday, between 0 and 6
    """
    dayOfWeek: int

    def serialize(self) -> dict:
        """
        Serialize this interval for the Catalog API.

        Returns
        -------
        dict
            ``{"weekly": {"minute": ..., "hour": ..., "dayOfWeek": ...}}``
        """
        # Extend the parent's "daily" payload (minute + hour) with the weekday.
        result = super().serialize()["daily"]
        result["dayOfWeek"] = self.dayOfWeek
        return {"weekly": result}
@dataclass
class MonthlyInterval(DailyInterval):
    """
    A MonthlyInterval interval causes a :class:`.ConnectionSchedule`
    to execute on a specific day of the month, at a specific minute+hour,
    every month.

    Attributes
    ----------
    minute : int
        The minute of the hour to execute at, between 0 and 59
    hour : int
        The hour of the day to execute at, between 0 and 23
    dayOfMonth : int
        The day of the month to execute at, between 1 and 31, or -1 for the
        last day of each month
    """
    dayOfMonth: int

    def serialize(self) -> dict:
        """
        Serialize this interval for the Catalog API.

        Returns
        -------
        dict
            ``{"monthly": {"minute": ..., "hour": ..., "dayOfMonth": ...}}``
        """
        # Extend the parent's "daily" payload (minute + hour) with the day.
        result = super().serialize()["daily"]
        result["dayOfMonth"] = self.dayOfMonth
        return {"monthly": result}
@dataclass
class YearlyInterval(MonthlyInterval):
    """
    A YearlyInterval interval causes a :class:`.ConnectionSchedule`
    to execute on a specific day of a specific month,
    at a specific minute+hour, every year.

    Attributes
    ----------
    minute : int
        The minute of the hour to execute at, between 0 and 59
    hour : int
        The hour of the day to execute at, between 0 and 23
    dayOfMonth : int
        The day of the month to execute at, between 1 and 31, or -1 for the
        last day of the month
    month : int
        The month of the year to execute at, between 1 and 12
    """
    month: int

    def serialize(self) -> dict:
        """
        Serialize this interval for the Catalog API.

        Returns
        -------
        dict
            ``{"yearly": {"minute": ..., "hour": ..., "dayOfMonth": ...,
            "month": ...}}``
        """
        # Extend the parent's "monthly" payload with the month of the year.
        result = super().serialize()["monthly"]
        result["month"] = self.month
        return {"yearly": result}
def _deserializeInterval(
    data: dict
) -> Union[HourlyInterval, DailyInterval, WeeklyInterval, MonthlyInterval,
           YearlyInterval]:
    """
    Rebuild an interval object from a serialized schedule dictionary.

    Interval kinds are probed in the order yearly, monthly, weekly, daily,
    hourly; the first key present in ``data`` wins.

    Parameters
    ----------
    data : dict
        Serialized schedule containing one of the keys ``yearly``,
        ``monthly``, ``weekly``, ``daily`` or ``hourly``, each mapping to a
        dict of that interval's fields

    Returns
    -------
    HourlyInterval | DailyInterval | WeeklyInterval | MonthlyInterval | YearlyInterval
        The interval matching the first recognised key

    Raises
    ------
    CatalogException
        If none of the recognised interval keys is present
    KeyError
        If the matched nested dict is missing one of the interval's fields
    """
    # (serialized key, interval class, fields read from the nested dict)
    interval_kinds = (
        ("yearly", YearlyInterval, ("minute", "hour", "dayOfMonth", "month")),
        ("monthly", MonthlyInterval, ("minute", "hour", "dayOfMonth")),
        ("weekly", WeeklyInterval, ("minute", "hour", "dayOfWeek")),
        ("daily", DailyInterval, ("minute", "hour")),
        ("hourly", HourlyInterval, ("minute",)),
    )
    for kind, interval_cls, field_names in interval_kinds:
        if kind not in data:
            continue
        spec = data[kind]
        return interval_cls(**{name: spec[name] for name in field_names})
    raise CatalogException(message="Cannot deserialize schedule")
@dataclass
class ConnectionSchedule:
    """
    A :class:`.ConnectionSchedule` describes the frequency with which to
    reingest ingested data, or re-analyze virtualized data

    Attributes
    ----------
    interval : HourlyInterval | DailyInterval | WeeklyInterval | MonthlyInterval | YearlyInterval
        The interval that this schedule represents
    timezone : str
        The timezone in which to interpret times in the `interval`
    """
    interval: Union[HourlyInterval, DailyInterval, WeeklyInterval,
                    MonthlyInterval, YearlyInterval]
    timezone: str

    def serialize(self) -> dict:
        """
        Serialize this schedule for the Catalog API.

        The interval's own serialization (e.g. ``{"hourly": {...}}``) is
        merged into the top level next to ``timezone``. This is the same flat
        shape that :meth:`deserialize` reads back.

        Returns
        -------
        dict
            e.g. ``{"timezone": "UTC", "hourly": {"minute": 0}}``
        """
        res = {
            "timezone": self.timezone,
        }
        # The interval is a dataclass, not a mapping: merge its serialized
        # form. Passing the object itself to dict.update raises TypeError.
        res.update(self.interval.serialize())
        return res

    @classmethod
    def deserialize(cls, data: dict) -> "ConnectionSchedule":
        """
        Build a :class:`.ConnectionSchedule` from its serialized form.

        Parameters
        ----------
        data : dict
            A dictionary with a ``timezone`` key and an interval key
            (``hourly``, ``daily``, ``weekly``, ``monthly`` or ``yearly``)

        Returns
        -------
        ConnectionSchedule
            The deserialized schedule

        Raises
        ------
        CatalogException
            If ``data`` contains no recognised interval key
        """
        return cls(timezone=data["timezone"],
                   interval=_deserializeInterval(data))
def _deserialize_connection_schedules(
data: List[dict]) -> Optional[List[ConnectionSchedule]]:
if data is None:
return None
return list(map(lambda d: ConnectionSchedule.deserialize(d), data))
@Entity([
    Property("id", str, serialize=True),
    Property("source_id", str, relation="tdw_catalog.source.Source"),
    Property("user_id", str),
    Property("label", str, writable=True),
    Property("description", Optional[str], writable=True),
    Property("portal", ConnectionPortalType, writable=True),
    Property("url", str, writable=True),
    Property("warehouse", Optional[str], writable=True),
    Property("credential_id",
             Optional[str],
             relation="tdw_catalog.credential.Credential",
             writable=True),
    Property("disabled", Optional[bool], writable=True),
    Property("created_at", datetime),
    Property("updated_at", datetime)
])
class _Connection(EntityBase, _SourceRelation, _CredentialRelation):
    """
    :class:`._Connection`\\ s are used to attach data to a :class:`.Dataset`, describing
    the mechanism and necessary credentials for accessing said data.

    Data can be ingested via a :class:`._Connection`, pulled from an uploaded file,
    or a remote location such as a cloud storage bucket. Data can also
    be virtualized via a :class:`._Connection`, accessed from a remote location
    without being copied into the platform.

    Attributes
    ----------
    id : str
        :class:`._Connection`\\ 's unique id
    source_id : str
        The unique ID of the :class:`.Source` to which this :class:`._Connection` belongs
    source : Source
        The :class:`.Source` associated with this :class:`._Connection`. A :class:`.Source` or ``source_id`` can be provided but not both.
    user_id : str
        The unique :class:`.User` ID of the user who created this :class:`._Connection`
    label : str
        The descriptive label for this :class:`._Connection`
    description : Optional[str] = None
        An optional extended description for this :class:`._Connection`
    portal : ConnectionPortalType
        The method of data access employed by this :class:`._Connection`
    url : str
        A canonical URL that points to the location of data resources within the portal
    warehouse : Optional[str]
        Virtualized datasets created using this :class:`._Connection` will always access data from this :class:`.Warehouse` (must be supplied for virtualization).
        Non-virtualized datasets created using this :class:`._Connection` will ingest to this :class:`.Warehouse` by default (can be overridden at ingest time).
    default_schema : Optional[str]
        The schema to search for tables and views when this :class:`._Connection` is used for data virtualization. This field must be empty
        for ingestion-oriented :class:`._Connection`\\ s, and is optional for virtualization-oriented ones.
        (Declared only on :class:`.VirtualizationConnection`.)
    credential_id : Optional[str]
        The :class:`.Credential` ID that should be used along with the portal to access :class:`.Dataset`\\ s when ingesting. Omitted when virtualizing.
    credential : Optional[credential.Credential]
        The :class:`.Credential` associated with this :class:`._Connection`. Omitted when virtualizing. A :class:`.Credential` or ``credential_id`` can be provided but not both.
    ingest_schedules : Optional[List[ConnectionSchedule]]
        Optional :class:`.ConnectionSchedule`\\ s which, when specified, indicate the frequency with which to
        reingest ingested data, or re-analyze virtualized data. Specific :class:`.Dataset`\\ s using this :class:`._Connection`
        may override this set of :class:`.ConnectionSchedule`\\ s.
        (Declared on :class:`.IngestionConnection` and :class:`.VirtualizationConnection`.)
    disabled : Optional[bool]
        When true, disables the schedule on this :class:`._Connection`. The :class:`._Connection` itself can still be used for manual ingestion or data virtualization.
    created_at : datetime
        The datetime at which this :class:`._Connection` was created
    updated_at : datetime
        The datetime at which this :class:`._Connection` was last updated
    """
    id: str
    source_id: str
    source: 'source.Source'
    user_id: str
    label: str
    description: Optional[str]
    portal: ConnectionPortalType
    url: str
    warehouse: Optional[str]
    credential_id: Optional[str]
    credential: Optional['credential.Credential']
    created_at: datetime
    updated_at: datetime

    def __str__(self) -> str:
        # Use the public ``id`` accessor, as delete() does, rather than the
        # private backing attribute.
        return f'<Connection id={self.id} label={self.label} portal={self.portal}>'

    @classmethod
    def get(cls, client, id: str) -> "_Connection":
        """
        Retrieve a :class:`._Connection`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`._Connection`
        id : str
            The unique ID of the :class:`._Connection`

        Returns
        -------
        _Connection
            The :class:`._Connection` associated with the given ID: a
            :class:`.VirtualizationConnection` when its portal is
            ``ConnectionPortalType.EXTERNAL``, otherwise an
            :class:`.IngestionConnection`

        Raises
        ------
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        CatalogNotFoundException
            If the :class:`._Connection` with the supplied ID could not be found
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve this :class:`._Connection`
        """
        try:
            res = client._get_connection(id=id)
            # The portal type selects the concrete subclass: EXTERNAL portals
            # are virtualized, every other portal is ingested.
            if res["portal"] == ConnectionPortalType.EXTERNAL:
                return VirtualizationConnection(client, **res)
            else:
                return IngestionConnection(client, **res)
        except Exception as e:
            # NOTE(review): assumes _raise_error always raises; if it ever
            # returned instead, get() would silently return None.
            _raise_error(e, "Unable to fetch Connection {}".format(id))

    def save(self) -> None:
        """
        Update this :class:`._Connection`, saving any changes to its fields

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`._Connection`
        CatalogNotFoundException
            If the :class:`._Connection` no longer exists
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            res = self._client._update_connection(**self.serialize())
            # Populate this object from the server's response to the update.
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def delete(self) -> None:
        """
        Delete this :class:`._Connection`. This :class:`._Connection` object should not be
        used after `delete()` has successfully returned

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to delete this :class:`._Connection`
        CatalogNotFoundException
            If the :class:`._Connection` no longer exists
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._delete_connection(id=self.id)
        except Exception as e:
            raise _convert_error(e)
@Entity([
    Property("ingest_schedules",
             Optional[List[ConnectionSchedule]],
             writable=True,
             deserialize=_deserialize_connection_schedules),
])
class IngestionConnection(_Connection):
    """
    :class:`.IngestionConnection`\\ s are used to attach ingested data to a :class:`.Dataset`,
    describing the mechanism and necessary credentials for accessing said data.

    Data is ingested via an :class:`.IngestionConnection`: pulled from an uploaded file,
    or a remote location such as a cloud storage bucket.

    Attributes
    ----------
    id : str
        :class:`.IngestionConnection`\\ 's unique id
    source_id : str
        The unique ID of the :class:`.Source` to which this :class:`.IngestionConnection` belongs
    source : Source
        The :class:`.Source` associated with this :class:`.IngestionConnection`. A :class:`.Source` or ``source_id`` can be provided but not both.
    user_id : str
        The unique :class:`.User` ID of the user who created this :class:`.IngestionConnection`
    label : str
        The descriptive label for this :class:`.IngestionConnection`
    description : Optional[str] = None
        An optional extended description for this :class:`.IngestionConnection`
    portal : ConnectionPortalType
        The method of data access employed by this :class:`.IngestionConnection`
    url : str
        A canonical URL that points to the location of data resources within the portal
    warehouse : Optional[str]
        :class:`.Dataset`\\ s created using this :class:`.IngestionConnection` will ingest to this :class:`.Warehouse` by default (can be overridden at ingest time).
    credential_id : Optional[str]
        The :class:`.Credential` ID that should be used along with the portal to access :class:`.Dataset`\\ s when ingesting.
    credential : Optional[credential.Credential]
        The :class:`.Credential` associated with this :class:`.IngestionConnection`. A :class:`.Credential` or ``credential_id`` can be provided but not both.
    ingest_schedules : Optional[List[ConnectionSchedule]]
        Optional :class:`.ConnectionSchedule`\\ s which, when specified, indicate the frequency with which to
        reingest ingested data. Specific :class:`.Dataset`\\ s using this :class:`.IngestionConnection`
        may override this set of :class:`.ConnectionSchedule`\\ s.
    disabled : Optional[bool]
        When true, disables the schedule on this :class:`.IngestionConnection`. The :class:`.IngestionConnection` itself can still be used for manual ingestion.
    created_at : datetime
        The datetime at which this :class:`.IngestionConnection` was created
    updated_at : datetime
        The datetime at which this :class:`.IngestionConnection` was last updated
    """
    ingest_schedules: Optional[List[ConnectionSchedule]]
@Entity([
    Property("default_schema", Optional[str], writable=True),
    # NOTE(review): display_key presumably exposes this property under the
    # name ``metrics_collection_schedules`` (matching the annotation below);
    # confirm against tdw_catalog.entity.Property.
    Property("ingest_schedules",
             Optional[List[ConnectionSchedule]],
             display_key="metrics_collection_schedules",
             writable=True,
             deserialize=_deserialize_connection_schedules),
])
class VirtualizationConnection(_Connection):
    """
    :class:`.VirtualizationConnection`\\ s are used to attach virtualized data to a :class:`.Dataset`,
    describing the mechanism and necessary credentials for accessing said data.

    Data is accessed from a remote location without being copied into the platform.

    Attributes
    ----------
    id : str
        :class:`.VirtualizationConnection`\\ 's unique id
    source_id : str
        The unique ID of the :class:`.Source` to which this :class:`.VirtualizationConnection` belongs
    source : Source
        The :class:`.Source` associated with this :class:`.VirtualizationConnection`. A :class:`.Source` or ``source_id`` can be provided but not both.
    user_id : str
        The unique :class:`.User` ID of the user who created this :class:`.VirtualizationConnection`
    label : str
        The descriptive label for this :class:`.VirtualizationConnection`
    description : Optional[str] = None
        An optional extended description for this :class:`.VirtualizationConnection`
    portal : ConnectionPortalType
        The method of data access employed by this :class:`.VirtualizationConnection`
    url : str
        A canonical URL that points to the location of data resources within the portal
    warehouse : Optional[str]
        Virtualized datasets created using this :class:`.VirtualizationConnection` will always access data from this :class:`.Warehouse` (must be supplied for virtualization).
    credential_id : Optional[str]
        The :class:`.Credential` ID that should be used along with the portal to access :class:`.Dataset`\\ s when ingesting. Omitted when virtualizing.
    credential : Optional[credential.Credential]
        The :class:`.Credential` associated with this :class:`.VirtualizationConnection`. Omitted when virtualizing. A :class:`.Credential` or ``credential_id`` can be provided but not both.
    default_schema : Optional[str]
        The schema to search for tables and views
    metrics_collection_schedules : Optional[List[ConnectionSchedule]]
        Optional :class:`.ConnectionSchedule`\\ s which, when specified, indicate the frequency with which to
        re-analyze virtualized data. Specific :class:`.Dataset`\\ s using this :class:`.VirtualizationConnection`
        may override this set of :class:`.ConnectionSchedule`\\ s.
    disabled : Optional[bool]
        When true, disables the schedule on this :class:`.VirtualizationConnection`. The :class:`.VirtualizationConnection` itself can still be used for data virtualization.
    created_at : datetime
        The datetime at which this :class:`.VirtualizationConnection` was created
    updated_at : datetime
        The datetime at which this :class:`.VirtualizationConnection` was last updated
    """
    default_schema: Optional[str]
    metrics_collection_schedules: Optional[List[ConnectionSchedule]]