from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from asyncio import gather, sleep
from tdw_catalog import Catalog
from tdw_catalog.export import CSVExport, ParquetExport
from tdw_catalog.query import QueryCursor
from tdw_catalog.relations import _OrganizationRelation, _SourceRelation
import tdw_catalog.source as source
import tdw_catalog.connection as connection_package
import tdw_catalog._reference as _reference
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, CatalogInternalException, _convert_error, _raise_error
from tdw_catalog.dataset_connector import DatasetConnector
import tdw_catalog.topic as topic
from tdw_catalog.utils import _ExportFormat, Filter, ImportState, _parse_timestamp
import tdw_catalog.warehouse as warehouse_package
import tdw_catalog.metadata.editor as editor
import tdw_catalog.metadata.field as metadata_field
import tdw_catalog.metadata_template as metadata_template
from tdw_catalog.data_dictionary import Column, DataDictionary, MetadataOnlyDataDictionary
if TYPE_CHECKING:
import tdw_catalog.organization as organization
def _deserialize_dataset_metadata_template(
        data: Optional[Dict[str, Any]],
dataset: 'Dataset') -> 'Optional[metadata_template.MetadataTemplate]':
return None if data is None else metadata_template.MetadataTemplate(
dataset._client, **data)
def _deserialize_data_dictionary(
data: List[dict], dataset: 'Dataset'
) -> Union[DataDictionary, MetadataOnlyDataDictionary]:
# grab the first version, which will be the most recent valid one
version = data[0] if data is not None and len(data) > 0 else None
Klass = DataDictionary if dataset.is_connected else MetadataOnlyDataDictionary
return Klass(dataset=dataset,
last_updated_at=_parse_timestamp(version["updated_at"])
if version is not None else None,
version_id=version["id"]
if version is not None and "id" in version else None,
columns=[
Column._from_property(dataset=dataset, p=p)
for p in version["properties"]
] if version is not None else [])
@Entity([
Property("id", str, serialize=True),
Property("title", str, writable=True),
Property("description", Optional[str], writable=True),
Property("uploader_id", str, display_key="creator_id"),
Property("source_id",
str,
relation="tdw_catalog.source.Source",
serialize=True,
writable=True),
Property("organization_id",
str,
relation="tdw_catalog.organization.Organization",
serialize=True),
Property("created_at", datetime),
Property("updated_at", datetime),
Property("custom_fields",
List[metadata_field.MetadataField],
deserialize=metadata_field._deserialize_metadata_fields,
serialize=True,
writable=False,
readable=False),
Property("field_template",
Optional[metadata_template.MetadataTemplate],
display_key="metadata_template",
deserialize=_deserialize_dataset_metadata_template,
serialize=True,
writable=False),
Property("versions",
Union[DataDictionary, MetadataOnlyDataDictionary],
display_key="data_dictionary",
deserialize=_deserialize_data_dictionary),
# TODO warehouse metadata
# TODO warehouse custom metadata
])
class Dataset(EntityBase, _OrganizationRelation, _SourceRelation):
"""
A :class:`.Dataset` represents a cataloged data asset within an :class:`.Organization`.
It is a container for structured and custom metadata describing the
asset, and can optionally be connected to the data asset via a
:class:`.IngestionConnection` or :class:`.VirtualizationConnection` to support queries,
health monitoring, etc.
Attributes
    ----------
id: str
The :class:`.Dataset`'s unique ID
title: str
The title of the :class:`.Dataset`
description: Optional[str]
The full description text (supports Markdown) that helps describe this :class:`.Dataset`
uploader_id: str
The unique ID of the :class:`.User` that created this :class:`.Dataset`
source_id: str
The unique ID of the :class:`.Source` associated with this :class:`.Dataset`
source: str
The :class:`.Source` associated with this :class:`.Dataset`
organization_id: str
The unique ID of the :class:`.Organization` which this :class:`.Dataset` belongs to
organization: Organization
The :class:`.Organization` which this :class:`.Dataset` belongs to
metadata_template: MetadataTemplate
The :class:`.MetadataTemplate` attached to this :class:`.Dataset`, if any
data_dictionary: DataDictionary
The :class:`.DataDictionary` defined within this :class:`.Dataset`, or describing the schema of the connected data if this is a :class:`.ConnectedDataset`
created_at: datetime
The date this :class:`.Dataset` was originally created
updated_at: datetime
The date this :class:`.Dataset`\\ 's metadata was last modified
"""
_client: 'Catalog'
_context_organization: 'organization.Organization'
id: str
title: str
description: Optional[str]
uploader_id: str
source: 'source.Source'
source_id: str
organization: 'organization.Organization'
organization_id: str
metadata_template: 'metadata_template.MetadataTemplate'
data_dictionary: Union[DataDictionary, MetadataOnlyDataDictionary]
created_at: datetime
updated_at: datetime
def __str__(self) -> str:
return f"<Dataset id={self.id} title={self.title} organization_id={self.organization_id}>"
def _get_templated_metadata(self) -> List['metadata_field.MetadataField']:
if self._field_template is None:
return []
templated_fields = []
custom_field_keys = list(map(
lambda f: f["key"],
self._custom_fields)) if self._custom_fields is not None else []
# Add each field from custom_fields corresponding to a field in the template
for field in self._field_template.fields:
            # if there is a field in custom_fields with the same key as a template
            # field, ensure it is the same type as the template field, then add it
            # to templated_fields
if field["key"] in custom_field_keys:
matched_custom_field = next(
(x for x in self._custom_fields if x.key == field.key),
None)
                if not metadata_field._check_field_match(
                        field=matched_custom_field, templated_field=field):
raise CatalogInternalException(
message=
f"The type of the field with key {field.key} on this Dataset does not match the type of the field with key {field.key} on its attached MetadataTemplate"
)
templated_fields.append(matched_custom_field)
            # if a field is missing from custom_fields but present in the metadata
            # template, add it directly from the template fields list to
            # templated_fields
else:
templated_fields.append(
metadata_field._convert_template_field(field))
return templated_fields
def _get_custom_metadata(self) -> List['metadata_field.MetadataField']:
metadata_template_field_keys = list(
map(lambda f: f["key"], self._field_template.fields)
) if self._field_template is not None else []
# Add each field from custom_fields that does not correspond to a field in the template
return [
field for field in self._custom_fields
if field["key"] not in metadata_template_field_keys
] if self._custom_fields is not None else []
@property
def custom_metadata(self) -> List['metadata_field.MetadataField']:
"""
A list of :class:`.MetadataField`\\ s attached to this :class:`.Dataset` that are not associated with an attached :class:`.MetadataTemplate`
"""
return self._get_custom_metadata()
@property
def templated_metadata(self) -> List['metadata_field.MetadataField']:
"""
A list of :class:`.MetadataField`\\ s attached to this :class:`.Dataset` that are associated with an attached :class:`.MetadataTemplate`
"""
return self._get_templated_metadata()
@property
def is_connected(self) -> bool:
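        """
        Whether this :class:`.Dataset` is connected to underlying data via a Connection
        """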
return False
    @classmethod
def get(cls,
client: 'Catalog',
id: str,
context_organization: Optional['organization.Organization'] = None
):
"""
Retrieve a :class:`.Dataset`
Parameters
----------
client : Catalog
The :class:`.Catalog` client to use to get the :class:`.Dataset`
id : str
The unique ID of the :class:`.Dataset`
context_organization : Optional[Organization]
The :class:`.Organization` from which this :class:`.Dataset` is being retrieved.
            :class:`.Dataset`\\ s may be accessible from multiple :class:`.Organization`\\ s,
but can have differing metadata within each. This context parameter is necessary
to determine which metadata to load.
Returns
-------
Dataset
The :class:`.Dataset` associated with the given ID
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to retrieve the given :class:`.Dataset`
CatalogNotFoundException
If the given :class:`.Dataset` ID does not exist
CatalogException
If call to the :class:`.Catalog` server fails
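
        Examples
        --------
        A minimal sketch, assuming ``catalog`` is an authenticated
        :class:`.Catalog` client and ``"dataset-id"`` stands in for a real
        :class:`.Dataset` ID:

        >>> ds = Dataset.get(catalog, "dataset-id")  # doctest: +SKIP
        >>> print(ds.title)  # doctest: +SKIP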
"""
try:
res = client._get_dataset(id=id)
# pick out the valid versions and call list_properties,
# which returns glossary terms information
            raw_versions = res["dataset"].get("versions")
            versions = None if raw_versions is None else [
                v for v in raw_versions
                if v["state"] in ("draft", "imported")
            ]
if versions is not None and len(versions) > 0:
properties = client._list_properties(
version_ids=[versions[0]["id"]],
organization_ids=[res["dataset"]["organization_id"]])
# overwrite properties for versions[0], to include
# glossary term info
versions[0]["properties"] = properties
res["dataset"]["versions"] = versions
if "reference_id" in res["dataset"]:
d = ConnectedDataset(client, **res["dataset"])
d._context_organization = context_organization
return d
d = cls(client, **res["dataset"])
d._context_organization = context_organization
return d
except Exception as e:
            _raise_error(e, "Unable to fetch Dataset {}".format(id))
    def attach_template(self, template: 'metadata_template.MetadataTemplate'):
"""
Attach a :class:`.MetadataTemplate` to this :class:`.Dataset`. Values may be supplied to
templated fields immediately, but the template will only be attached when
        :class:`.Dataset`.save() is called.
Parameters
----------
template : MetadataTemplate
The :class:`.MetadataTemplate` to be attached to the :class:`.Dataset`
Returns
-------
Dataset
            The :class:`.Dataset` with a newly attached :class:`.MetadataTemplate`
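
        Examples
        --------
        A minimal sketch, assuming ``template`` is an existing
        :class:`.MetadataTemplate`; the attachment is only persisted once
        :class:`.Dataset`.save() is called:

        >>> ds = ds.attach_template(template)  # doctest: +SKIP
        >>> ds.save()  # doctest: +SKIP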
"""
if template is not None:
_update_fields_after_template_attachment(dataset=self,
template=template)
return self
    def detach_template(self):
"""
        Remove the attached :class:`.MetadataTemplate` from this :class:`.Dataset`.
        Any fields from this :class:`.MetadataTemplate` will remain on the
        :class:`.Dataset`, but as individual :class:`.MetadataField`\\ s.
        Detachment happens instantly, and calling :class:`.Dataset`.save() is not
        necessary for the changes to persist
Parameters
----------
None
Returns
-------
Dataset
The :class:`.Dataset` with no attached :class:`.MetadataTemplate`
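
        Examples
        --------
        A minimal sketch; detachment takes effect immediately, with no
        :class:`.Dataset`.save() required:

        >>> ds = ds.detach_template()  # doctest: +SKIP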
"""
try:
self._client._detach_field_template(
organization_id=self.organization_id,
dataset_id=self.id,
field_template_id=self._field_template.id)
except Exception as e:
raise _convert_error(e)
self._field_template = None
return self
    def save(self) -> None:
"""
Update this :class:`.Dataset`, saving all changes to its metadata fields.
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to update this :class:`.Dataset`
CatalogException
If call to the :class:`.Catalog` server fails
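
        Examples
        --------
        A minimal sketch; ``title`` is one of the writable metadata fields:

        >>> ds.title = "Updated title"
        >>> ds.save()  # doctest: +SKIP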
"""
try:
res = self._client._update_dataset(dataset=self.serialize())
self.deserialize(res)
except Exception as e:
raise _convert_error(e)
    def delete(self) -> None:
"""
Delete this :class:`.Dataset`\\. The :class:`.Dataset` object should not be used after this
method is invoked successfully.
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to delete this :class:`.Dataset`
CatalogException
If call to the :class:`.Catalog` server fails
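
        Examples
        --------
        A minimal sketch; the object must not be used after deletion succeeds:

        >>> ds.delete()  # doctest: +SKIP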
"""
try:
self._client._delete_dataset(id=self.id)
except Exception as e:
raise _convert_error(e)
    def connect(self) -> 'DatasetConnector':
"""
        Converts a :class:`.Dataset` into a :class:`.ConnectedDataset` by accessing
        data via an :class:`.IngestionConnection` or :class:`.VirtualizationConnection`.
        A :class:`.ConnectedDataset` can represent ingested data, which is copied into
        the :class:`.Catalog` platform, or virtualized data, which is accessed remotely
        by the platform without being copied.
        There are many ways to connect a :class:`.Dataset`, so a helper object is
        returned with various method-based workflows that aid in connecting to data.
Returns
-------
DatasetConnector
A helper object for configuring this :class:`.Dataset`\\ 's
connection to data.
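
        Examples
        --------
        A minimal sketch; the available connection workflows are documented
        on :class:`.DatasetConnector`:

        >>> connector = ds.connect()  # doctest: +SKIP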
"""
return DatasetConnector(self)
    def refresh(self) -> 'Dataset':
"""
Return a fresh copy of this :class:`.Dataset`, with up-to-date
property values. Useful after performing an update, connection,
etc.
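
        Examples
        --------
        A minimal sketch:

        >>> ds = ds.refresh()  # doctest: +SKIP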
"""
return Dataset.get(self._client, self.id, self._context_organization)
    def list_topics(self,
organization_id: Optional[str] = None,
filter: Optional[Filter] = None) -> 'List[topic.Topic]':
"""
Retrieves the list of all :class:`.Topic`\\ s this :class:`.Dataset` is currently classified under, within the given :class:`.Organization`
Parameters
----------
organization_id : Optional[str]
An optional ID for an :class:`.Organization` other than the original :class:`.Organization` the :class:`.Dataset` was created in (e.g. if the :class:`.Dataset` has been shared to another organization with a different set of :class:`.Topic`\\ s)
filter : Optional[Filter]
An optional :class:`.tdw_catalog.utils.Filter` to offset or limit the list of :class:`.Topic`\\ s returned
Returns
-------
List[Topic]
The list of :class:`.Topic`\\ s that have been classified to this :class:`.Dataset`
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to list :class:`.Topic`\\ s in this :class:`.Organization`
CatalogException
If call to the :class:`.Catalog` server fails
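
        Examples
        --------
        A minimal sketch, listing :class:`.Topic`\\ s within the
        :class:`.Dataset`'s own :class:`.Organization`:

        >>> topics = ds.list_topics()  # doctest: +SKIP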
"""
try:
topics = self._client._get_topics(
dataset_id=self.id,
organization_id=self.organization_id
if organization_id is None else organization_id,
filter=filter)
return [topic.Topic(self._client, **t) for t in topics]
except Exception as e:
raise _convert_error(e)
    def classify(self, topic: 'topic.Topic') -> None:
"""
Classify this :class:`.Dataset` with a :class:`.Topic`, linking them semantically
Parameters
----------
topic : Topic
The :class:`.Topic` to classify this :class:`.Dataset` with
Returns
-------
None
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to classify :class:`.Dataset`\\ s, or if the :class:`.Topic` ID provided does not correspond to an existing :class:`.Topic`
CatalogException
If call to the :class:`.Catalog` server fails
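
        Examples
        --------
        A minimal sketch, assuming ``t`` is an existing :class:`.Topic`:

        >>> ds.classify(t)  # doctest: +SKIP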
"""
try:
self._client._set_topic(id=topic.id,
dataset_id=self.id,
organization_id=self.organization_id)
except Exception as e:
raise _convert_error(e)
    def declassify(self, topic: 'topic.Topic') -> None:
"""
Remove a :class:`.Topic` classification from this :class:`.Dataset`
Parameters
----------
topic : Topic
The :class:`.Topic` to be unclassified from this :class:`.Dataset`
Returns
-------
None
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to declassify :class:`.Dataset`\\ s, or if the :class:`.Topic` ID provided does not correspond to an existing :class:`.Topic`
CatalogException
If call to the :class:`.Catalog` server fails
"""
try:
self._client._unset_topic(id=topic.id,
dataset_id=self.id,
organization_id=self.organization_id)
except Exception as e:
raise _convert_error(e)
@Entity([
Property("reference_id",
str,
relation="tdw_catalog._reference._Reference",
readable=False),
Property("exports_disabled", bool, writable=True),
Property("imported_at", datetime, display_key="metrics_last_collected_at"),
# TODO versions & revisions
Property("reference_next_ingest",
Optional[datetime],
display_key="next_scheduled_metrics_collection_time"),
Property("reference_failed_at",
Optional[datetime],
display_key="last_metrics_collection_failure_time"),
Property("warehouse_metadata",
Optional[List[metadata_field.MetadataField]],
deserialize=metadata_field._deserialize_metadata_fields),
])
class ConnectedDataset(Dataset):
"""
A :class:`.ConnectedDataset` is identical to a :class:`.Dataset` and inherits
all of its fields, but represents a :class:`.Dataset` which is connected
to the actual underlying data asset via a Connection. A
:class:`.ConnectedDataset` supports queries, export, health monitoring, etc.
Attributes
    ----------
exports_disabled: bool
        A flag marking whether exports of this :class:`.Dataset` are disabled.
        Setting it to ``True`` prevents exports but does not prevent querying.
        Only relevant if the :class:`.Dataset` is connected to data.
    warehouse: Warehouse
        The underlying :class:`.Warehouse` where the data resides
metrics_last_collected_at: datetime
The last time metrics were collected for this :class:`.Dataset` (virtualized :class:`.Dataset`\\ s)
or the last time the :class:`.Dataset` was imported (ingested :class:`.Dataset`\\ s).
next_scheduled_metrics_collection_time: Optional[datetime]
If this :class:`.Dataset` has an associated connection schedule,
the next time this dataset will collect metrics (virtualized Dataset)
or import (ingested :class:`.Dataset`\\ s).
    last_metrics_collection_failure_time: Optional[datetime]
The most recent time metrics collection (virtualized :class:`.Dataset`\\ s)
or import (ingested :class:`.Dataset`\\ s) failed. ``None`` if metrics collection
has never failed.
warehouse_metadata: Optional[List[metadata_field.MetadataField]]
Harvested metadata from virtualized :class:`.Dataset`\\ s. ``None`` for ingested :class:`.Dataset`\\ s.
"""
exports_disabled: bool
metrics_last_collected_at: datetime
next_scheduled_metrics_collection_time: Optional[datetime]
last_metrics_collection_failure_time: Optional[datetime]
warehouse_metadata: 'Optional[List[metadata_field.MetadataField]]'
@property
def is_connected(self) -> bool:
return True
    def refresh(self) -> 'ConnectedDataset':
"""
Return a fresh copy of this :class:`.ConnectedDataset`, with up-to-date
property values. Useful after performing an update, connection,
etc.
"""
        return ConnectedDataset.get(self._client, self.id,
                                    self._context_organization)
@property
    def connection_id(self) -> str:
        """The ID of the underlying :class:`.IngestionConnection` or :class:`.VirtualizationConnection` which links this :class:`.Dataset` to data"""
return self._reference.connection_id
@property
    def connection(
        self
    ) -> Union['connection_package.IngestionConnection',
               'connection_package.VirtualizationConnection']:
        """The underlying :class:`.IngestionConnection` or :class:`.VirtualizationConnection` which links this :class:`.Dataset` to data"""
return self._reference.connection
@property
def warehouse(self) -> 'warehouse_package.Warehouse':
"""
The :class:`.Warehouse` where the connected data is virtualized from, or ingested to
"""
return warehouse_package.Warehouse.get(self._client,
self._reference.warehouse,
self.organization_id)
@property
def import_state(self) -> ImportState:
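        """
        The :class:`.ImportState` of the connected data
        """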
return self._reference.state
@property
def health_monitoring_enabled(self) -> bool:
"""
Whether or not :class:`.Catalog` platform health monitoring is enabled for this :class:`.ConnectedDataset`
"""
return self._reference.dataspec_enabled
@health_monitoring_enabled.setter
    def health_monitoring_enabled(self, health_monitoring_enabled: bool):
"""
Enable or disable health monitoring for this :class:`.ConnectedDataset`
"""
self._reference.dataspec_enabled = health_monitoring_enabled
@property
def metrics_collection_schedules(
self) -> Optional[List['connection_package.ConnectionSchedule']]:
"""
Returns all configured schedules for metrics collection, which govern health monitoring intervals and ingestion intervals for ingested :class:`.Dataset`\\ s
"""
return self._reference.ingest_schedules
@metrics_collection_schedules.setter
    def metrics_collection_schedules(self, schedules: Optional[
            List['connection_package.ConnectionSchedule']]):
"""
Set or clear metrics collection schedules
"""
self._reference.ingest_schedules = schedules
@property
def advanced_configuration(self) -> str:
"""
        This configuration string is auto-generated during ingestion
        or virtualization, inferred from the connected data.
It can be modified, with caution, to alter how the :class:`.Catalog`
perceives and represents the connected data.
Modification of this configuration without support
from ThinkData Works is not recommended.
"""
return self._reference.ingest_config
@advanced_configuration.setter
    def advanced_configuration(self, ingest_config: str):
"""
        This configuration string is auto-generated during ingestion
        or virtualization, inferred from the connected data.
It can be modified, with caution, to alter how the :class:`.Catalog`
perceives and represents the connected data.
Modification of this configuration without support
from ThinkData Works is not recommended.
"""
self._reference.ingest_config = ingest_config
    def connect(self) -> 'DatasetConnector':
"""
Manage all connection-related aspects of this
:class:`.ConnectedDataset`.
        There are many ways to connect a :class:`.Dataset`, so a helper object is
        returned with various method-based workflows that aid in connecting to data.
Returns
-------
DatasetConnector
A helper object for configuring this :class:`.Dataset`\\ 's
connection to data.
"""
return super().connect()
    async def reconnect(self):
"""
        Manually triggers a re-import for ingested datasets, and metrics
        collection (health monitoring, etc.) for both virtualized and
        ingested datasets.
Useful for forcing a metrics collection, or applying changes
made to the advanced_configuration.
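
        Examples
        --------
        A minimal sketch; ``reconnect`` is a coroutine and must be awaited:

        >>> await ds.reconnect()  # doctest: +SKIP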
"""
try:
self._reference.force_fail()
        except Exception:
            # a failure here is non-fatal; proceed to trigger the ingest
            pass
await self._reference.ingest()
def __str__(self) -> str:
return f"<ConnectedDataset id={self.id} title={self.title} organization_id={self.organization_id}>"
def _get__reference(self):
return _reference._Reference.get(self._client, self._reference_id,
self.organization_id)
    async def query(self, query: Optional[str] = None) -> QueryCursor:
"""
Async function which returns a Python DB API-style Cursor object (PEP 249),
representing the results of the supplied SQL-like NiQL query
executed against the connected data.
        Note that NiQL supports most standard SQL keywords, but keywords
which modify underlying data (e.g. ``INSERT``, ``UPDATE``, ``DELETE``)
may not be used.
Note that the :class:`.Catalog` platform supports a global limit on
results (10,000 rows) from a single query.
To refer to the current dataset in the query, include ``{this}``
in the query, such as: ``"SELECT * FROM {this}"``.
Parameters
----------
query : Optional[str]
A NiQL query used to filter or reshape the data before exporting
Returns
-------
QueryCursor
The query results cursor, which can be printed, converted to
a ``pandas`` DataFrame via ``pd.DataFrame(res.fetchall())``, etc.
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to query data
CatalogInvalidArgumentException
If the given query is invalid
CatalogException
            If call to the :class:`.Catalog` server fails
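
        Examples
        --------
        A minimal sketch; ``query`` is a coroutine, and ``pd`` is assumed to
        be an imported ``pandas``:

        >>> cursor = await ds.query("SELECT * FROM {this} LIMIT 10")  # doctest: +SKIP
        >>> df = pd.DataFrame(cursor.fetchall())  # doctest: +SKIP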
"""
try:
q = "SELECT * FROM {this}" if query is None else query
q = q.format(this=self.id)
res = self._client._query(statement=q)
return QueryCursor(res)
except Exception as e:
raise _convert_error(e)
async def _do_export(
self,
query: Optional[str] = None,
            format: Optional[_ExportFormat] = None) -> Dict[str, Any]:
try:
file_name = 'export.parquet' if format is _ExportFormat.PARQUET else 'export.csv'
export = self._client._create_export(query=query,
format=format,
file_name=file_name)
            while True:
finished_export_details = None
                # if this export has already been performed before, the 'export'
                # key is available right away.
if 'export' in export:
finished_export_details = self._client._get_export(
id=export['export']['id'])
else:
finished_export_details = self._client._get_export(
id=export['id'])
if 'export' in finished_export_details:
state = finished_export_details['export']['state']
else:
state = finished_export_details['state']
if state == 'finished':
break
elif state == 'failed':
                    # if a CSV_GZIP export fails, fall back to CSV
if format is _ExportFormat.CSV_GZIP:
return await self._do_export(query,
format=_ExportFormat.CSV)
raise CatalogException(
message=finished_export_details["error_info"]
["message"])
await sleep(2)
return finished_export_details
except Exception as e:
raise _convert_error(e)
    async def export_csv(self, query: Optional[str] = None) -> CSVExport:
"""
Async function which returns the URL which can be used to stream a
CSV-formatted copy of the connected data, optionally filtered by
the supplied SQL-like NiQL query. Note that most standard SQL
keywords are supported, but keywords which modify underlying data
(e.g. ``INSERT``, ``UPDATE``, ``DELETE``) are not.
To refer to the current dataset in the query, include ``{this}``
in the query, such as: ``"SELECT * FROM {this}"``.
Unlike ``ConnectedDataset.query()``, there is no limit on exported rows,
other than any imposed by the underlying warehouse.
Parameters
----------
query : Optional[str]
A NiQL query used to filter or reshape the data before exporting
Returns
-------
CSVExport
            A :class:`.CSVExport` object containing a signed download URL which can be used to
fetch the exported data. It can be downloaded in its entirety, or streamed in chunks.
This :class:`.CSVExport` object improves the usability of the CSV data when employing ``pandas``,
including a configuration for ``read_csv`` which can be passed via ``**export``
as follows: ``df = pd.read_csv(export.url, **export)``, ensuring that the
resultant ``DataFrame`` has the correct schema for all fields (including dates).
            Note: It is recommended that ``export_parquet`` be employed for use with ``pandas`` when
            supported by the underlying warehouse.
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to export data
CatalogInvalidArgumentException
If the given query is invalid
CatalogException
If call to the :class:`.Catalog` server fails, or the export process itself fails
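
        Examples
        --------
        A minimal sketch; ``export_csv`` is a coroutine, and ``pd`` is
        assumed to be an imported ``pandas``:

        >>> export = await ds.export_csv()  # doctest: +SKIP
        >>> df = pd.read_csv(export.url, **export)  # doctest: +SKIP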
"""
q = "SELECT * FROM {this}" if query is None else query
q = q.format(this=self.id)
schema_q = "SELECT * FROM ({q}) LIMIT 0".format(q=q)
        # run the export and query in parallel, using the query to fetch the schema
[query_res, finished_export_details
] = await gather(self.query(schema_q),
self._do_export(q, format=_ExportFormat.CSV_GZIP))
export_format = _ExportFormat.CSV_GZIP if finished_export_details[
"export"]["format"] == "csv.gzip" else _ExportFormat.CSV
return CSVExport._from_export_details(query_res,
finished_export_details,
export_format)
    async def export_parquet(self,
query: Optional[str] = None) -> ParquetExport:
"""
Async function which returns the URL which can be used to stream a
Parquet-formatted copy of the connected data, optionally filtered by
the supplied SQL-like NiQL query. Note that most standard SQL
keywords are supported, but keywords which modify underlying data
(e.g. ``INSERT``, ``UPDATE``, ``DELETE``) are not.
To refer to the current dataset in the query, include ``{this}``
in the query, such as: ``"SELECT * FROM {this}"``.
Unlike ``ConnectedDataset.query()``, there is no limit on exported rows,
other than any imposed by the underlying warehouse.
Note: Parquet export is not (yet) supported for all underlying warehouse types, but this
export method should be preferred when interfacing with ``pandas`` whenever possible.
Parameters
----------
query : Optional[str]
A NiQL query used to filter or reshape the data before exporting
Returns
-------
ParquetExport
            A :class:`.ParquetExport` object containing a signed download URL which can be used to
fetch the exported data. It can be downloaded in its entirety, or streamed in chunks.
This :class:`.ParquetExport` object can be directly employed by ``pandas``
as follows: ``df = pd.read_parquet(export.url)``. Note that ``pandas`` requires
``pyarrow`` OR ``fastparquet`` in order to ``read_parquet``.
            Note: It is recommended that ``export_parquet`` be employed for use with ``pandas`` when
            supported by the underlying warehouse.
Raises
------
CatalogPermissionDeniedException
If the caller is not allowed to export data
CatalogInvalidArgumentException
If the given query is invalid, or if Parquet export is not available for this warehouse type
CatalogException
If call to the :class:`.Catalog` server fails, or the export process itself fails
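
        Examples
        --------
        A minimal sketch; ``export_parquet`` is a coroutine, and ``pd`` is
        assumed to be an imported ``pandas``:

        >>> export = await ds.export_parquet()  # doctest: +SKIP
        >>> df = pd.read_parquet(export.url)  # doctest: +SKIP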
"""
q = "SELECT * FROM {this}" if query is None else query
q = q.format(this=self.id)
finished_export_details = await self._do_export(
q, format=_ExportFormat.PARQUET)
return ParquetExport._from_export_details(finished_export_details)
def _update_fields_after_template_attachment(
dataset: 'Dataset',
template: 'metadata_template.MetadataTemplate') -> None:
    # steps for setting the template:
    # 1. remove all of the original template's fields from the custom fields list
    # 2. convert the new template's fields to MetadataField objects
    #    (instead of MetadataTemplateField objects)
    # 3. fill in values on the new template fields from any pre-existing fields
    # 4. set the dataset's field template to the new template
original_fields_list = dataset._custom_fields if dataset._custom_fields is not None else []
original_template_fields = dataset._field_template.fields if dataset._field_template is not None else None
original_template_keys = list(
map(lambda f: f["key"], original_template_fields)
) if original_template_fields is not None else []
# remove the old template fields from the custom fields list
filtered_fields_list = list(
filter(lambda f: f["key"] not in original_template_keys,
original_fields_list))
# set the new field template, so that it will be saved when dataset.save() is called
dataset._field_template = template
# map the new field template fields to normal MetadataField's
template_fields = list(
map(lambda f: metadata_field._convert_template_field(f),
dataset._field_template.fields)
) if dataset._field_template.fields is not None else []
# update the template_fields with any pre-existing field values
for field in template_fields:
        pre_existing = next(
            (f for f in filtered_fields_list
             if f.key == field.key and type(f) is type(field)), None)
if pre_existing is not None:
# update the template field value if it doesn't have one
if field.value is None:
field.value = pre_existing.value
# remove the pre-existing field from the custom fields list
filtered_fields_list = [
f for f in filtered_fields_list if f.key != field.key
]
    # add the new template's fields to the custom fields list
dataset._custom_fields = template_fields + filtered_fields_list