from typing import TYPE_CHECKING, Optional
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, _convert_error, _raise_error
from datetime import datetime
from tdw_catalog import Catalog
from tdw_catalog.relations import _OrganizationRelation
if TYPE_CHECKING:
import tdw_catalog.organization as organization
[docs]@Entity([
Property("id", str, serialize=True),
Property("organization_id",
str,
relation="tdw_catalog.organization.Organization",
serialize=True),
Property("user_id", str, serialize=True),
Property("name", str, writable=True),
Property("description", str, writable=True),
Property("created_at", datetime),
Property("updated_at", datetime)
])
class Credential(EntityBase, _OrganizationRelation):
"""
:class:`.Credential`\\ s are used in conjunction with :class:`.Source`\\ s to ingest data into :class:`.Dataset`\\ s
Parameters
----------
id : str
:class:`.Credential`\\ 's unique id
organization_id : str
The unique ID of the :class:`.Organization` to which this :class:`.Credential` belongs
user_id : str
The unique user ID of the user who created this :class:`.Credential`
name : str
A name for this :class:`.Credential`
description : str
The Optional description of this :class:`.Credential`
created_at : datetime
The datetime at which this :class:`.Credential` was created
updated_at : datetime
The datetime at which this :class:`.Credential` was last updated
"""
_client: 'Catalog'
id: str
organization: 'organization.Organization'
organization_id: str
user_id: str
name: str
description: str
created_at: datetime
updated_at: datetime
def __str__(self) -> str:
return f"<{self.__class__.__name__} id={self.id} name={self.name}>"
def serialize(self) -> dict:
result = super().serialize()
result["user_id"] = self._client.profile["id"]
return result
[docs] @classmethod
def get(cls, client: 'Catalog', organization_id: str, id: str):
"""
Retrieve a :class:`.Credential` belonging to an :class:`.Organization`
Parameters
----------
client : catalog.Client
The :class:`.Catalog` client to use to get the :class:`.Credential`
organization_id : str
The unique ID of the :class:`.Organization`
id : str
The unique ID of the :class:`.Credential`
Returns
-------
Credential
The :class:`.Credential` associated with the given ID
"""
try:
res = client._get_credential(organization_id=organization_id,
id=id)
return cls(client, **res["credential"])
except Exception as e:
err = _raise_error(e, "Unable to fetch Credential {}".format(id))
[docs] def save(self) -> None:
"""
Update this :class:`.Credential`, saving any changes to its name, description or
type-specific fields.
Parameters
----------
None
Returns
-------
None
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to update this :class:`.Credential`
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
res = self._client._update_credential(credential=self.serialize())
self.deserialize(res)
except Exception as e:
raise _convert_error(e)
[docs] def delete(self) -> None:
"""
Delete this :class:`.Credential` from the user. This :class:`.Credential` object
should not be used after `delete()` returns successfully.
Parameters
----------
None
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to update this :class:`.Credential`
CatalogInvalidArgumentException
If the given :class:`.Credential` does not exist
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
self._client._delete_credential(credential=self.serialize())
except Exception as e:
raise _convert_error(e)
[docs]@Entity([
Property("catalog_api_key", str, writable=True, serialize=False),
])
class CatalogCredential(Credential):
"""
A :class:`.CatalogCredential` permits a :class:`.Source` to access datasets which exist on another ThinkData Works :class:`.Catalog` server.
Attributes
----------
catalog_api_key : str
The API key for the target :class:`.Catalog`. Can be updated, but not read.
"""
catalog_api_key: str
def serialize(self) -> dict:
result = super().serialize()
result["namara"] = {"namara_api_key": self._catalog_api_key}
return result
[docs]@Entity([
Property("username", str, writable=True, serialize=False),
Property("password", str, writable=True, serialize=False)
])
class FTPCredential(Credential):
"""
An :class:`.FTPCredential` permits a :class:`.Source` to access data stored on an FTP server.
Attributes
----------
username: str
The username for the target FTP server
password: str
The password for the target FTP server. Can be updated, but not read.
"""
username: str
password: str
def serialize(self) -> dict:
result = super().serialize()
result["ftp"] = {"username": self.username, "password": self.password}
return result
def deserialize(self, data: dict) -> None:
super().deserialize(data)
if "ftp" in data:
self.username = data["ftp"].get("username", "")
[docs]@Entity([
Property("username", str, writable=True, serialize=False),
Property("password", str, writable=True, serialize=False),
Property("ssh_key", str, writable=True, serialize=False)
])
class SFTPCredential(Credential):
"""
An :class:`.SFTPCredential` permits a :class:`.Source` to access data stored on an SFTP server.
Attributes
----------
username: str
The username for the target FTP server
password: str
The password for the target FTP server. Can be updated, but not read.
ssh_key: Optional[str]
The ssh key for the target SFTP server. Either ssh_key or password must be set. Can be updated, but not read.
"""
username: str
password: str
ssh_key: str
def serialize(self) -> dict:
result = super().serialize()
result["sftp"] = {
"username": self.username,
"password": self.password,
"ssh_key": self.ssh_key
}
return result
def deserialize(self, data: dict) -> None:
super().deserialize(data)
if "sftp" in data:
self.username = data["sftp"].get("username", "")
[docs]@Entity([
Property("project", str, writable=True, serialize=False),
Property("region", str, writable=True, serialize=False),
Property("client_secrets", str, writable=True, serialize=False)
])
class GoogleStorageCredential(Credential):
"""
A :class:`.GoogleStorageCredential` permits a :class:`.Source` to access data stored in a Google Storage (GS) bucket.
Attributes
----------
project : str
The name of the Google Cloud project in which the bucket can be found
region : str
The Google Cloud region in which the bucket can be found (e.g. us-central1)
client_secrets : str
The client secrets for the Google Storage bucket. Can be updated, but not read.
"""
project: str
region: str
client_secrets: str
def serialize(self) -> dict:
result = super().serialize()
result["gs"] = {
"project": self.project,
"region": self.region,
"client_secrets": self.client_secrets,
}
return result
def deserialize(self, data: dict) -> None:
super().deserialize(data)
if "gs" in data:
self.project = data["gs"].get("project", "")
self.region = data["gs"].get("region", "")
[docs]@Entity([
Property("region", str, writable=True, serialize=False),
Property("access_key_id", str, writable=True, serialize=False),
Property("secret_access_key", str, writable=True, serialize=False)
])
class S3Credential(Credential):
"""
An :class:`.S3Credential` permits a :class:`.Source` to access data stored in an AWS S3 (or other S3-compatible) bucket.
Attributes
----------
region : str
The AWS S3 region in which the bucket resides
access_key_id : str
The AWS Access Key for the S3 bucket. Can be updated but not read.
secret_access_key : str
The AWS Secret Access Key for the S3 bucket. Can be updated but not read.
"""
region: str
access_key_id: str
secret_access_key: str
def serialize(self) -> dict:
result = super().serialize()
result["s3"] = {
"region": self.region,
"access_key_id": self.access_key_id,
"secret_access_key": self.secret_access_key,
}
return result
def deserialize(self, data: dict) -> None:
super().deserialize(data)
if "s3" in data:
self.region = data["s3"].get("region", "")
self.access_key_id = data["s3"].get("access_key_id", "")
[docs]class CredentialFactory:
"""
A :class:`.CredentialFactory` creates specific types of :class:`.Credential`\\ s within a specific :class:`.Organization`
"""
_client: 'Catalog'
_organization_id: str
def __init__(
self,
client: 'Catalog',
organization_id: str,
) -> None:
"""
Initializes :class:`.CredentialFactory` which is capable of creating :class:`.Credential`\\ s
within this :class:`.Organization`.
Parameters
----------
client : Catalog
RPC client, used to communicate with the Data :class:`.Catalog` server
organization_id : str
The unique ID of the :class:`.Organization` to create :class:`.Credential`\\ s within
"""
self._client = client
self._organization_id = organization_id
def get(self, data: dict) -> Credential:
if "namara" in data:
return CatalogCredential(self._client, **data)
elif "ftp" in data:
return FTPCredential(self._client, **data)
elif "sftp" in data:
return SFTPCredential(self._client, **data)
elif "gs" in data:
return GoogleStorageCredential(self._client, **data)
elif "s3" in data:
return S3Credential(self._client, **data)
else:
raise TypeError("Malformed credential data type" + data.__str__())
[docs] def catalog_credential(self, name: str, description: Optional[str],
catalog_api_key: str) -> CatalogCredential:
"""
Constructs a :class:`.CatalogCredential`
Parameters
----------
name : str
A name for this :class:`.Credential`
description : Optional[str]
The Optional description of this :class:`.Credential`
catalog_api_key : str
The API key for the target :class:`.Catalog`
Returns
-------
CatalogCredential
The created :class:`.CatalogCredential`
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to create :class:`.Credential`\\ s
CatalogInvalidArgumentException
If one or more of the given credential parameters are invalid
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
credential = CatalogCredential(
self._client,
name=name,
description=description,
organization_id=self._organization_id,
catalog_api_key=catalog_api_key)
res = self._client._create_credential(
credential=credential.serialize())
credential.deserialize(res)
return credential
except Exception as e:
raise _convert_error(e)
[docs] def ftp_credential(self, name: str, description: Optional[str],
username: str, password: str) -> FTPCredential:
"""
Constructs an :class:`.FTPCredential`
Parameters
----------
name : str
A name for this :class:`.Credential`
description : Optional[str]
The Optional description of this :class:`.Credential`
username: str
The username for the target FTP server
password: str
The password for the target FTP server
Returns
-------
FTPCredential
The created FTPCredential
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to create :class:`.Credential`\\ s
CatalogInvalidArgumentException
If one or more of the given credential parameters are invalid
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
credential = FTPCredential(self._client,
name=name,
description=description,
organization_id=self._organization_id,
username=username,
password=password)
res = self._client._create_credential(
credential=credential.serialize())
credential.deserialize(res)
return credential
except Exception as e:
raise _convert_error(e)
[docs] def sftp_with_password_credential(self, name: str,
description: Optional[str],
username: str,
password: str) -> SFTPCredential:
"""
Constructs a password-based :class:`.SFTPCredential`
Parameters
----------
name : str
A name for this :class:`.Credential`
description : Optional[str]
The Optional description of this :class:`.Credential`
username: str
The username for the target SFTP server
password: str
The password for the target SFTP server
Returns
-------
SFTPCredential
The created :class:`.SFTPCredential`
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to create :class:`.Credential`\\ s
CatalogInvalidArgumentException
If one or more of the given credential parameters are invalid
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
credential = SFTPCredential(self._client,
name=name,
description=description,
organization_id=self._organization_id,
username=username,
password=password)
res = self._client._create_credential(
credential=credential.serialize())
credential.deserialize(res)
return credential
except Exception as e:
raise _convert_error(e)
[docs] def sftp_with_key_credential(self, name: str, description: Optional[str],
username: str,
ssh_key: str) -> SFTPCredential:
"""
Constructs a key-based :class:`.SFTPCredential`
Parameters
----------
name : str
A name for this :class:`.Credential`
description : Optional[str]
The Optional description of this :class:`.Credential`
username: str
The username for the target SFTP server
ssh_key: str
The ssh_key for the target SFTP server
Returns
-------
SFTPCredential
The created :class:`.SFTPCredential`
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to create :class:`.Credential`\\ s
CatalogInvalidArgumentException
If one or more of the given credential parameters are invalid
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
credential = SFTPCredential(self._client,
name=name,
description=description,
organization_id=self._organization_id,
username=username,
ssh_key=ssh_key)
res = self._client._create_credential(
credential=credential.serialize())
credential.deserialize(res)
return credential
except Exception as e:
raise _convert_error(e)
[docs] def google_storage_credential(
self, name: str, description: Optional[str], region: str,
project: str, client_secrets: str) -> GoogleStorageCredential:
"""
Constructs a :class:`.GoogleStorageCredential`
Parameters
----------
name : str
A name for this :class:`.Credential`
description : Optional[str]
The Optional description of this :class:`.Credential`
project : str
The name of the Google Cloud project in which the bucket can be found
region : str
The Google Cloud region in which the bucket can be found (e.g. us-central1)
client_secrets : str
The client secrets for the Google Storage bucket. Can be updated, but not read.
Returns
-------
GoogleStorageCredential
The created :class:`.GoogleStorageCredential`
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to create :class:`.Credential`\\ s
CatalogInvalidArgumentException
If one or more of the given credential parameters are invalid
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
credential = GoogleStorageCredential(
self._client,
name=name,
description=description,
organization_id=self._organization_id,
region=region,
project=project,
client_secrets=client_secrets)
res = self._client._create_credential(
credential=credential.serialize())
credential.deserialize(res)
return credential
except Exception as e:
raise _convert_error(e)
[docs] def s3_credential(self, name: str, description: Optional[str], region: str,
access_key_id: str,
secret_access_key: str) -> S3Credential:
"""
Constructs as :class:`.S3Credential`
Parameters
----------
name : str
A name for this :class:`.Credential`
description : Optional[str]
The Optional description of this :class:`.Credential`
region : str
The AWS S3 region in which the bucket resides
access_key_id : str
The AWS Access Key for the S3 bucket. Can be updated but not read.
secret_access_key : str
The AWS Secret Access Key for the S3 bucket. Can be updated but not read.
Returns
-------
S3Credential
The created :class:`.S3Credential`
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to create :class:`.Credential`\\ s
CatalogInvalidArgumentException
If one or more of the given credential parameters are invalid
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
credential = S3Credential(self._client,
name=name,
description=description,
organization_id=self._organization_id,
region=region,
access_key_id=access_key_id,
secret_access_key=secret_access_key)
res = self._client._create_credential(
credential=credential.serialize())
credential.deserialize(res)
return credential
except Exception as e:
raise _convert_error(e)