Source code for tdw_catalog.dataset

from datetime import datetime
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from asyncio import gather, sleep

from tdw_catalog import Catalog
from tdw_catalog.export import CSVExport, ParquetExport
from tdw_catalog.query import QueryCursor
from tdw_catalog.relations import _OrganizationRelation, _SourceRelation
import tdw_catalog.source as source
import tdw_catalog.connection as connection_package
import tdw_catalog._reference as _reference
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, CatalogInternalException, _convert_error, _raise_error
from tdw_catalog.dataset_connector import DatasetConnector
import tdw_catalog.topic as topic
from tdw_catalog.utils import _ExportFormat, Filter, ImportState
from tdw_catalog.utils import _parse_timestamp
import tdw_catalog.warehouse as warehouse_package
import tdw_catalog.metadata.editor as editor
import tdw_catalog.metadata.field as metadata_field
import tdw_catalog.metadata_template as metadata_template
from tdw_catalog.data_dictionary import Column, DataDictionary, MetadataOnlyDataDictionary

if TYPE_CHECKING:
    import tdw_catalog.organization as organization


def _deserialize_dataset_metadata_template(
        data: Optional[Dict[str, any]],
        dataset: 'Dataset') -> 'Optional[metadata_template.MetadataTemplate]':
    return None if data is None else metadata_template.MetadataTemplate(
        dataset._client, **data)


def _deserialize_data_dictionary(
        data: List[dict], dataset: 'Dataset'
) -> Union[DataDictionary, MetadataOnlyDataDictionary]:
    # grab the first version, which will be the most recent valid one
    version = data[0] if data is not None and len(data) > 0 else None
    Klass = DataDictionary if dataset.is_connected else MetadataOnlyDataDictionary
    return Klass(dataset=dataset,
                 last_updated_at=_parse_timestamp(version["updated_at"])
                 if version is not None else None,
                 version_id=version["id"]
                 if version is not None and "id" in version else None,
                 columns=[
                     Column._from_property(dataset=dataset, p=p)
                     for p in version["properties"]
                 ] if version is not None else [])


[docs]@Entity([ Property("id", str, serialize=True), Property("title", str, writable=True), Property("description", Optional[str], writable=True), Property("uploader_id", str, display_key="creator_id"), Property("source_id", str, relation="tdw_catalog.source.Source", serialize=True, writable=True), Property("organization_id", str, relation="tdw_catalog.organization.Organization", serialize=True), Property("created_at", datetime), Property("updated_at", datetime), Property("custom_fields", List[metadata_field.MetadataField], deserialize=metadata_field._deserialize_metadata_fields, serialize=True, writable=False, readable=False), Property("field_template", Optional[metadata_template.MetadataTemplate], display_key="metadata_template", deserialize=_deserialize_dataset_metadata_template, serialize=True, writable=False), Property("versions", Union[DataDictionary, MetadataOnlyDataDictionary], display_key="data_dictionary", deserialize=_deserialize_data_dictionary), # TODO warehouse metadata # TODO warehouse custom metadata ]) class Dataset(EntityBase, _OrganizationRelation, _SourceRelation): """ A :class:`.Dataset` represents a cataloged data asset within an :class:`.Organization`. It is a container for structured and custom metadata describing the asset, and can optionally be connected to the data asset via a :class:`.IngestionConnection` or :class:`.VirtualizationConnection` to support queries, health monitoring, etc. Attributes __________ id: str The :class:`.Dataset`'s unique ID title: str The title of the :class:`.Dataset` description: Optional[str] The full description text (supports Markdown) that helps describe this :class:`.Dataset` uploader_id: str The unique ID of the :class:`.User` that created this :class:`.Dataset` source_id: str The unique ID of the :class:`.Source` associated with this :class:`.Dataset` source: str The :class:`.Source` associated with this :class:`.Dataset` organization_id: str The unique ID of the :class:`.Organization` which this :class:`.Dataset` belongs to organization: Organization The :class:`.Organization` which this :class:`.Dataset` belongs to metadata_template: MetadataTemplate The :class:`.MetadataTemplate` attached to this :class:`.Dataset`, if any data_dictionary: DataDictionary The :class:`.DataDictionary` defined within this :class:`.Dataset`, or describing the schema of the connected data if this is a :class:`.ConnectedDataset` created_at: datetime The date this :class:`.Dataset` was originally created updated_at: datetime The date this :class:`.Dataset`\\ 's metadata was last modified """ _client: 'Catalog' _context_organization: 'organization.Organization' id: str title: str description: Optional[str] uploader_id: str source: 'source.Source' source_id: str organization: 'organization.Organization' organization_id: str metadata_template: 'metadata_template.MetadataTemplate' data_dictionary: Union[DataDictionary, MetadataOnlyDataDictionary] created_at: datetime updated_at: datetime def __str__(self) -> str: return f"<Dataset id={self.id} title={self.title} organization_id={self.organization_id}>" def _get_templated_metadata(self) -> List['metadata_field.MetadataField']: if self._field_template is None: return [] templated_fields = [] custom_field_keys = list(map( lambda f: f["key"], self._custom_fields)) if self._custom_fields is not None else [] # Add each field from custom_fields corresponding to a field in the template for field in self._field_template.fields: # if there is field in custom_fields with the same key as a template field, ensure it 
is the same type as the template field then add it to templated_metadata if field["key"] in custom_field_keys: matched_custom_field = next( (x for x in self._custom_fields if x.key == field.key), None) if metadata_field._check_field_match( field=matched_custom_field, templated_field=field) == False: raise CatalogInternalException( message= f"The type of the field with key {field.key} on this Dataset does not match the type of the field with key {field.key} on its attached MetadataTemplate" ) templated_fields.append(matched_custom_field) # if there is field missing from custom_fields but present in the metadata template, add it directly from the template fields list to templated_metadata else: templated_fields.append( metadata_field._convert_template_field(field)) return templated_fields def _get_custom_metadata(self) -> List['metadata_field.MetadataField']: metadata_template_field_keys = list( map(lambda f: f["key"], self._field_template.fields) ) if self._field_template is not None else [] # Add each field from custom_fields that does not correspond to a field in the template return [ field for field in self._custom_fields if field["key"] not in metadata_template_field_keys ] if self._custom_fields is not None else [] @property def custom_metadata(self) -> List['metadata_field.MetadataField']: """ A list of :class:`.MetadataField`\\ s attached to this :class:`.Dataset` that are not associated with an attached :class:`.MetadataTemplate` """ return self._get_custom_metadata()
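    # Usage sketch (not part of the library source): reading the two metadata
    # views exposed above. Assumes `ds` is a Dataset instance already
    # retrieved via Dataset.get(...); the `key`/`value` attributes on
    # MetadataField are assumptions based on how fields are matched elsewhere
    # in this module.
    #
    #     for field in ds.custom_metadata:
    #         print(field.key, field.value)
    #     for field in ds.templated_metadata:
    #         print(field.key, field.value)
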
    def update_custom_metadata(self) -> 'editor.MetadataEditor':
        """
        Provides a :class:`.MetadataEditor` which allows for the addition,
        removal, and alteration of :class:`.MetadataField`\\ s on this
        :class:`.Dataset` that are not associated with an attached
        :class:`.MetadataTemplate`

        Parameters
        ----------
        None

        Returns
        -------
        MetadataEditor
            An editor for adding, removing, and updating
            :class:`.MetadataField`\\ s on the :class:`.Dataset` which do not
            belong to a :class:`.MetadataTemplate`
        """
        return editor.MetadataEditor(fields=self.custom_metadata, dataset=self)

    @property
    def templated_metadata(self) -> List['metadata_field.MetadataField']:
        """
        A list of :class:`.MetadataField`\\ s attached to this
        :class:`.Dataset` that are associated with an attached
        :class:`.MetadataTemplate`
        """
        return self._get_templated_metadata()

    def update_templated_metadata(self) -> 'editor.TemplatedMetadataEditor':
        """
        Provides a :class:`.TemplatedMetadataEditor` which allows for the
        alteration of :class:`.MetadataField`\\ s on this :class:`.Dataset`
        that are associated with an attached :class:`.MetadataTemplate`.
        This object cannot add or remove :class:`.MetadataField`\\ s; that
        must be done on the :class:`.MetadataTemplate` directly.

        Parameters
        ----------
        None

        Returns
        -------
        TemplatedMetadataEditor
            An editor for updating :class:`.MetadataField`\\ s on the
            :class:`.Dataset` that are associated with an attached
            :class:`.MetadataTemplate`
        """
        return editor.TemplatedMetadataEditor(fields=self.templated_metadata,
                                              template=self._field_template,
                                              dataset=self)

    @property
    def is_connected(self) -> bool:
        return False

    @classmethod
    def get(cls,
            client: 'Catalog',
            id: str,
            context_organization: Optional['organization.Organization'] = None):
        """
        Retrieve a :class:`.Dataset`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`.Dataset`
        id : str
            The unique ID of the :class:`.Dataset`
        context_organization : Optional[Organization]
            The :class:`.Organization` from which this :class:`.Dataset` is
            being retrieved. :class:`.Dataset`\\ s may be accessible from
            multiple :class:`.Organization`\\ s, but can have differing
            metadata within each. This context parameter is necessary to
            determine which metadata to load.

        Returns
        -------
        Dataset
            The :class:`.Dataset` associated with the given ID

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve the given :class:`.Dataset`
        CatalogNotFoundException
            If the given :class:`.Dataset` ID does not exist
        CatalogException
            If the call to the :class:`.Catalog` server fails
        """
        try:
            res = client._get_dataset(id=id)
            # pick out the valid versions and call list_properties,
            # which returns glossary terms information
            versions = None if "versions" not in res["dataset"] or res[
                "dataset"]["versions"] is None else [
                    v for v in res["dataset"]["versions"]
                    if v["state"] == "draft" or v["state"] == "imported"
                ]
            if versions is not None and len(versions) > 0:
                properties = client._list_properties(
                    version_ids=[versions[0]["id"]],
                    organization_ids=[res["dataset"]["organization_id"]])
                # overwrite properties for versions[0], to include
                # glossary term info
                versions[0]["properties"] = properties
            res["dataset"]["versions"] = versions
            if "reference_id" in res["dataset"]:
                d = ConnectedDataset(client, **res["dataset"])
                d._context_organization = context_organization
                return d
            d = cls(client, **res["dataset"])
            d._context_organization = context_organization
            return d
        except Exception as e:
            _raise_error(e, "Unable to fetch Dataset {}".format(id))

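    # Usage sketch (not part of the library source): fetching a Dataset by
    # ID. Assumes an authenticated Catalog client named `catalog` has already
    # been constructed; "my-dataset-id" is a placeholder.
    #
    #     ds = Dataset.get(catalog, "my-dataset-id")
    #     print(ds)            # <Dataset id=... title=... organization_id=...>
    #     print(ds.is_connected)
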
    def attach_template(self, template: 'metadata_template.MetadataTemplate'):
        """
        Attach a :class:`.MetadataTemplate` to this :class:`.Dataset`. Values
        may be supplied to templated fields immediately, but the template
        will only be attached when :class:`.Dataset`.save() is called.

        Parameters
        ----------
        template : MetadataTemplate
            The :class:`.MetadataTemplate` to be attached to the :class:`.Dataset`

        Returns
        -------
        Dataset
            The :class:`.Dataset` with a newly attached :class:`.MetadataTemplate`
        """
        if template is not None:
            _update_fields_after_template_attachment(dataset=self,
                                                     template=template)
        return self

    def detach_template(self):
        """
        Remove the attached :class:`.MetadataTemplate` from this
        :class:`.Dataset`. Any fields from this :class:`.MetadataTemplate`
        will remain on the :class:`.Dataset`, but as individual
        :class:`.MetadataField`\\ s. Detachment happens instantly, and calling
        :class:`.Dataset`.save() is not necessary for the changes to persist.

        Parameters
        ----------
        None

        Returns
        -------
        Dataset
            The :class:`.Dataset` with no attached :class:`.MetadataTemplate`
        """
        try:
            self._client._detach_field_template(
                organization_id=self.organization_id,
                dataset_id=self.id,
                field_template_id=self._field_template.id)
        except Exception as e:
            raise _convert_error(e)
        self._field_template = None
        return self

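    # Usage sketch (not part of the library source): attaching and detaching
    # a MetadataTemplate. Assumes `template` is a MetadataTemplate obtained
    # elsewhere; note that attachment only persists once save() is called,
    # while detachment takes effect immediately.
    #
    #     ds.attach_template(template)
    #     ds.save()
    #     ds.detach_template()
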
    def save(self) -> None:
        """
        Update this :class:`.Dataset`, saving all changes to its metadata
        fields.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`.Dataset`
        CatalogException
            If the call to the :class:`.Catalog` server fails
        """
        try:
            res = self._client._update_dataset(dataset=self.serialize())
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

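    # Usage sketch (not part of the library source): editing writable
    # metadata and persisting the change. `title` and `description` are
    # writable properties per the @Entity declaration above; the values here
    # are placeholders.
    #
    #     ds.title = "Quarterly sales"
    #     ds.description = "Cleaned quarterly sales figures"
    #     ds.save()
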
    def delete(self) -> None:
        """
        Delete this :class:`.Dataset`\\. The :class:`.Dataset` object should
        not be used after this method is invoked successfully.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to delete this :class:`.Dataset`
        CatalogException
            If the call to the :class:`.Catalog` server fails
        """
        try:
            self._client._delete_dataset(id=self.id)
        except Exception as e:
            raise _convert_error(e)

    def connect(self) -> 'DatasetConnector':
        """
        Converts a :class:`.Dataset` into a :class:`.ConnectedDataset`, by
        accessing data via an :class:`.IngestionConnection` or
        :class:`.VirtualizationConnection`. A :class:`.ConnectedDataset` can
        represent ingested data, which is copied into the :class:`.Catalog`
        platform, or virtualized data, which is accessed remotely by the
        platform without being copied. There are many methods for connecting
        a :class:`.Dataset`, so a helper object is returned with various
        method-based workflows that aid in connecting to data.

        Returns
        -------
        DatasetConnector
            A helper object for configuring this :class:`.Dataset`\\ 's
            connection to data.
        """
        return DatasetConnector(self)

    def refresh(self) -> 'Dataset':
        """
        Return a fresh copy of this :class:`.Dataset`, with up-to-date
        property values. Useful after performing an update, connection, etc.
        """
        return Dataset.get(self._client, self.id, self._context_organization)

    def list_topics(self,
                    organization_id: Optional[str] = None,
                    filter: Optional[Filter] = None) -> 'List[topic.Topic]':
        """
        Retrieves the list of all :class:`.Topic`\\ s this :class:`.Dataset`
        is currently classified under, within the given :class:`.Organization`

        Parameters
        ----------
        organization_id : Optional[str]
            An optional ID for an :class:`.Organization` other than the
            original :class:`.Organization` the :class:`.Dataset` was created
            in (e.g. if the :class:`.Dataset` has been shared to another
            organization with a different set of :class:`.Topic`\\ s)
        filter : Optional[Filter]
            An optional :class:`.tdw_catalog.utils.Filter` to offset or limit
            the list of :class:`.Topic`\\ s returned

        Returns
        -------
        List[Topic]
            The list of :class:`.Topic`\\ s that have been classified to this
            :class:`.Dataset`

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to list :class:`.Topic`\\ s in this
            :class:`.Organization`
        CatalogException
            If the call to the :class:`.Catalog` server fails
        """
        try:
            topics = self._client._get_topics(
                dataset_id=self.id,
                organization_id=self.organization_id
                if organization_id is None else organization_id,
                filter=filter)
            return [topic.Topic(self._client, **t) for t in topics]
        except Exception as e:
            raise _convert_error(e)

    def classify(self, topic: 'topic.Topic') -> None:
        """
        Classify this :class:`.Dataset` with a :class:`.Topic`, linking them
        semantically

        Parameters
        ----------
        topic : Topic
            The :class:`.Topic` to classify this :class:`.Dataset` with

        Returns
        -------
        None

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to classify :class:`.Dataset`\\ s, or
            if the :class:`.Topic` ID provided does not correspond to an
            existing :class:`.Topic`
        CatalogException
            If the call to the :class:`.Catalog` server fails
        """
        try:
            self._client._set_topic(id=topic.id,
                                    dataset_id=self.id,
                                    organization_id=self.organization_id)
        except Exception as e:
            raise _convert_error(e)

    def declassify(self, topic: 'topic.Topic') -> None:
        """
        Remove a :class:`.Topic` classification from this :class:`.Dataset`

        Parameters
        ----------
        topic : Topic
            The :class:`.Topic` to be unclassified from this :class:`.Dataset`

        Returns
        -------
        None

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to declassify :class:`.Dataset`\\ s,
            or if the :class:`.Topic` ID provided does not correspond to an
            existing :class:`.Topic`
        CatalogException
            If the call to the :class:`.Catalog` server fails
        """
        try:
            self._client._unset_topic(id=topic.id,
                                      dataset_id=self.id,
                                      organization_id=self.organization_id)
        except Exception as e:
            raise _convert_error(e)

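    # Usage sketch (not part of the library source): managing Topic
    # classifications. Assumes `some_topic` is a Topic instance, e.g. taken
    # from another Dataset's list_topics() result.
    #
    #     ds.classify(some_topic)
    #     for t in ds.list_topics():
    #         print(t.id)
    #     ds.declassify(some_topic)
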
[docs]@Entity([ Property("reference_id", str, relation="tdw_catalog._reference._Reference", readable=False), Property("exports_disabled", bool, writable=True), Property("imported_at", datetime, display_key="metrics_last_collected_at"), # TODO versions & revisions Property("reference_next_ingest", Optional[datetime], display_key="next_scheduled_metrics_collection_time"), Property("reference_failed_at", Optional[datetime], display_key="last_metrics_collection_failure_time"), Property("warehouse_metadata", Optional[List[metadata_field.MetadataField]], deserialize=metadata_field._deserialize_metadata_fields), ]) class ConnectedDataset(Dataset): """ A :class:`.ConnectedDataset` is identical to a :class:`.Dataset` and inherits all of its fields, but represents a :class:`.Dataset` which is connected to the actual underlying data asset via a Connection. A :class:`.ConnectedDataset` supports queries, export, health monitoring, etc. Attributes __________ exports_disabled: bool A flag to mark if this :class:`.Dataset` may be exported. Setting this to false does not prevent querying on this :class:`.Dataset`. Only relevant if the :class:`.Dataset` is connected to data. warehouse: str The underlying data warehouse where that data resides metrics_last_collected_at: datetime The last time metrics were collected for this :class:`.Dataset` (virtualized :class:`.Dataset`\\ s) or the last time the :class:`.Dataset` was imported (ingested :class:`.Dataset`\\ s). next_scheduled_metrics_collection_time: Optional[datetime] If this :class:`.Dataset` has an associated connection schedule, the next time this dataset will collect metrics (virtualized Dataset) or import (ingested :class:`.Dataset`\\ s). last_metrics_collection_failure_time: datetime The most recent time metrics collection (virtualized :class:`.Dataset`\\ s) or import (ingested :class:`.Dataset`\\ s) failed. ``None`` if metrics collection has never failed. warehouse_metadata: Optional[List[metadata_field.MetadataField]] Harvested metadata from virtualized :class:`.Dataset`\\ s. ``None`` for ingested :class:`.Dataset`\\ s. """ exports_disabled: bool metrics_last_collected_at: datetime next_scheduled_metrics_collection_time: Optional[datetime] last_metrics_collection_failure_time: Optional[datetime] warehouse_metadata: 'Optional[List[metadata_field.MetadataField]]' @property def is_connected(self) -> bool: return True
    def refresh(self) -> 'ConnectedDataset':
        """
        Return a fresh copy of this :class:`.ConnectedDataset`, with
        up-to-date property values. Useful after performing an update,
        connection, etc.
        """
        return ConnectedDataset.get(self._client, self.id)

    @property
    def connection_id(self) -> str:
        """The ID of the underlying :class:`.IngestionConnection` or
        :class:`.VirtualizationConnection` which links this :class:`.Dataset`
        to data"""
        return self._reference.connection_id

    @property
    def connection(
        self
    ) -> Union['connection_package.IngestionConnection',
               'connection_package.VirtualizationConnection']:
        """The underlying :class:`.IngestionConnection` or
        :class:`.VirtualizationConnection` which links this :class:`.Dataset`
        to data"""
        return self._reference.connection

    @property
    def warehouse(self) -> 'warehouse_package.Warehouse':
        """
        The :class:`.Warehouse` where the connected data is virtualized from,
        or ingested to
        """
        return warehouse_package.Warehouse.get(self._client,
                                               self._reference.warehouse,
                                               self.organization_id)

    @property
    def import_state(self) -> ImportState:
        return self._reference.state

    @property
    def health_monitoring_enabled(self) -> bool:
        """
        Whether or not :class:`.Catalog` platform health monitoring is
        enabled for this :class:`.ConnectedDataset`
        """
        return self._reference.dataspec_enabled

    @health_monitoring_enabled.setter
    def set_health_monitoring_enabled(self, health_monitoring_enabled: bool):
        """
        Enable or disable health monitoring for this :class:`.ConnectedDataset`
        """
        self._reference.dataspec_enabled = health_monitoring_enabled

    @property
    def metrics_collection_schedules(
            self) -> Optional[List['connection_package.ConnectionSchedule']]:
        """
        Returns all configured schedules for metrics collection, which govern
        health monitoring intervals and ingestion intervals for ingested
        :class:`.Dataset`\\ s
        """
        return self._reference.ingest_schedules

    @metrics_collection_schedules.setter
    def set_metrics_collection_schedules(
            self,
            schedules: Optional[List['connection_package.ConnectionSchedule']]):
        """
        Set or clear metrics collection schedules
        """
        self._reference.ingest_schedules = schedules

    @property
    def advanced_configuration(self) -> str:
        """
        This configuration string is auto-generated during ingestion or
        virtualization, inferred from the connected data. It can be modified,
        with caution, to alter how the :class:`.Catalog` perceives and
        represents the connected data. Modification of this configuration
        without support from ThinkData Works is not recommended.
        """
        return self._reference.ingest_config

    @advanced_configuration.setter
    def set_advanced_configuration(self, ingest_config: str):
        """
        This configuration string is auto-generated during ingestion or
        virtualization, inferred from the connected data. It can be modified,
        with caution, to alter how the :class:`.Catalog` perceives and
        represents the connected data. Modification of this configuration
        without support from ThinkData Works is not recommended.
        """
        self._reference.ingest_config = ingest_config

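    # Usage sketch (not part of the library source): inspecting connection
    # details on a ConnectedDataset. Assumes `cds` is a ConnectedDataset,
    # e.g. the result of Dataset.get() for a connected Dataset.
    #
    #     print(cds.connection_id)
    #     print(cds.import_state)
    #     print(cds.warehouse)
    #     print(cds.health_monitoring_enabled)
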
    def connect(self) -> 'DatasetConnector':
        """
        Manage all connection-related aspects of this
        :class:`.ConnectedDataset`. There are many methods for connecting a
        :class:`.Dataset`, so a helper object is returned with various
        method-based workflows that aid in connecting to data.

        Returns
        -------
        DatasetConnector
            A helper object for configuring this :class:`.Dataset`\\ 's
            connection to data.
        """
        return super().connect()

    async def reconnect(self):
        """
        Manually triggers a reimport of ingested data for ingested datasets,
        and metrics collection (health monitoring, etc.) for virtualized and
        ingested datasets. Useful for forcing a metrics collection, or
        applying changes made to the ``advanced_configuration``.
        """
        try:
            self._reference.force_fail()
        except:
            pass
        await self._reference.ingest()

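    # Usage sketch (not part of the library source): reconnect() is a
    # coroutine, so it must be awaited (or driven via asyncio.run), e.g.
    # after changing advanced_configuration.
    #
    #     import asyncio
    #     asyncio.run(cds.reconnect())
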
    def __str__(self) -> str:
        return f"<ConnectedDataset id={self.id} title={self.title} organization_id={self.organization_id}>"

    def _get__reference(self):
        return _reference._Reference.get(self._client, self._reference_id,
                                         self.organization_id)

    async def query(self, query: Optional[str] = None) -> QueryCursor:
        """
        Async function which returns a Python DB API-style Cursor object
        (PEP 249), representing the results of the supplied SQL-like NiQL
        query executed against the connected data. Note that NiQL supports
        most standard SQL keywords, but keywords which modify underlying data
        (e.g. ``INSERT``, ``UPDATE``, ``DELETE``) may not be used. Note that
        the :class:`.Catalog` platform imposes a global limit on results
        (10,000 rows) from a single query. To refer to the current dataset in
        the query, include ``{this}`` in the query, such as:
        ``"SELECT * FROM {this}"``.

        Parameters
        ----------
        query : Optional[str]
            A NiQL query used to filter or reshape the data before the
            results are returned

        Returns
        -------
        QueryCursor
            The query results cursor, which can be printed, converted to a
            ``pandas`` DataFrame via ``pd.DataFrame(res.fetchall())``, etc.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to query data
        CatalogInvalidArgumentException
            If the given query is invalid
        CatalogException
            If the call to the :class:`.Catalog` server fails, or the query
            itself fails
        """
        try:
            q = "SELECT * FROM {this}" if query is None else query
            q = q.format(this=self.id)
            res = self._client._query(statement=q)
            return QueryCursor(res)
        except Exception as e:
            raise _convert_error(e)

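    # Usage sketch (not part of the library source): running a NiQL query and
    # loading the results into pandas, per the docstring above. Assumes
    # pandas is installed and `cds` is a ConnectedDataset.
    #
    #     import asyncio
    #     import pandas as pd
    #
    #     async def preview():
    #         cursor = await cds.query("SELECT * FROM {this} LIMIT 100")
    #         return pd.DataFrame(cursor.fetchall())
    #
    #     df = asyncio.run(preview())
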
    async def _do_export(
            self,
            query: Optional[str] = None,
            format: Optional[_ExportFormat] = None) -> Dict[str, any]:
        try:
            file_name = 'export.parquet' if format is _ExportFormat.PARQUET else 'export.csv'
            export = self._client._create_export(query=query,
                                                 format=format,
                                                 file_name=file_name)
            export_in_progress = True
            while export_in_progress:
                finished_export_details = None
                # when the export has already been done before, this 'export'
                # key will be available right away.
                if 'export' in export:
                    finished_export_details = self._client._get_export(
                        id=export['export']['id'])
                else:
                    finished_export_details = self._client._get_export(
                        id=export['id'])
                if 'export' in finished_export_details:
                    state = finished_export_details['export']['state']
                else:
                    state = finished_export_details['state']
                if state == 'finished':
                    break
                elif state == 'failed':
                    # if a CSV_GZIP export fails, fall back to CSV
                    if format is _ExportFormat.CSV_GZIP:
                        return await self._do_export(query,
                                                     format=_ExportFormat.CSV)
                    raise CatalogException(
                        message=finished_export_details["error_info"]
                        ["message"])
                await sleep(2)
            return finished_export_details
        except Exception as e:
            raise _convert_error(e)

    async def export_csv(self, query: Optional[str] = None) -> CSVExport:
        """
        Async function which returns the URL which can be used to stream a
        CSV-formatted copy of the connected data, optionally filtered by the
        supplied SQL-like NiQL query. Note that most standard SQL keywords
        are supported, but keywords which modify underlying data (e.g.
        ``INSERT``, ``UPDATE``, ``DELETE``) are not. To refer to the current
        dataset in the query, include ``{this}`` in the query, such as:
        ``"SELECT * FROM {this}"``. Unlike ``ConnectedDataset.query()``,
        there is no limit on exported rows, other than any imposed by the
        underlying warehouse.

        Parameters
        ----------
        query : Optional[str]
            A NiQL query used to filter or reshape the data before exporting

        Returns
        -------
        CSVExport
            A :class:`.CSVExport` object containing a signed download URL
            which can be used to fetch the exported data. It can be
            downloaded in its entirety, or streamed in chunks. This
            :class:`.CSVExport` object improves the usability of the CSV data
            when employing ``pandas``, including a configuration for
            ``read_csv`` which can be passed via ``**export`` as follows:
            ``df = pd.read_csv(export.url, **export)``, ensuring that the
            resultant ``DataFrame`` has the correct schema for all fields
            (including dates). Note: It is recommended that `export_parquet`
            be employed for use with ``pandas`` when supported by the
            underlying warehouse.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to export data
        CatalogInvalidArgumentException
            If the given query is invalid
        CatalogException
            If the call to the :class:`.Catalog` server fails, or the export
            process itself fails
        """
        q = "SELECT * FROM {this}" if query is None else query
        q = q.format(this=self.id)
        schema_q = "SELECT * FROM ({q}) LIMIT 0".format(q=q)
        # run the export and query in parallel, using the query to fetch the schema
        [query_res, finished_export_details] = await gather(
            self.query(schema_q),
            self._do_export(q, format=_ExportFormat.CSV_GZIP))
        export_format = _ExportFormat.CSV_GZIP if finished_export_details[
            "export"]["format"] == "csv.gzip" else _ExportFormat.CSV
        return CSVExport._from_export_details(query_res,
                                              finished_export_details,
                                              export_format)

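    # Usage sketch (not part of the library source): exporting to CSV and
    # reading it with pandas, using the read_csv configuration carried by the
    # CSVExport object as described in the docstring above.
    #
    #     import asyncio
    #     import pandas as pd
    #
    #     async def to_frame():
    #         export = await cds.export_csv("SELECT * FROM {this}")
    #         return pd.read_csv(export.url, **export)
    #
    #     df = asyncio.run(to_frame())
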
    async def export_parquet(self,
                             query: Optional[str] = None) -> ParquetExport:
        """
        Async function which returns the URL which can be used to stream a
        Parquet-formatted copy of the connected data, optionally filtered by
        the supplied SQL-like NiQL query. Note that most standard SQL
        keywords are supported, but keywords which modify underlying data
        (e.g. ``INSERT``, ``UPDATE``, ``DELETE``) are not. To refer to the
        current dataset in the query, include ``{this}`` in the query, such
        as: ``"SELECT * FROM {this}"``. Unlike ``ConnectedDataset.query()``,
        there is no limit on exported rows, other than any imposed by the
        underlying warehouse. Note: Parquet export is not (yet) supported for
        all underlying warehouse types, but this export method should be
        preferred when interfacing with ``pandas`` whenever possible.

        Parameters
        ----------
        query : Optional[str]
            A NiQL query used to filter or reshape the data before exporting

        Returns
        -------
        ParquetExport
            A :class:`.ParquetExport` object containing a signed download URL
            which can be used to fetch the exported data. It can be
            downloaded in its entirety, or streamed in chunks. This
            :class:`.ParquetExport` object can be directly employed by
            ``pandas`` as follows: ``df = pd.read_parquet(export.url)``. Note
            that ``pandas`` requires ``pyarrow`` OR ``fastparquet`` in order
            to ``read_parquet``. Note: It is recommended that
            `export_parquet` be employed for use with ``pandas`` when
            supported by the underlying warehouse.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to export data
        CatalogInvalidArgumentException
            If the given query is invalid, or if Parquet export is not
            available for this warehouse type
        CatalogException
            If the call to the :class:`.Catalog` server fails, or the export
            process itself fails
        """
        q = "SELECT * FROM {this}" if query is None else query
        q = q.format(this=self.id)
        finished_export_details = await self._do_export(
            q, format=_ExportFormat.PARQUET)
        return ParquetExport._from_export_details(finished_export_details)

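    # Usage sketch (not part of the library source): Parquet export is the
    # preferred path for pandas where the warehouse supports it; pyarrow or
    # fastparquet must be installed for pd.read_parquet.
    #
    #     import asyncio
    #     import pandas as pd
    #
    #     async def to_frame():
    #         export = await cds.export_parquet("SELECT * FROM {this}")
    #         return pd.read_parquet(export.url)
    #
    #     df = asyncio.run(to_frame())
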

def _update_fields_after_template_attachment(
        dataset: 'Dataset',
        template: 'metadata_template.MetadataTemplate') -> None:
    # steps for setting the template:
    # - remove all of the original template's fields from the custom fields list
    # - convert the new template's fields list to MetadataField objects
    #   (instead of MetadataTemplateField objects)
    # - update the values of the new template fields list
    # - set the dataset metadata_template to the new template
    original_fields_list = dataset._custom_fields if dataset._custom_fields is not None else []
    original_template_fields = dataset._field_template.fields if dataset._field_template is not None else None
    original_template_keys = list(
        map(lambda f: f["key"], original_template_fields)
    ) if original_template_fields is not None else []
    # remove the old template fields from the custom fields list
    filtered_fields_list = list(
        filter(lambda f: f["key"] not in original_template_keys,
               original_fields_list))
    # set the new field template, so that it will be saved when dataset.save() is called
    dataset._field_template = template
    # map the new field template fields to plain MetadataField's
    template_fields = list(
        map(lambda f: metadata_field._convert_template_field(f),
            dataset._field_template.fields)
    ) if dataset._field_template.fields is not None else []
    # update the template_fields with any pre-existing field values
    for field in template_fields:
        pre_existing = next(
            (f for f in filtered_fields_list
             if f.key == field.key and type(f) == type(field)), None)
        if pre_existing is not None:
            # update the template field value if it doesn't have one
            if field.value is None:
                field.value = pre_existing.value
            # remove the pre-existing field from the custom fields list
            filtered_fields_list = [
                f for f in filtered_fields_list if f.key != field.key
            ]
    # add the new template's fields to the custom fields list
    dataset._custom_fields = template_fields + filtered_fields_list