import copy
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional, Union

from tdw_catalog import connection, credential, warehouse
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, _convert_error, _raise_error
from tdw_catalog.relations import _OrganizationRelation
from tdw_catalog.utils import ConnectionPortalType, ListConnectionsFilter

if TYPE_CHECKING:
    import tdw_catalog.organization as organization
@Entity([
    Property("id", str, serialize=True),
    Property("organization_id",
             str,
             relation="tdw_catalog.organization.Organization",
             serialize=True),
    Property("user_id", str, serialize=True),
    Property("label", str, writable=True),
    Property("description", str, writable=True),
    Property("created_at", datetime),
    Property("updated_at", datetime)
])
class Source(EntityBase, _OrganizationRelation):
    """
    A :class:`.Source` is used to semantically group a set of related :class:`.Dataset`\\ s. :class:`.User`\\ s
    are free to label a :class:`.Source` in a descriptive way to best understand the
    meaning behind this grouping.

    Attributes
    ----------
    id : str
        Source's unique id
    organization : Organization
        The :class:`.Organization` associated with this :class:`.Source`. An :class:`.Organization` or ``organization_id`` can be provided but not both.
    organization_id : str
        The unique ID of the :class:`.Organization` to which this :class:`.Source` belongs
    user_id : str
        The unique user ID of the :class:`.OrganizationMember` who created this :class:`.Source`
    label : str
        A descriptive label for this :class:`.Source`
    description : Optional[str] = None
        An optional extended description for this :class:`.Source`
    created_at : datetime
        The datetime at which this :class:`.Source` was created
    updated_at : datetime
        The datetime at which this :class:`.Source` was last updated
    """
    id: str
    organization: 'organization.Organization'
    organization_id: str
    user_id: str
    label: str
    description: str
    created_at: datetime
    updated_at: datetime

    def __str__(self) -> str:
        # Use the public ``id`` accessor, consistent with every other method
        # in this class (e.g. list_connections, _create_connection).
        return f'<Source id={self.id} label={self.label}>'

    @classmethod
    def get(cls, client, organization_id: str, id: str) -> 'Source':
        """
        Retrieve a :class:`.Source` belonging to the given :class:`.Organization`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`.Source`
        organization_id : str
            The :class:`.Organization` ID the :class:`.Source` belongs to
        id : str
            The unique ID of the :class:`.Source`

        Returns
        -------
        Source
            The :class:`.Source` associated with the given ID

        Raises
        ------
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        CatalogNotFoundException
            If the :class:`.Source` with the supplied ID could not be found
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve this :class:`.Source`
        """
        try:
            res = client._get_source(id=id, organization_id=organization_id)
            return cls(client, **res)
        except Exception as e:
            # ``_raise_error`` is relied upon to raise the converted Catalog
            # exception; its return value was never used, so it is no longer
            # bound to a local. NOTE(review): if it can ever return instead
            # of raising, this method falls through and returns None.
            _raise_error(e, "Unable to fetch Source {}".format(id))

    def save(self) -> None:
        """
        Update this :class:`.Source`, saving any changes to its fields

        The server's response is deserialized back into this object, so
        server-side changes (e.g. ``updated_at``) are reflected locally.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`.Source`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            res = self._client._update_source(source=self.serialize())
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def delete(self) -> None:
        """
        Delete this :class:`.Source`. This :class:`.Source` object should not be
        used after ``delete()`` has successfully returned

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to delete this :class:`.Source`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._delete_source(source=self.serialize())
        except Exception as e:
            raise _convert_error(e)

    def list_connections(
        self,
        filter: Optional[ListConnectionsFilter] = None
    ) -> List[Union['connection.IngestionConnection',
                    'connection.VirtualizationConnection']]:
        """
        List all :class:`.IngestionConnection`\\ s and :class:`.VirtualizationConnection`\\ s belonging to this :class:`.Source`

        Parameters
        ----------
        filter : Optional[ListConnectionsFilter]
            An optional filter on the returned Connection list, useful for pagination of results.
            Note that the ``organization_id`` and ``source_ids`` properties will be set automatically
            to the :class:`.Organization` of this :class:`.Source` and to this :class:`.Source`, respectively.
            The filter object passed in is not modified; a shallow copy is scoped instead.

        Returns
        -------
        List[Union[IngestionConnection, VirtualizationConnection]]
            The list of Connections in this :class:`.Source`. Connections whose ``portal``
            equals ``ConnectionPortalType.EXTERNAL`` are returned as
            :class:`.VirtualizationConnection`\\ s; all others as :class:`.IngestionConnection`\\ s.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to list Connections in this :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            # Scope a copy rather than the caller's object, so a filter reused
            # across calls is not silently rewritten to point at this Source.
            f = (ListConnectionsFilter()
                 if filter is None else copy.copy(filter))
            f.organization_id = self.organization_id
            f.source_ids = [self.id]
            org_connections = self._client._list_connections(filter=f)
            return [
                self._connection_from_response(org_connection["portal"],
                                               org_connection)
                for org_connection in org_connections
            ]
        except Exception as e:
            raise _convert_error(e)

    def _connection_from_response(
        self,
        portal,
        res: dict,
    ) -> Union['connection.IngestionConnection',
               'connection.VirtualizationConnection']:
        """
        Wrap a raw connection payload in the Connection class matching its portal.

        This is the single place that decides the Connection type, shared by
        ``list_connections`` and ``_create_connection`` so they cannot disagree.
        ``ConnectionPortalType.EXTERNAL`` is the portal used by
        ``_create_virtualization_connection``, so it maps to
        :class:`.VirtualizationConnection`; every other portal maps to
        :class:`.IngestionConnection`.

        Parameters
        ----------
        portal : ConnectionPortalType
            The connection's portal. When called from ``list_connections`` this is
            the raw ``"portal"`` value of the server payload. NOTE(review): assumes
            that value compares equal to the enum member, as the previous
            inline comparison did.
        res : dict
            Keyword arguments forwarded to the Connection constructor.
        """
        if portal == ConnectionPortalType.EXTERNAL:
            return connection.VirtualizationConnection(client=self._client,
                                                       **res)
        return connection.IngestionConnection(client=self._client, **res)

    def _create_connection(
        self,
        label: str,
        portal: ConnectionPortalType,
        url: Optional[str] = None,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        default_schema: Optional[str] = None,
        credential: Optional['credential.Credential'] = None,
        ingest_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> Union['connection.IngestionConnection',
               'connection.VirtualizationConnection']:
        """
        Create a Connection within this :class:`.Source`

        Optional arguments left as ``None`` are omitted from the request.

        Parameters
        ----------
        label : str
            The descriptive label for the new Connection
        portal : ConnectionPortalType
            The method of data access employed by the new Connection
        url : Optional[str]
            A canonical URL that points to the location of data resources within the portal
        description : Optional[str]
            An optional extended description
        warehouse : Optional[Warehouse]
            The :class:`.Warehouse` for the Connection; only its ``name`` is sent
        default_schema : Optional[str]
            The default schema (used for virtualization)
        credential : Optional[Credential]
            The :class:`.Credential` for the Connection; only its ``id`` is sent
        ingest_schedules : Optional[List[ConnectionSchedule]]
            Schedules, each serialized via ``serialize()`` before sending

        Returns
        -------
        Union[IngestionConnection, VirtualizationConnection]
            A :class:`.VirtualizationConnection` when ``portal`` is
            ``ConnectionPortalType.EXTERNAL``, otherwise an :class:`.IngestionConnection`

        Raises
        ------
        CatalogException
            If call to the :class:`.Catalog` server fails (any error is passed
            through ``_convert_error``)
        """
        try:
            create_args = {
                'source_id': self.id,
                'label': label,
                'portal': portal,
            }
            if url is not None:
                create_args['url'] = url
            if description is not None:
                create_args['description'] = description
            if credential is not None:
                # The request references the credential by ID only.
                create_args['credential_id'] = credential.id
            if warehouse is not None:
                # The request references the warehouse by name only.
                create_args['warehouse'] = warehouse.name
            if default_schema is not None:
                create_args['default_schema'] = default_schema
            if ingest_schedules is not None:
                create_args['ingest_schedules'] = [
                    s.serialize() for s in ingest_schedules
                ]
            res = self._client._create_connection(**create_args)
            return self._connection_from_response(portal, res)
        except Exception as e:
            raise _convert_error(e)

    def create_ingestion_connection(
        self,
        label: str,
        portal: ConnectionPortalType,
        url: Optional[str] = None,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        credential: Optional['credential.Credential'] = None,
        ingest_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> 'connection.IngestionConnection':
        """
        Create an :class:`.IngestionConnection` within this :class:`.Source`

        Parameters
        ----------
        label : str
            The descriptive label for this :class:`.IngestionConnection`
        portal : ConnectionPortalType
            The method of data access employed by this :class:`.IngestionConnection`.
            Note: passing ``ConnectionPortalType.EXTERNAL`` produces a
            :class:`.VirtualizationConnection` instead.
        url : Optional[str]
            A canonical URL that points to the location of data resources within the portal
        description : Optional[str] = None
            An optional extended description for this :class:`.IngestionConnection`
        warehouse : Optional[Warehouse]
            :class:`.Dataset`\\ s created using this :class:`.IngestionConnection` will ingest to this :class:`.Warehouse` by default (can be overridden at ingest time).
        credential : Optional[Credential]
            The :class:`.Credential` associated with this :class:`.IngestionConnection`.
        ingest_schedules : Optional[List[ConnectionSchedule]]
            Optional :class:`.ConnectionSchedule`\\ s which, when specified, indicate the frequency with which to
            reingest ingested data. Specific Datasets using this :class:`.IngestionConnection`
            may override this set of Schedules.

        Returns
        -------
        IngestionConnection
            The newly created IngestionConnection

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to create :class:`.IngestionConnection`\\ s in this :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        # Keyword arguments keep this call correct if _create_connection's
        # parameter order ever changes; ingestion connections never set a
        # default schema.
        return self._create_connection(
            label=label,
            portal=portal,
            url=url,
            description=description,
            warehouse=warehouse,
            default_schema=None,
            credential=credential,
            ingest_schedules=ingest_schedules,
        )

    # TODO change to warehouse creation builder, which builds appropriate params
    # for CreateWarehouseRequest, and remove warehouse argument (pending
    # the launch of warehouse self-serve)
    def _create_virtualization_connection(
        self,
        label: str,
        default_schema: str,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        credential: Optional['credential.Credential'] = None,
        metrics_collection_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> 'connection.VirtualizationConnection':
        """
        Create a :class:`.VirtualizationConnection` within this :class:`.Source`

        Parameters
        ----------
        label : str
            The descriptive label for this :class:`.VirtualizationConnection`
        default_schema : str
            The schema to search for tables and views when this :class:`.VirtualizationConnection` is used for data virtualization.
        description : Optional[str] = None
            An optional extended description for this :class:`.VirtualizationConnection`
        warehouse : Optional[Warehouse]
            :class:`.Dataset`\\ s created using this :class:`.VirtualizationConnection` will always access data from this :class:`.Warehouse`
        credential : Optional[Credential]
            The :class:`.Credential` associated with this :class:`.VirtualizationConnection`.
        metrics_collection_schedules : Optional[List[ConnectionSchedule]]
            Optional :class:`.ConnectionSchedule`\\ s which, when specified, indicate the frequency with which to
            re-analyze virtualized data. Specific Datasets using this :class:`.VirtualizationConnection`
            may override this set of Schedules. These are sent to the server as the
            Connection's ``ingest_schedules``.

        Returns
        -------
        VirtualizationConnection
            The newly created VirtualizationConnection

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to create :class:`.VirtualizationConnection`\\ s in this :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        # Virtualization connections are, by definition, EXTERNAL-portal
        # connections with no source URL.
        return self._create_connection(
            label=label,
            portal=ConnectionPortalType.EXTERNAL,
            url=None,
            description=description,
            warehouse=warehouse,
            default_schema=default_schema,
            credential=credential,
            ingest_schedules=metrics_collection_schedules,
        )