Source code for tdw_catalog.list_datasets

from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
import sys
from typing import List, Optional, TYPE_CHECKING
from tdw_catalog import organization_member, source, topic, warehouse

from tdw_catalog.utils import LegacyFilter as BaseFilter, FilterSortOrder, ImportState, _convert_datetime_to_nullable_timestamp
if sys.version_info >= (3, 11):
    from enum import StrEnum
else:
    from backports.strenum import StrEnum

if TYPE_CHECKING:
    from tdw_catalog import Catalog


class SortableField(StrEnum):
    """
    The different fields which ``list_datasets`` on :class:`.Organization`
    can be sorted by
    """
    TITLE = "title"
    CREATED_AT = "created_at"
    IMPORTED_AT = "imported_at"
    UPDATED_AT = "updated_at"
    STATE = "reference_state"
    NEXT_INGEST = "reference_next_ingest"
    FAILED_AT = "reference_failed_at"
    SOURCE_NAME = "source_label"


class TimestampField(IntEnum):
    """
    The different possible fields that can be used to construct a
    :class:`.TimestampRange` filter for ``list_datasets`` on
    :class:`.Organization`
    """
    CREATED_AT = 0
    UPDATED_AT = 1
    IMPORTED_AT = 2
    NEXT_INGEST = 3
    FAILED_AT = 5


@dataclass
class TimestampRange():
    """
    Used to construct a temporal filter for ``list_datasets`` on
    :class:`.Organization`\\ , where a filter specifies a
    :class:`.TimestampField` and a time range
    """
    filter_by: TimestampField
    start_time: Optional[datetime]
    end_time: Optional[datetime]
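

# Illustrative sketch (not part of the original module): an open-ended
# TimestampRange that keeps only datasets created since the start of 2023.
# A ``None`` bound is presumably serialized via the ``is_null`` flag in
# Filter.serialize below.
def _example_created_since_2023() -> TimestampRange:
    return TimestampRange(filter_by=TimestampField.CREATED_AT,
                          start_time=datetime(2023, 1, 1),
                          end_time=None)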


@dataclass
class Sort():
    """
    Used to sort the results of ``list_datasets`` on :class:`.Organization`\\ .
    """
    field: SortableField
    order: Optional[FilterSortOrder] = FilterSortOrder.ASC

    def serialize(self):
        new_sort = {}
        new_sort["value"] = self.field
        new_sort["order"] = "ASC" if self.order == FilterSortOrder.ASC else "DESC"
        return new_sort
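

# Illustrative sketch (not part of the original module): sorting results by
# most recent update. ``DESC`` is assumed to be a member of the imported
# FilterSortOrder enum; any value other than ASC serializes as "DESC" per
# Sort.serialize above.
def _example_sort_newest_first() -> dict:
    # Yields roughly {"value": "updated_at", "order": "DESC"}.
    return Sort(field=SortableField.UPDATED_AT,
                order=FilterSortOrder.DESC).serialize()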


@dataclass
class DatasetAlias():
    """
    Used to filter the results of ``list_datasets`` by specific aliases
    """
    alias_key: str
    alias_values: List[str]

    def serialize(self):
        return {
            "alias_key": self.alias_key,
            "alias_values": self.alias_values,
        }


@dataclass
class Filter(BaseFilter):
    """
    :class:`.ListOrganizationDatasetsFilter` filters the results from
    ``list_datasets`` on :class:`.Organization`\\ .

    Attributes
    ----------
    keywords : Optional[List[str]]
        Filters results according to the specified keyword(s) (fuzzy matching is supported)
    dataset_ids : Optional[List[str]]
        Filters results to the list of given :class:`.Dataset`\\ id(s)
    dataset_aliases : Optional[List[DatasetAlias]]
        Filters results to the list of given :class:`.Dataset`\\ alias(es)
    sources : Optional[List[Source]]
        Filters results to the list of given :class:`.Source`\\ (s)
    topics : Optional[List[Topic]]
        Filters results to the list of given :class:`.Topic`\\ (s)
    creators : Optional[List[OrganizationMember]]
        Filters results to the list of given :class:`.OrganizationMember`\\ (s),
        who created the returned :class:`.Dataset`\\ s
    states : Optional[List[ImportState]]
        Filters results to the list of given :class:`.ImportState`\\ s.
        Note that virtualized datasets will always be categorized as ``IMPORTED``.
    warehouses : Optional[List[Warehouse]]
        Filters results to the list of given :class:`.Warehouse`\\ s
    timestamp_range : Optional[TimestampRange]
        Filters results to within the given :class:`.TimestampRange`
    sort : Optional[Sort]
        Sorts filtered results according to the provided :class:`.Sort` structure
    """
    keywords: Optional[List[str]] = None
    dataset_ids: 'Optional[List[str]]' = None
    dataset_aliases: 'Optional[List[DatasetAlias]]' = None
    reference_ids: Optional[List[str]] = None
    sources: 'Optional[List[source.Source]]' = None
    topics: 'Optional[List[topic.Topic]]' = None
    creators: 'Optional[List[organization_member.OrganizationMember]]' = None
    states: Optional[List[ImportState]] = None
    warehouses: 'Optional[List[warehouse.Warehouse]]' = None
    timestamp_range: Optional[TimestampRange] = None
    sort: Optional[Sort] = None

    def serialize(self):
        new_filter = super().serialize()
        new_filter["exclude_versions"] = True
        if self.keywords is not None:
            new_filter["keywords"] = self.keywords
        if self.dataset_ids is not None:
            new_filter["dataset_ids"] = self.dataset_ids
        if self.dataset_aliases is not None:
            new_filter["dataset_aliases"] = [
                a.serialize() for a in self.dataset_aliases
            ]
        if self.sources is not None:
            new_filter["source_ids"] = [s.id for s in self.sources]
        if self.topics is not None:
            new_filter["topic_ids"] = [t.id for t in self.topics]
        if self.sort is not None:
            new_filter["sort"] = self.sort.serialize()
        if self.creators is not None:
            new_filter["uploader_ids"] = [u.id for u in self.creators]
        if self.states is not None:
            new_filter["reference_states"] = self.states
        if self.warehouses is not None:
            new_filter["warehouses"] = [w.name for w in self.warehouses]
        if self.timestamp_range is not None:
            start_timestamp = _convert_datetime_to_nullable_timestamp(
                self.timestamp_range.start_time)
            end_timestamp = _convert_datetime_to_nullable_timestamp(
                self.timestamp_range.end_time)
            new_filter["timestamp_range"] = {
                "filter_by": self.timestamp_range.filter_by,
                "from": {
                    "is_null": start_timestamp["is_null"],
                    "timestamp": {
                        "seconds": start_timestamp["timestamp"]["seconds"],
                        "nanos": start_timestamp["timestamp"]["nanos"]
                    }
                },
                "to": {
                    "is_null": end_timestamp["is_null"],
                    "timestamp": {
                        "seconds": end_timestamp["timestamp"]["seconds"],
                        "nanos": end_timestamp["timestamp"]["nanos"]
                    }
                }
            }
        return new_filter
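

# Illustrative sketch (not part of the original module) of assembling a Filter
# for ``list_datasets``. It assumes the inherited LegacyFilter fields all have
# defaults and that ``IMPORTED`` is a member of the imported ImportState enum;
# serialize() builds the request payload locally, without any network call.
# The alias key/values are hypothetical example data.
def _example_filter_payload() -> dict:
    f = Filter(
        keywords=["sales"],
        dataset_aliases=[DatasetAlias(alias_key="region", alias_values=["emea"])],
        states=[ImportState.IMPORTED],
        timestamp_range=TimestampRange(filter_by=TimestampField.UPDATED_AT,
                                       start_time=datetime(2023, 1, 1),
                                       end_time=datetime(2023, 12, 31)),
        sort=Sort(field=SortableField.UPDATED_AT, order=FilterSortOrder.DESC),
    )
    return f.serialize()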


class _ListDatasetsIterator():
    _client: 'Catalog'
    _dataset_ids: List[str]

    def __init__(self, client: 'Catalog', dataset_ids: List[str]):
        self._client = client
        self._dataset_ids = dataset_ids
        self._index = 0

    def __iter__(self):
        return self

    def __next__(self):
        import tdw_catalog.dataset as dataset
        if self._index < len(self._dataset_ids):
            item = self._dataset_ids[self._index]
            self._index += 1
            return dataset.Dataset.get(client=self._client, id=item)
        else:
            raise StopIteration
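

# Illustrative sketch (not part of the original module): _ListDatasetsIterator
# is lazy, resolving one id per iteration via Dataset.get, so each ``next()``
# presumably issues its own catalog request. A hypothetical helper that
# materializes only the first ``n`` datasets might look like this.
def _example_first_n(client: 'Catalog', dataset_ids: List[str], n: int) -> list:
    from itertools import islice
    return list(islice(_ListDatasetsIterator(client, dataset_ids), n))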