# Source code for tdw_catalog.export

from datetime import date, datetime
from typing import Any, BinaryIO, Dict, List, Optional, Type

from tdw_catalog.query import QueryCursor
from tdw_catalog.utils import _ExportFormat, _download_export, _parse_timestamp


class _Export(dict):
    query: str
    _format: _ExportFormat
    created_at: datetime
    started_at: datetime
    finished_at: datetime
    url: str

    def __getattr__(self, attr):
        return self[attr]


class CSVExport(_Export):
    """
    :class:`.CSVExport` represents a signed download URL pointing to the
    CSV-formatted result of a :class:`.Dataset` ``export_csv()`` operation,
    alongside metadata concerning the exported data. This class is
    deliberately formatted for use with pandas' ``read_csv`` function, as
    follows: ``e1 = await dataset.export_csv()`` and then
    ``df = pd.read_csv(e1.url, **e1)``

    Attributes
    ----------
    query : str
        The query statement which was used to create the :class:`.Export`
    created_at : datetime
        The time this :class:`.Export` was originally created
    started_at : datetime
        The time this :class:`.Export` was started
    finished_at : datetime
        The time this :class:`.Export` was completed
    url : str
        The CSV-formatted export results can be downloaded via this signed URL
    dtype : Dict[str, Type]
        Metadata describing the schema of the exported data
    parse_dates : List[str]
        A list of columns within ``dtype`` that should be interpreted as dates
    true_values : List[str]
        A list of values to interpret as "truthy"
    false_values : List[str]
        A list of values to interpret as "falsey"
    compression : Optional[str]
        Indicates the compression format of the data, if any
    """

    @classmethod
    def _map_types(cls, type_name: str) -> Type:
        """Map a catalog column type name to a pandas-compatible dtype.

        Unrecognized type names (including 'date'/'datetime', which are
        handled separately via ``parse_dates``) fall back to ``object``.
        """
        if type_name == 'boolean':
            return bool
        elif type_name in ('string', 'geometry'):
            return str
        return object

    @classmethod
    def _from_export_details(
            cls, query_res: QueryCursor,
            finished_export_details: Dict[str, Any],
            format: _ExportFormat) -> 'CSVExport':
        """Build a :class:`.CSVExport` from a finished export payload.

        Timestamps and the signed URL become instance attributes; the
        pandas ``read_csv`` keyword arguments become dict entries so they
        are included when the instance is unpacked with ``**``.
        """
        ex = CSVExport()
        ex._format = format
        export_meta = finished_export_details['export']
        ex.created_at = _parse_timestamp(export_meta['created_at'])
        ex.started_at = _parse_timestamp(export_meta['started_at'])
        ex.finished_at = _parse_timestamp(export_meta['finished_at'])
        ex.url = finished_export_details['file_url']
        # query_res.description rows are (name, type_name, ...) tuples
        ex['dtype'] = {
            col[0]: cls._map_types(col[1])
            for col in query_res.description
        }
        ex['parse_dates'] = [
            col[0] for col in query_res.description
            if col[1] in ('date', 'datetime')
        ]
        ex['true_values'] = ['t', 'T', '1']
        ex['false_values'] = ['f', 'F', '0']
        if format == _ExportFormat.CSV_GZIP:
            ex['compression'] = 'gzip'
        return ex

    async def to_str(self) -> str:
        """
        Downloads the export into an in-memory `str`

        Returns
        -------
        str
            The CSV contents of this export
        """
        return await _download_export(self.url, self._format)

    async def to_stream(self, out: BinaryIO):
        """
        Downloads the export into an on-disk file, or other stream

        Parameters
        ----------
        out : io.BinaryIO
            The stream to write CSV data to
        """
        return await _download_export(self.url, self._format, f_out=out)
class ParquetExport(_Export):
    """
    :class:`.ParquetExport` represents a signed download URL pointing to
    the Parquet-formatted result of a :class:`.Dataset` export operation,
    alongside timestamp metadata about the export (``created_at``,
    ``started_at``, ``finished_at``, and ``url``).
    """

    @classmethod
    def _from_export_details(
            cls, finished_export_details: Dict[str, Any]) -> 'ParquetExport':
        """Build a :class:`.ParquetExport` from a finished export payload."""
        ex = ParquetExport()
        ex._format = _ExportFormat.PARQUET
        export_meta = finished_export_details['export']
        ex.created_at = _parse_timestamp(export_meta['created_at'])
        ex.started_at = _parse_timestamp(export_meta['started_at'])
        ex.finished_at = _parse_timestamp(export_meta['finished_at'])
        ex.url = finished_export_details['file_url']
        return ex

    async def to_bytes(self) -> BinaryIO:
        """
        Downloads the export into an in-memory buffer

        Returns
        -------
        BinaryIO
            The Parquet contents of this export
        """
        return await _download_export(self.url, format=_ExportFormat.PARQUET)

    async def to_stream(self, out: BinaryIO):
        """
        Downloads the export into an on-disk file, or other stream

        Parameters
        ----------
        out : io.BinaryIO
            The stream to write Parquet data to
        """
        return await _download_export(self.url,
                                      format=_ExportFormat.PARQUET,
                                      f_out=out)