from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
import sys
from typing import List, Optional, TYPE_CHECKING
from tdw_catalog import organization_member, source, topic, warehouse
from tdw_catalog.utils import LegacyFilter as BaseFilter, FilterSortOrder, ImportState, _convert_datetime_to_nullable_timestamp
if sys.version_info >= (3, 11):
from enum import StrEnum
else:
from backports.strenum import StrEnum
if TYPE_CHECKING:
from tdw_catalog import Catalog
[docs]class SortableField(StrEnum):
"""
The different fields which ``list_datasets`` on :class:`.Organization` can be sorted by
"""
TITLE = "title",
CREATED_AT = "created_at",
IMPORTED_AT = "imported_at",
UPDATED_AT = "updated_at",
STATE = "reference_state",
NEXT_INGEST = "reference_next_ingest",
FAILED_AT = "reference_failed_at",
SOURCE_NAME = "source_label"
[docs]class TimestampField(IntEnum):
"""
The different possible fields that can be used to construct a :class:`.TimestampRange` filter for ``list_datasets`` on :class:`.Organization`
"""
CREATED_AT = 0,
UPDATED_AT = 1,
IMPORTED_AT = 2,
NEXT_INGEST = 3,
FAILED_AT = 5
[docs]@dataclass
class TimestampRange():
"""
Used to construct a temporal filter for ``list_datasets`` on :class:`.Organization`\\ , where a filter specifies a :class:`.TimestampField` and a time range
"""
filter_by: TimestampField
start_time: Optional[datetime]
end_time: Optional[datetime]
[docs]@dataclass
class Sort():
"""
Used to sort the results of ``list_datasets`` on :class:`.Organization`\\ .
"""
field: SortableField
order: Optional[FilterSortOrder] = FilterSortOrder.ASC
def serialize(self):
new_sort = {}
new_sort["value"] = self.field
new_sort[
"order"] = "ASC" if self.order == FilterSortOrder.ASC else "DESC"
[docs]@dataclass
class DatasetAlias():
"""
Used to sort the results of ``list_datasets`` by specific aliases
"""
alias_key: str
alias_values: List[str]
def serialize(self):
return {
"alias_key": self.alias_key,
"alias_values": self.alias_values,
}
[docs]@dataclass
class Filter(BaseFilter):
"""
:class:`.ListOrganizationDatasetsFilter` filters the results from ``list_datasets``
on :class:`.Organization`\\ .
Attributes
----------
keywords : Optional[List[str]]
Filters results according to the specified keyword(s) (fuzzy matching is supported)
dataset_ids : Optional[List[str]]
Filters results to the list of given :class:`.Dataset`\\ id(s)
datset_aliases : Optional[List[DatasetAlias]]
Filters results to the list of given :class:`.Dataset`\\ alias(es)
sources : Optional[List[Source]]
Filters results to the list of given :class:`.Source`\\ (s)
topics : Optional[List[Topic]]
Filters results to the list of given :class:`.Topic`\\ (s)
creators : Optional[List[OrganizationMember]]
Filters results to the list of given :class:`.OrganizationMember`\\ (s), who created the returned :class:`.Dataset`\\ s
state : Optional[List[ImportState]]
Filters results to the list of given :class:`.ImportState`\\ s. Note that virtualized datasets will always be categorized as ``IMPORTED``.
warehouses: Optional[List[Warehouse]]
Filters results to the list of given :class:`.Warehouse`\\ s
timestamp_range : Optional[TimestampRange]
Filters results to the within the given :class:`.TimestampRange`
sort : Optional[Sort]
Sorts filtered results according to the provided :class:`.Sort` structure
"""
keywords: Optional[List[str]] = None
dataset_ids: 'Optional[List[str]]' = None
dataset_aliases: 'Optional[List[DatasetAlias]]' = None
reference_ids: Optional[List[str]] = None
sources: 'Optional[List[source.Source]]' = None
topics: 'Optional[List[topic.Topic]]' = None
creators: 'Optional[List[organization_member.OrganizationMember]]' = None
states: Optional[List[ImportState]] = None
warehouses: 'Optional[List[warehouse.Warehouse]]' = None
timestamp_range: Optional[TimestampRange] = None
sort: Optional[Sort] = None
def serialize(self):
new_filter = super().serialize()
new_filter["exclude_versions"] = True
if self.keywords is not None:
new_filter["keywords"] = self.keywords
if self.dataset_ids is not None:
new_filter["dataset_ids"] = self.dataset_ids
if self.dataset_aliases is not None:
new_filter["dataset_aliases"] = [
a.serialize() for a in self.dataset_aliases
]
if self.sources is not None:
new_filter["source_ids"] = [s.id for s in self.sources]
if self.topics is not None:
new_filter["topic_ids"] = [t.id for t in self.topics]
if self.sort is not None:
new_filter["sort"] = self.sort.serialize()
if self.creators is not None:
new_filter["uploader_ids"] = [u.id for u in self.creators]
if self.states is not None:
new_filter["reference_states"] = self.states
if self.warehouses is not None:
new_filter["warehouses"] = [w.name for w in self.warehouses]
if self.timestamp_range is not None:
start_timestamp = _convert_datetime_to_nullable_timestamp(
self.timestamp_range.start_time)
end_timestamp = _convert_datetime_to_nullable_timestamp(
self.timestamp_range.end_time)
new_filter["timestamp_range"] = {
"filter_by": self.timestamp_range.filter_by,
"from": {
"is_null": start_timestamp["is_null"],
"timestamp": {
"seconds": start_timestamp["timestamp"]["seconds"],
"nanos": start_timestamp["timestamp"]["nanos"]
}
},
"to": {
"is_null": end_timestamp["is_null"],
"timestamp": {
"seconds": end_timestamp["timestamp"]["seconds"],
"nanos": end_timestamp["timestamp"]["nanos"]
}
}
}
return new_filter
class _ListDatasetsIterator():
_client: 'Catalog'
_dataset_ids: List[str]
def __init__(self, client: 'Catalog', dataset_ids: List[str]):
self._client = client
self._dataset_ids = dataset_ids
self._index = 0
def __iter__(self):
return self
def __next__(self):
import tdw_catalog.dataset as dataset
if self._index < len(self._dataset_ids):
item = self._dataset_ids[self._index]
self._index += 1
return dataset.Dataset.get(client=self._client, id=item)
else:
raise StopIteration