from datetime import datetime
from tdw_catalog import connection, glossary_term
from tdw_catalog.entity import Entity, EntityBase, Property
from tdw_catalog.list_datasets import _ListDatasetsIterator, Filter as ListOrganizationDatasetsFilter
import tdw_catalog.metadata_template.creation_builder as creation_builder
import tdw_catalog.metadata_template as metadata_template
from tdw_catalog.utils import ConnectionPortalType, LegacyFilter, ListConnectionsFilter, ListGlossaryTermsFilter, ListSourcesFilter
import tdw_catalog.dataset as dataset
import tdw_catalog.source as source
import tdw_catalog.topic as topic
import tdw_catalog.organization_member as organization_member
import tdw_catalog.team as team
import tdw_catalog.warehouse as warehouse
import tdw_catalog.credential as credential
import tdw_catalog.organization_utils as organization_utils
import tdw_catalog.lineage.dataset_relationship as dataset_relationship
import tdw_catalog.lineage.column_relationship as column_relationship
from typing import Iterator, Optional, List, Union
from tdw_catalog.errors import CatalogException, _convert_error, _raise_error
[docs]@Entity([
Property("id", str, serialize=True),
Property("title", str, writable=True),
Property("created_at", datetime),
Property("updated_at", datetime)
])
class Organization(EntityBase):
"""
:class:`.Organization`\\ s are the primary entry points to a Data :class:`.Catalog`,
containing and linking together :class:`.OrganizationMember`\\ s, :class:`.Team`\\ s, :class:`.Dataset`\\ s, etc.
Attributes
----------
title : str
The name of the :class:`.Organization`
created_at : datetime
The datetime at which this :class:`.Organization` was created
updated_at : datetime
The datetime at which this :class:`.Organization` was last updated
"""
id: str
title: str
created_at: datetime
updated_at: datetime
def __str__(self) -> str:
    """Return a short tag showing this :class:`.Organization`'s id and title."""
    return "<Organization id={} title={}>".format(self.id, self.title)
@classmethod
def get(cls, client, id: str):
    """
    Fetch a single :class:`.Organization` by its ID.

    Parameters
    ----------
    client
        The catalog client used to issue the request; its
        ``_get_organization`` method is called with the given ID
    id : str
        The unique ID of the :class:`.Organization`

    Returns
    -------
    Organization
        A new instance built from the server response

    Notes
    -----
    Any exception is handed to ``_raise_error`` together with a message
    naming the requested ID.
    """
    try:
        res = client._get_organization(id=id)
        return cls(client, **res)
    except Exception as e:
        # NOTE(review): _raise_error presumably raises the converted error
        # itself, so its result is not bound to a (previously unused) local.
        _raise_error(e, "Unable to fetch Organization {}".format(id))
def save(self) -> None:
    """
    Persist local changes to this :class:`.Organization` (e.g. its title).

    The serialized entity is sent to the server and the response is
    deserialized back into this object.

    Returns
    -------
    None

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to update this :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        update = self._client._update_organization
        refreshed = update(organization=self.serialize())
        self.deserialize(refreshed)
    except Exception as err:
        raise _convert_error(err)
def delete(self) -> None:
    """
    Delete this :class:`.Organization` from the :class:`.Catalog`.

    Once this call returns successfully the object should no longer be
    used, because the organization it represents no longer exists.

    Returns
    -------
    None

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to delete this :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        remove = self._client._delete_organization
        remove(organization=self.serialize())
    except Exception as err:
        raise _convert_error(err)
def list_members(
    self,
    filter: Optional[LegacyFilter] = None
) -> List[organization_member.OrganizationMember]:
    """
    List the :class:`.OrganizationMember`\\ s of this :class:`.Organization`.

    Parameters
    ----------
    filter : Optional[LegacyFilter]
        An optional filter, forwarded unchanged to the server request

    Returns
    -------
    List[OrganizationMember]
        One :class:`.OrganizationMember` per entry returned by the server

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list :class:`.OrganizationMember`\\ s
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        raw_members = self._client._list_organization_members(
            organization_id=self.id,
            filter=filter,
        )
        member_cls = organization_member.OrganizationMember
        return [member_cls(self._client, **raw) for raw in raw_members]
    except Exception as err:
        raise _convert_error(err)
def get_member(self,
               user_id: str) -> organization_member.OrganizationMember:
    """
    Fetch one member (:class:`.User`) of this :class:`.Organization`.

    Parameters
    ----------
    user_id : str
        The unique :class:`.User` ID of the :class:`.OrganizationMember`

    Returns
    -------
    OrganizationMember
        The :class:`.OrganizationMember` with the given :class:`.User` ID

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to fetch :class:`.OrganizationMember`\\ s
    CatalogInvalidArgumentException
        If the given :class:`.User` ID does not exist or is not a member of
        this :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    member_cls = organization_member.OrganizationMember
    return member_cls.get(self._client, self.id, user_id)
def invite_member(
    self,
    user_id: str,
    roles: organization_member.OrganizationMemberRoles = None
) -> organization_member.OrganizationMember:
    """
    Invite an existing :class:`.User` to join this :class:`.Organization`.

    Parameters
    ----------
    user_id : str
        The unique :class:`.User` ID of the invitee
    roles : organization_member.OrganizationMemberRoles
        The membership roles for the :class:`.User`. When omitted, a
        default-constructed ``OrganizationMemberRoles`` is used
        (all roles default to false).

    Returns
    -------
    OrganizationMember
        The newly created :class:`.OrganizationMember`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to invite :class:`.OrganizationMember`\\ s
    CatalogAlreadyExistsException
        If the :class:`.User` is already an :class:`.OrganizationMember` of
        this :class:`.Organization`
    CatalogInvalidArgumentException
        If the given :class:`.User` ID does not exist
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    effective_roles = roles
    if effective_roles is None:
        effective_roles = organization_member.OrganizationMemberRoles()

    try:
        invitee = organization_member.OrganizationMember(
            self._client,
            user_id=user_id,
            organization_id=self.id,
            roles=effective_roles,
        )
        created = self._client._add_organization_member(
            member=invitee.serialize())
        # Refresh the local object with whatever the server sent back.
        invitee.deserialize(created)
        return invitee
    except Exception as err:
        raise _convert_error(err)
def invite_members(
    self,
    emails: List[str],
    invite_message: Optional[str] = "",
    raise_on_failure: Optional[bool] = False,
    roles: Optional[organization_member.OrganizationMemberRoles] = None,
) -> organization_utils.InviteMembersResponse:
    """
    Invite several people, by email, to join this :class:`.Organization`.

    If a given email does not correspond to an existing :class:`.User`, an
    invitation to the :class:`.Catalog` platform will be sent via email.

    Parameters
    ----------
    emails : List[str]
        The email addresses of the invitees
    invite_message : Optional[str]
        The message to send the users when sending the invitation
    raise_on_failure : Optional[bool]
        Whether to raise an exception if any single invitation failed
    roles : Optional[OrganizationMemberRoles]
        The roles the new members will take when invited. When omitted, a
        default-constructed ``OrganizationMemberRoles`` is used.

    Returns
    -------
    InviteMembersResponse
        A summary of the successful and failed invitations

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to invite :class:`.OrganizationMember`\\ s
    CatalogException
        If call to the :class:`.Catalog` server fails, or if
        ``raise_on_failure`` is set and at least one invitation failed
    """
    summary = organization_utils.InviteMembersResponse([], [])
    if roles is None:
        roles = organization_member.OrganizationMemberRoles()

    # Every invitation carries the same serialized role fields.
    role_fields = roles.serialize()
    invitations = [{"email": address, **role_fields} for address in emails]

    try:
        res = self._client._add_organization_members(
            invitations=invitations,
            organization_id=self.id,
            invite_message=invite_message)
    except Exception as err:
        raise _convert_error(err)

    # The response has been seen to arrive as a bare list in some scenario
    # (cause unclear); wrap it in the dict shape handled below.
    if isinstance(res, list):
        res = {"invitees": res, "failed_invitations": 0}

    # An entry is a failure exactly when it carries an "error_message" key.
    accepted = []
    for entry in res["invitees"]:
        if "error_message" in entry:
            continue
        accepted.append(
            organization_member.OrganizationMember(self._client,
                                                   email=entry["email"],
                                                   **entry["member"]))
    summary.successful_invitations = accepted

    rejected = []
    for entry in res["invitees"]:
        if "error_message" not in entry:
            continue
        rejected.append(
            organization_utils.InviteMembersResponseFailedInvitation(
                email=entry["email"],
                error_message=entry["error_message"]))
    summary.failed_invitations = rejected

    if raise_on_failure and len(summary.failed_invitations) > 0:
        raise CatalogException(
            message=f"{len(summary.failed_invitations)} invitations failed",
            meta=list(summary.failed_invitations))
    return summary
def get_team(self, team_id: str) -> team.Team:
    """
    Fetch a :class:`.Team` of this :class:`.Organization` by ID.

    Parameters
    ----------
    team_id : str
        The unique ID of the :class:`.Team`

    Returns
    -------
    Team
        The :class:`.Team` with the given ID

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve :class:`.Team`\\ s from this
        :class:`.Organization`
    CatalogInvalidArgumentException
        If the given :class:`.Team` ID does not exist
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    team_cls = team.Team
    return team_cls.get(self._client, self.id, team_id)
def list_teams(self,
               organization_ids=None,
               filter: Optional[LegacyFilter] = None) -> List[team.Team]:
    """
    List the :class:`.Team`\\ s in this :class:`.Organization`.

    Parameters
    ----------
    organization_ids
        Accepted but not used: the request is always scoped to this
        :class:`.Organization`'s own ID
    filter : Optional[LegacyFilter]
        An optional filter on the returned :class:`.Team` list, useful for
        pagination of results

    Returns
    -------
    List[Team]
        The list of :class:`.Team`\\ s in this :class:`.Organization`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list :class:`.Team`\\ s in this
        :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        raw_teams = self._client._list_groups(organization_ids=[self.id],
                                              filter=filter)
        # Only id, organization_id and title are carried over from each entry.
        return [
            team.Team(client=self._client,
                      id=raw["id"],
                      organization_id=raw["organization_id"],
                      title=raw["title"])
            for raw in raw_teams
        ]
    except Exception as err:
        raise _convert_error(err)
def create_team(self, title: str) -> team.Team:
    """
    Create a new :class:`.Team` inside this :class:`.Organization`.

    Parameters
    ----------
    title : str
        The name of the new :class:`.Team`

    Returns
    -------
    Team
        The newly created :class:`.Team`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to create :class:`.Team`\\ s in this
        :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        new_group = {"organization_id": self.id, "title": title}
        created = self._client._create_group(group=new_group)
        return team.Team(
            client=self._client,
            id=created["id"],
            organization_id=created["organization_id"],
            title=created["title"],
        )
    except Exception as err:
        raise _convert_error(err)
def get_topic(self, id: str) -> topic.Topic:
    """
    Fetch a :class:`.Topic` of this :class:`.Organization` by ID.

    Parameters
    ----------
    id : str
        The unique ID of the :class:`.Topic`

    Returns
    -------
    Topic
        The :class:`.Topic` with the given ID

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve :class:`.Topic`\\ s from this
        :class:`.Organization`, or if the given :class:`.Topic` ID does not
        exist
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    # Topic's constructor will fill in title automatically
    topic_cls = topic.Topic
    return topic_cls.get(self._client, self.id, id)
def list_topics(self, filter: LegacyFilter = None) -> List[topic.Topic]:
    """
    List the :class:`.Topic`\\ s in this :class:`.Organization`.

    Parameters
    ----------
    filter : Optional[LegacyFilter]
        An optional filter on the returned :class:`.Topic` list, useful for
        pagination of results

    Returns
    -------
    List[Topic]
        The list of :class:`.Topic`\\ s in this :class:`.Organization`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list :class:`.Topic`\\ s in this
        :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        raw_topics = self._client._list_topics(organization_id=self.id,
                                               filter=filter)
        return [
            topic.Topic(client=self._client, **raw) for raw in raw_topics
        ]
    except Exception as err:
        raise _convert_error(err)
def create_topic(self, title: str) -> topic.Topic:
    """
    Create a new :class:`.Topic` inside this :class:`.Organization`.

    Parameters
    ----------
    title : str
        The name of the new :class:`.Topic`

    Returns
    -------
    Topic
        The newly created :class:`.Topic`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to create :class:`.Topic`\\ s in this
        :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        new_topic = {"organization_id": self.id, "title": title}
        created = self._client._create_topic(topic=new_topic)
        return topic.Topic(self._client, **created)
    except Exception as err:
        raise _convert_error(err)
def get_credential(self, credential_id: str) -> credential.Credential:
    """
    Fetch a :class:`.Credential` belonging to this :class:`.Organization`.

    Parameters
    ----------
    credential_id : str
        The unique ID of the :class:`.Credential`

    Returns
    -------
    Credential
        The :class:`.Credential` associated with the given ID, built by a
        :class:`.CredentialFactory` from the server response

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve :class:`.Credential`\\ s
    CatalogNotFoundException
        If the given :class:`.Credential` ID does not exist
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        raw = self._client._get_credential(id=credential_id,
                                           organization_id=self.id)
        factory = credential.CredentialFactory(self._client, self.id)
        return factory.get(raw)
    except Exception as err:
        raise _convert_error(err)
def create_credential(self) -> credential.CredentialFactory:
    """
    Return a :class:`.CredentialFactory` bound to this :class:`.Organization`.

    The factory is capable of creating :class:`.Credential`\\ s within this
    :class:`.Organization`; this method itself only constructs the factory.

    Returns
    -------
    CredentialFactory
        A factory for creating specific types of :class:`.Credential`\\ s
    """
    factory = credential.CredentialFactory(client=self._client,
                                           organization_id=self.id)
    return factory
def list_credentials(self,
                     filter: LegacyFilter = None
                     ) -> List[credential.Credential]:
    """
    List :class:`.Credential`\\ s which belong to this :class:`.Organization`.

    Parameters
    ----------
    filter : Optional[LegacyFilter]
        An optional filter on the returned :class:`.Credential` list. It is
        forwarded to the server request only when supplied.

    Returns
    -------
    List[Credential]
        :class:`.Credential`\\ s created under this :class:`.Organization`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list :class:`.Credential`\\ s
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        factory = credential.CredentialFactory(self._client, self.id)
        request = {"organization_id": self.id}
        # Previously the filter argument was accepted but silently dropped.
        # Forward it only when given so the default call is unchanged.
        # NOTE(review): assumes _list_credentials accepts a ``filter``
        # keyword like the other list calls in this class; confirm.
        if filter is not None:
            request["filter"] = filter
        res = self._client._list_credentials(**request)
        return [factory.get(cred) for cred in res]
    except Exception as e:
        raise _convert_error(e)
def get_source(self, id: str) -> source.Source:
    """
    Fetch a :class:`.Source` belonging to this :class:`.Organization`.

    Parameters
    ----------
    id : str
        The unique ID of the :class:`.Source`

    Returns
    -------
    Source
        The :class:`.Source` associated with the given ID

    Raises
    ------
    CatalogInternalException
        If call to the :class:`.Catalog` server fails
    CatalogNotFoundException
        If the :class:`.Source` with the supplied ID could not be found
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve the given :class:`.Source`
    """
    source_cls = source.Source
    return source_cls.get(self._client, self.id, id)
def list_sources(
        self,
        filter: Optional[ListSourcesFilter] = None) -> List[source.Source]:
    """
    List the :class:`.Source`\\ s which belong to this :class:`.Organization`.

    Parameters
    ----------
    filter : Optional[ListSourcesFilter]
        The filter to be used when performing the search

    Returns
    -------
    List[Source]
        :class:`.Source`\\ s created under this :class:`.Organization`; an
        empty list when the response carries no ``"sources"`` entry

    Raises
    ------
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        res = self._client._list_sources(organization_id=self.id,
                                         filter=filter)
        # When nothing is available the response seems to contain only the
        # total count rather than a "sources" list (odd, since the RPC does
        # not do that), so anything without that key is treated as empty.
        if not isinstance(res, dict) or "sources" not in res:
            return []
        return [source.Source(self._client, **raw) for raw in res["sources"]]
    except Exception as err:
        raise _convert_error(err)
def create_source(
    self,
    label: str,
    description: Optional[str] = None,
) -> source.Source:
    """
    Create a new :class:`.Source` inside this :class:`.Organization`.

    The request also carries the ``"id"`` from the client's ``profile`` as
    the ``user_id`` of the new :class:`.Source`.

    Parameters
    ----------
    label : str
        A descriptive label for the :class:`.Source`
    description : Optional[str]
        The description of the :class:`.Source`

    Returns
    -------
    Source
        The newly created :class:`.Source`

    Raises
    ------
    CatalogInternalException
        If call to the :class:`.Catalog` server fails
    """
    try:
        new_source = {
            "organization_id": self.id,
            "user_id": self._client.profile["id"],
            "label": label,
            "description": description,
        }
        created = self._client._create_source(source=new_source)
        return source.Source(self._client, **created)
    except Exception as err:
        raise _convert_error(err)
def list_target_warehouses(self) -> List['warehouse.TargetWarehouse']:
    """
    List the :class:`.TargetWarehouse`\\ s available to this :class:`.Organization`.

    A warehouse entry counts as a target warehouse when it has no
    ``"external"`` key or that key holds a falsy value.

    Returns
    -------
    List[TargetWarehouse]
        :class:`.TargetWarehouse`\\ s that are available to this
        :class:`.Organization`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list :class:`.TargetWarehouse`\\ s (the
        caller must be an :class:`.Organization` admin, or have
        :class:`.Dataset` creation privileges)
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        all_warehouses = self._client._list_warehouses(
            organization_id=self.id)
        return [
            warehouse.TargetWarehouse(client=self._client, **entry)
            for entry in all_warehouses
            if not ("external" in entry and entry["external"])
        ]
    except Exception as err:
        raise _convert_error(err)
def list_external_warehouses(self) -> List['warehouse.ExternalWarehouse']:
    """
    List the :class:`.ExternalWarehouse`\\ s available to this :class:`.Organization`.

    A warehouse entry counts as external when it has an ``"external"`` key
    holding a truthy value.

    Returns
    -------
    List[ExternalWarehouse]
        :class:`.ExternalWarehouse`\\ s that are available to this
        :class:`.Organization`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list :class:`.ExternalWarehouse`\\ s
        (the caller must be an :class:`.Organization` admin, or have
        :class:`.Dataset` creation privileges)
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        all_warehouses = self._client._list_warehouses(
            organization_id=self.id)
        return [
            warehouse.ExternalWarehouse(client=self._client, **entry)
            for entry in all_warehouses
            if "external" in entry and entry["external"]
        ]
    except Exception as err:
        raise _convert_error(err)
def get_connection(
    self, id: str
) -> Union['connection.IngestionConnection',
           'connection.VirtualizationConnection']:
    """
    Fetch an :class:`.IngestionConnection` or :class:`.VirtualizationConnection` by ID.

    Parameters
    ----------
    id : str
        The unique ID of the Connection

    Returns
    -------
    Union[IngestionConnection, VirtualizationConnection]
        The Connection with the given ID

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve Connections from this
        :class:`.Organization`
    CatalogNotFoundException
        If the given Connection ID does not exist
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        found = connection._Connection.get(self._client, id)
        return found
    except Exception as err:
        raise _convert_error(err)
def list_connections(
    self,
    filter: Optional[ListConnectionsFilter] = None
) -> List[Union['connection.IngestionConnection',
                'connection.VirtualizationConnection']]:
    """
    List the Connections (ingestion and virtualization) in this :class:`.Organization`.

    Parameters
    ----------
    filter : Optional[ListConnectionsFilter]
        An optional filter on the returned Connection list, useful for
        pagination of results. Its ``organization_id`` property is
        overwritten with this :class:`.Organization`'s ID (the supplied
        object is modified in place).

    Returns
    -------
    List[Union[IngestionConnection, VirtualizationConnection]]
        The list of Connections in this :class:`.Organization`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list Connections in this
        :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        scoped = ListConnectionsFilter() if filter is None else filter
        scoped.organization_id = self.id
        raw_connections = self._client._list_connections(filter=scoped)

        results = []
        for raw in raw_connections:
            # The "portal" field decides which Connection class wraps the entry.
            if raw["portal"] == ConnectionPortalType.EXTERNAL:
                wrapper = connection.IngestionConnection
            else:
                wrapper = connection.VirtualizationConnection
            results.append(wrapper(client=self._client, **raw))
        return results
    except Exception as err:
        raise _convert_error(err)
def create_dataset(self,
                   source: 'source.Source',
                   title: str,
                   description: Optional[str] = None) -> 'dataset.Dataset':
    """
    Create a new, empty :class:`.Dataset` inside this :class:`.Organization`.

    The :class:`.Dataset` gets a title and (optionally) a description, and
    must be associated with a :class:`.Source`. It can subsequently be
    populated with metadata and data.

    Parameters
    ----------
    source : source.Source
        The :class:`.Source` to associate with the new :class:`.Dataset`
    title : str
        A title for the new :class:`.Dataset`
    description : Optional[str]
        An optional description for the new :class:`.Dataset` (markdown
        supported)

    Returns
    -------
    Dataset
        The newly created :class:`.Dataset`, with this
        :class:`.Organization` recorded as its context organization

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to create :class:`.Dataset`\\ s in this
        :class:`.Organization`
    CatalogInvalidArgumentException
        If title is an empty string, or if the :class:`.Source` belongs to a
        different :class:`.Organization` than this one
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        created = self._client._create_dataset(organization_id=self.id,
                                               source_id=source.id,
                                               title=title,
                                               description=description)
        new_dataset = dataset.Dataset(self._client, **created)
        new_dataset._context_organization = self
        return new_dataset
    except Exception as err:
        raise _convert_error(err)
def get_dataset(
        self,
        id: str) -> 'Union[dataset.Dataset, dataset.ConnectedDataset]':
    """
    Fetch a :class:`.Dataset` of this :class:`.Organization` by ID.

    Parameters
    ----------
    id : str
        The unique ID of the :class:`.Dataset`

    Returns
    -------
    Dataset
        The :class:`.Dataset` with the given ID

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve the given :class:`.Dataset`,
        or if the :class:`.Dataset` does not exist
    CatalogInvalidArgumentException
        If the given :class:`.Dataset` ID is not a valid v4 UUID
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    dataset_cls = dataset.Dataset
    # This Organization is passed along as the third argument of the lookup.
    return dataset_cls.get(self._client, id, self)
def create_glossary_term(
        self,
        title: str,
        description: Optional[str] = None) -> glossary_term.GlossaryTerm:
    """
    Create a new :class:`.GlossaryTerm` inside this :class:`.Organization`.

    Parameters
    ----------
    title : str
        The name of the new :class:`.GlossaryTerm`
    description : Optional[str]
        The description of the new :class:`.GlossaryTerm`

    Returns
    -------
    GlossaryTerm
        The newly created :class:`.GlossaryTerm`

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to create :class:`.GlossaryTerm`\\ s in
        this :class:`.Organization`
    CatalogInvalidArgumentException
        If title is an empty string
    CatalogAlreadyExistsException
        If a :class:`.GlossaryTerm` with the provided title already exists
        in this :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        created = self._client._create_glossary_term(
            organization_id=self.id,
            title=title,
            description=description,
        )
        return glossary_term.GlossaryTerm(self._client, **created)
    except Exception as err:
        raise _convert_error(err)
def get_glossary_term(self, id: str) -> glossary_term.GlossaryTerm:
    """
    Fetch a :class:`.GlossaryTerm` by ID.

    Parameters
    ----------
    id : str
        The unique ID of the :class:`.GlossaryTerm`

    Returns
    -------
    GlossaryTerm
        The :class:`.GlossaryTerm` with the given ID

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve :class:`.GlossaryTerm`\\ s
        from this :class:`.Organization`, or if the given
        :class:`.GlossaryTerm` ID does not exist
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    # GlossaryTerm's constructor will fill in title automatically
    term_cls = glossary_term.GlossaryTerm
    return term_cls.get(self._client, id)
def list_glossary_terms(
    self,
    organization_ids: Optional[List[str]] = None,
    filter: Optional[ListGlossaryTermsFilter] = None
) -> List[glossary_term.GlossaryTerm]:
    """
    List :class:`.GlossaryTerm`\\ s, by default those of this :class:`.Organization`.

    Parameters
    ----------
    organization_ids : Optional[List[str]]
        An optional list of :class:`.Organization` IDs to list
        :class:`.GlossaryTerm`\\ s from multiple :class:`.Organization`\\ s.
        When ``None``, only this :class:`.Organization`'s ID is used.
    filter : Optional[ListGlossaryTermsFilter]
        An optional :class:`.ListGlossaryTermsFilter` on the returned
        :class:`.GlossaryTerm` list, useful for pagination of results

    Returns
    -------
    List[GlossaryTerm]
        The list of matching :class:`.GlossaryTerm`\\ s

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to list :class:`.GlossaryTerm`\\ s in
        this :class:`.Organization`
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    try:
        if organization_ids is not None:
            scope = organization_ids
        else:
            scope = [self.id]
        raw_terms = self._client._list_glossary_terms(
            organization_ids=scope, filter=filter)
        return [
            glossary_term.GlossaryTerm(client=self._client, **raw)
            for raw in raw_terms
        ]
    except Exception as err:
        raise _convert_error(err)
def list_datasets(
    self,
    filter: Optional[ListOrganizationDatasetsFilter] = None
) -> 'Iterator[dataset.Dataset]':
    """
    List the :class:`.Dataset`\\ s which belong to this :class:`.Organization`.

    The maximum number of results is limited, and must be paginated via the
    ``filter`` to obtain additional results.

    Parameters
    ----------
    filter : Optional[list_datasets.Filter]
        An optional filter on the returned :class:`.Dataset`\\ s (`None` by
        default)

    Returns
    -------
    Iterator[Dataset]
        An Iterator of :class:`.Dataset`\\ s belonging to this
        :class:`.Organization`, which are lazily fetched as the Iterator is
        iterated. It is built from the IDs found in the listing response.

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller does not have permission to list :class:`.Dataset`\\ s
    CatalogException
        If there is an issue communicating with the :class:`.Catalog` server,
        or an issue with the server itself
    """
    try:
        listing = self._client._list_organization_datasets(
            organization_id=self.id, filter=filter)
        # A response without a "datasets" entry is treated as no results.
        if "datasets" in listing:
            entries = listing["datasets"]
        else:
            entries = []
        dataset_ids = [entry["id"] for entry in entries]
        return _ListDatasetsIterator(self._client, dataset_ids)
    except Exception as err:
        raise _convert_error(err)
def create_lineage(
    self,
    upstream_dataset: 'dataset.Dataset',
    downstream_dataset: 'dataset.Dataset',
    label: str,
    description: Optional[str] = None,
    column_lineage: Optional[List[tuple[Union[str, List[str]],
                                        Union[str, List[str]]]]] = None,
) -> 'dataset_relationship.DatasetLineageRelationship':
    """
    Create a :class:`.DatasetLineageRelationship` within this :class:`.Organization`.

    Each relationship describes a single source and destination
    :class:`.Dataset` (a single "edge" in the lineage graph), with optional
    column-level lineage. Branching (or many-to-many) relationships can be
    modelled by decomposing them into their individual edges.

    Parameters
    ----------
    upstream_dataset : Dataset
        The source dataset involved in this :class:`.DatasetLineageRelationship`
    downstream_dataset : Dataset
        The destination dataset involved in this
        :class:`.DatasetLineageRelationship`
    label : str
        A label describing this :class:`.DatasetLineageRelationship`
    description : Optional[str]
        An optional description providing further details about this
        :class:`.DatasetLineageRelationship`
    column_lineage : Optional[List[tuple[Union[str, List[str]], Union[str, List[str]]]]]
        An optional list of column-level associations between the two
        :class:`.Dataset`\\ s, specified as tuples. Each tuple is a single
        column-level relationship between a list of upstream columns and a
        list of downstream columns. Omitting it (or passing ``None``) is
        treated as an empty list.
        Example: ``[("address", ["street_number","street_name","city"])]``

    Returns
    -------
    DatasetLineageRelationship
        The newly created :class:`.DatasetLineageRelationship`

    Raises
    ------
    CatalogInvalidArgumentException
        If any specified column names within provided column lineage do not
        actually exist in the provided :class:`.Dataset`\\ s
    CatalogPermissionDeniedException
        If the caller is not allowed to define lineage in this
        :class:`.Organization`, or if they do not have access to one of the
        involved :class:`.Dataset`\\ s
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    # A fresh list per call avoids sharing one mutable default object
    # across every invocation.
    if column_lineage is None:
        column_lineage = []
    try:
        res = self._client._create_dataset_lineage(
            upstream_dataset_id=upstream_dataset.id,
            downstream_dataset_id=downstream_dataset.id,
            label=label,
            description=description,
            organization_id=self.id,
            column_lineage_relationships=[
                c.serialize()
                for c in column_relationship._tuples_to_column_lineage(
                    column_lineage, upstream_dataset, downstream_dataset)
            ])
        # we use get, because it returns column lineage as well
        return dataset_relationship.DatasetLineageRelationship.get(
            self._client, res["id"])
    except Exception as e:
        raise _convert_error(e)
def create_or_replace_lineage(
    self,
    upstream_dataset: 'dataset.Dataset',
    downstream_dataset: 'dataset.Dataset',
    label: str,
    description: Optional[str] = None,
    column_lineage: Optional[List[tuple[Union[str, List[str]],
                                        Union[str, List[str]]]]] = None,
) -> 'dataset_relationship.DatasetLineageRelationship':
    """
    Create, or replace, the :class:`.DatasetLineageRelationship` between two :class:`.Dataset`\\ s.

    Each relationship describes a single source and destination
    :class:`.Dataset` (a single "edge" in the lineage graph), with optional
    column-level lineage. Branching (or many-to-many) relationships can be
    modelled by decomposing them into their individual edges.

    If no relationships between the given :class:`.Dataset`\\ s exist, one
    will be created. Unlike ``create_lineage``, pre-existing relationships
    between the given :class:`.Dataset`\\ s will be cleared and replaced by
    this one, facilitating easy one-way syncs from an external lineage
    metadata source to the :class:`.Catalog` platform.

    Parameters
    ----------
    upstream_dataset : Dataset
        The source dataset involved in this :class:`.DatasetLineageRelationship`
    downstream_dataset : Dataset
        The destination dataset involved in this
        :class:`.DatasetLineageRelationship`
    label : str
        A label describing this :class:`.DatasetLineageRelationship`
    description : Optional[str]
        An optional description providing further details about this
        :class:`.DatasetLineageRelationship`
    column_lineage : Optional[List[tuple[Union[str, List[str]], Union[str, List[str]]]]]
        An optional list of column-level associations between the two
        :class:`.Dataset`\\ s, specified as tuples. Each tuple is a single
        column-level relationship between a list of upstream columns and a
        list of downstream columns. Omitting it (or passing ``None``) is
        treated as an empty list.
        Example: ``[("address", ["street_number","street_name","city"])]``

    Returns
    -------
    DatasetLineageRelationship
        The newly created :class:`.DatasetLineageRelationship`

    Raises
    ------
    CatalogInvalidArgumentException
        If any specified column names within provided column lineage do not
        actually exist in the provided :class:`.Dataset`\\ s
    CatalogPermissionDeniedException
        If the caller is not allowed to define lineage in this
        :class:`.Organization`, or if they do not have access to one of the
        involved :class:`.Dataset`\\ s
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    # A fresh list per call avoids sharing one mutable default object
    # across every invocation.
    if column_lineage is None:
        column_lineage = []
    try:
        res = self._client._create_or_replace_dataset_lineage(
            upstream_dataset_id=upstream_dataset.id,
            downstream_dataset_id=downstream_dataset.id,
            label=label,
            description=description,
            organization_id=self.id,
            column_lineage_relationships=[
                c.serialize()
                for c in column_relationship._tuples_to_column_lineage(
                    column_lineage, upstream_dataset, downstream_dataset)
            ])
        # we use get, because it returns column lineage as well
        return dataset_relationship.DatasetLineageRelationship.get(
            self._client, res["id"])
    except Exception as e:
        raise _convert_error(e)
def get_lineage(
        self,
        id: str) -> 'dataset_relationship.DatasetLineageRelationship':
    """
    Fetch a :class:`.DatasetLineageRelationship` by ID.

    Parameters
    ----------
    id : str
        The unique ID of the :class:`.DatasetLineageRelationship`

    Returns
    -------
    DatasetLineageRelationship
        The :class:`.DatasetLineageRelationship` with the given ID

    Raises
    ------
    CatalogPermissionDeniedException
        If the caller is not allowed to retrieve
        :class:`.DatasetLineageRelationship`\\ s from this
        :class:`.Organization`
    CatalogInvalidArgumentException
        If the given :class:`.DatasetLineageRelationship` ID does not exist
    CatalogException
        If call to the :class:`.Catalog` server fails
    """
    relationship_cls = dataset_relationship.DatasetLineageRelationship
    return relationship_cls.get(self._client, id)