Source code for tdw_catalog.source

from typing import TYPE_CHECKING, List, Optional, Union
from tdw_catalog import connection, credential, warehouse
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, _convert_error, _raise_error

from datetime import datetime
from tdw_catalog.relations import _OrganizationRelation

from tdw_catalog.utils import ConnectionPortalType, ListConnectionsFilter

if TYPE_CHECKING:
    import tdw_catalog.organization as organization


@Entity([
    Property("id", str, serialize=True),
    Property("organization_id",
             str,
             relation="tdw_catalog.organization.Organization",
             serialize=True),
    Property("user_id", str, serialize=True),
    Property("label", str, writable=True),
    Property("description", str, writable=True),
    Property("created_at", datetime),
    Property("updated_at", datetime),
])
class Source(EntityBase, _OrganizationRelation):
    """
    A :class:`.Source` is used to semantically group a set of related
    :class:`.Dataset`\\ s. :class:`.User`\\ s are free to label a
    :class:`.Source` in a descriptive way to best understand the meaning
    behind this grouping.

    Attributes
    ----------
    id : str
        Source's unique id
    organization : Organization
        The :class:`.Organization` associated with this :class:`.Source`.
        An :class:`.Organization` or ``organization_id`` can be provided
        but not both.
    organization_id : str
        The unique ID of the :class:`.Organization` to which this
        :class:`.Source` belongs
    user_id : str
        The unique user ID of the :class:`.OrganizationMember` who created
        this :class:`.Source`
    label : str
        A descriptive label for this :class:`.Source`
    description : Optional[str] = None
        An optional extended description for this :class:`.Source`
    created_at : datetime
        The datetime at which this :class:`.Source` was created
    updated_at : datetime
        The datetime at which this :class:`.Source` was last updated
    """
    id: str
    organization: 'organization.Organization'
    organization_id: str
    user_id: str
    label: str
    description: str
    created_at: datetime
    updated_at: datetime

    def __str__(self) -> str:
        """Return a short, human-readable summary of this :class:`.Source`."""
        # NOTE(review): reads ``_id`` rather than ``id``; presumably the
        # backing field installed by ``@Entity`` -- confirm.
        return f'<Source id={self._id} label={self.label}>'

    @classmethod
    def get(cls, client, organization_id: str, id: str) -> 'Source':
        """
        Retrieve a :class:`.Source` belonging to this :class:`.Organization`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`.Source`
        organization_id : str
            The :class:`.Organization` ID the :class:`.Source` belongs to
        id : str
            The unique ID of the :class:`.Source`

        Returns
        -------
        Source
            The :class:`.Source` associated with the given ID

        Raises
        ------
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        CatalogNotFoundException
            If the :class:`.Source` with the supplied ID could not be found
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve this :class:`.Source`
        """
        try:
            res = client._get_source(id=id, organization_id=organization_id)
            return cls(client, **res)
        except Exception as e:
            # NOTE(review): ``_raise_error`` is expected to raise the
            # converted Catalog exception itself -- confirm. Its result was
            # previously bound to an unused local.
            _raise_error(e, f"Unable to fetch Source {id}")

    def save(self) -> None:
        """
        Update this :class:`.Source`, saving any changes to its fields

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`.Source`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            res = self._client._update_source(source=self.serialize())
            # Refresh local state from the server's response.
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def delete(self) -> None:
        """
        Delete this :class:`.Source`. This :class:`.Source` object should
        not be used after `delete()` has successfully returned

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to delete this :class:`.Source`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._delete_source(source=self.serialize())
        except Exception as e:
            raise _convert_error(e)

    def list_connections(
        self,
        filter: Optional[ListConnectionsFilter] = None
    ) -> List[Union['connection.IngestionConnection',
                    'connection.VirtualizationConnection']]:
        """
        List all :class:`.IngestionConnection` and
        :class:`.VirtualizationConnection`\\ s belonging to this
        :class:`.Source`

        Parameters
        ----------
        filter : Optional[ListConnectionsFilter]
            An optional filter on the returned Connection list, useful for
            pagination of results. Note that the `organization_id` and
            `source_ids` properties will be set automatically to this
            :class:`.Organization` and Source (the supplied filter object
            is modified in place).

        Returns
        -------
        List[Connection]
            The list of Connections in this :class:`.Source`

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to list Connections in this
            :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            f = filter if filter is not None else ListConnectionsFilter()
            # Always scope the query to this Source and its Organization.
            f.organization_id = self.organization_id
            f.source_ids = [self.id]
            org_connections = self._client._list_connections(filter=f)
            # EXTERNAL portals are virtualization connections, matching
            # _create_connection / _create_virtualization_connection; every
            # other portal type is an ingestion connection.
            return [
                connection.VirtualizationConnection(client=self._client,
                                                    **org_connection)
                if org_connection["portal"] == ConnectionPortalType.EXTERNAL
                else connection.IngestionConnection(client=self._client,
                                                    **org_connection)
                for org_connection in org_connections
            ]
        except Exception as e:
            raise _convert_error(e)

    def _create_connection(
        self,
        label: str,
        portal: ConnectionPortalType,
        url: Optional[str] = None,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        default_schema: Optional[str] = None,
        credential: Optional['credential.Credential'] = None,
        ingest_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> Union['connection.IngestionConnection',
               'connection.VirtualizationConnection']:
        """
        Create a Connection within this :class:`.Source` (shared
        implementation behind the public creation helpers)

        Only arguments that are not ``None`` are forwarded to the client's
        ``_create_connection`` call.

        Parameters
        ----------
        label : str
            The descriptive label for the new Connection
        portal : ConnectionPortalType
            The portal type of the new Connection;
            ``ConnectionPortalType.EXTERNAL`` yields a
            :class:`.VirtualizationConnection`, anything else an
            :class:`.IngestionConnection`
        url : Optional[str]
            Forwarded as ``url`` when provided
        description : Optional[str]
            Forwarded as ``description`` when provided
        warehouse : Optional[Warehouse]
            When provided, its ``name`` is forwarded as ``warehouse``
        default_schema : Optional[str]
            Forwarded as ``default_schema`` when provided
        credential : Optional[Credential]
            When provided, its ``id`` is forwarded as ``credential_id``
        ingest_schedules : Optional[List[ConnectionSchedule]]
            When provided, each schedule is serialized and the list is
            forwarded as ``ingest_schedules``

        Returns
        -------
        Union[IngestionConnection, VirtualizationConnection]
            The newly created Connection

        Raises
        ------
        CatalogException
            Any failure is converted via ``_convert_error`` and re-raised
        """
        try:
            create_args = {
                'source_id': self.id,
                'label': label,
                'portal': portal,
            }
            # Within this method ``warehouse`` and ``credential`` refer to
            # the parameters, which shadow the imported modules of the same
            # names.
            if url is not None:
                create_args['url'] = url
            if description is not None:
                create_args['description'] = description
            if credential is not None:
                create_args['credential_id'] = credential.id
            if warehouse is not None:
                create_args['warehouse'] = warehouse.name
            if default_schema is not None:
                create_args['default_schema'] = default_schema
            if ingest_schedules is not None:
                create_args['ingest_schedules'] = [
                    s.serialize() for s in ingest_schedules
                ]
            res = self._client._create_connection(**create_args)
            if portal == ConnectionPortalType.EXTERNAL:
                return connection.VirtualizationConnection(
                    client=self._client,
                    **res,
                )
            return connection.IngestionConnection(
                client=self._client,
                **res,
            )
        except Exception as e:
            raise _convert_error(e)

    def create_ingestion_connection(
        self,
        label: str,
        portal: ConnectionPortalType,
        url: Optional[str] = None,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        credential: Optional['credential.Credential'] = None,
        ingest_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> 'connection.IngestionConnection':
        """
        Create an :class:`.IngestionConnection` within this :class:`.Source`

        Parameters
        ----------
        label : str
            The descriptive label for this :class:`.IngestionConnection`
        portal : ConnectionPortalType
            The method of data access employed by this
            :class:`.IngestionConnection`
        url : Optional[str]
            A canonical URL that points to the location of data resources
            within the portal
        description : Optional[str] = None
            An optional extended description for this
            :class:`.IngestionConnection`
        warehouse : Optional[Warehouse]
            :class:`.Dataset`\\ s created using this
            :class:`.IngestionConnection` will ingest to this
            :class:`.Warehouse` by default (can be overridden at ingest
            time).
        credential : Optional[Credential]
            The :class:`.Credential` associated with this
            :class:`.IngestionConnection`.
        ingest_schedules : Optional[List[ConnectionSchedule]]
            Optional :class:`.ConnectionSchedule`\\ s which, when specified,
            indicate the frequency with which to reingest ingested data.
            Specific Datasets using this :class:`.IngestionConnection` may
            override this set of Schedules.

        Returns
        -------
        IngestionConnection
            The newly created IngestionConnection

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to create
            :class:`.IngestionConnection`\\ s in this :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        # Ingestion connections never carry a default schema.
        return self._create_connection(
            label=label,
            portal=portal,
            url=url,
            description=description,
            warehouse=warehouse,
            default_schema=None,
            credential=credential,
            ingest_schedules=ingest_schedules,
        )

    # TODO change to warehouse creation builder, which builds appropriate
    # params for CreateWarehouseRequest, and remove warehouse argument
    # (pending the launch of warehouse self-serve)
    def _create_virtualization_connection(
        self,
        label: str,
        default_schema: str,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        credential: Optional['credential.Credential'] = None,
        metrics_collection_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> 'connection.VirtualizationConnection':
        """
        Create a :class:`.VirtualizationConnection` within this
        :class:`.Source`

        Parameters
        ----------
        label : str
            The descriptive label for this :class:`.VirtualizationConnection`
        default_schema : str
            The schema to search for tables and views when this
            :class:`.VirtualizationConnection` is used for data
            virtualization.
        description : Optional[str] = None
            An optional extended description for this
            :class:`.VirtualizationConnection`
        warehouse : Optional[Warehouse]
            :class:`.Dataset`\\ s created using this
            :class:`.VirtualizationConnection` will always access data from
            this :class:`.Warehouse`
        credential : Optional[Credential]
            The :class:`.Credential` associated with this
            :class:`.VirtualizationConnection`. Omitted when virtualizing.
        metrics_collection_schedules : Optional[List[ConnectionSchedule]]
            Optional :class:`.ConnectionSchedule`\\ s which, when specified,
            indicate the frequency with which to re-analyze virtualized
            data. Specific Datasets using this
            :class:`.VirtualizationConnection` may override this set of
            Schedules.

        Returns
        -------
        VirtualizationConnection
            The newly created VirtualizationConnection

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to create
            :class:`.VirtualizationConnection`\\ s in this
            :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        # The portal is fixed to EXTERNAL and no URL is sent; the metrics
        # collection schedules travel in the ``ingest_schedules`` slot.
        return self._create_connection(
            label=label,
            portal=ConnectionPortalType.EXTERNAL,
            url=None,
            description=description,
            warehouse=warehouse,
            default_schema=default_schema,
            credential=credential,
            ingest_schedules=metrics_collection_schedules,
        )