Source code for tdw_catalog.dataset_connector

import mimetypes
import os
from typing import Optional, Union
from asyncio import sleep
from aiohttp import ClientSession, ServerDisconnectedError

from tdw_catalog import Catalog
import tdw_catalog.dataset as dataset
from tdw_catalog.connection import IngestionConnection
import tdw_catalog._reference as _reference
from tdw_catalog.errors import CatalogException, CatalogInvalidArgumentException
from tdw_catalog.utils import ConnectionPortalType
from tdw_catalog.warehouse import TargetWarehouse


class DatasetConnector:
    """
    A helper object for configuring a :class:`.Dataset`\\ 's connection to data.
    Can either connect a :class:`.Dataset` for the first time, or reconnect an
    already-connected :class:`.Dataset` to different data.
    """
    _client: 'Catalog'
    _d: 'Union[dataset.Dataset, dataset.ConnectedDataset]'
    _r: '_reference._Reference'

    def __init__(self, d: 'Union[dataset.Dataset, dataset.ConnectedDataset]'):
        self._d = d
        self._client = d._client
        if d.is_connected:
            self._r = d._reference
        else:
            self._r = _reference._Reference(dataset_id=d.id,
                                            organization_id=d.organization_id,
                                            client=self._client)

    # TODO support ingest from csv string (probably some kind of StringIO)

    async def ingest_from_file(
        self,
        local_file_path: str,
        connection: Optional[IngestionConnection] = None,
        target_warehouse: Optional[TargetWarehouse] = None,
    ) -> 'dataset.ConnectedDataset':
        """
        Async function which uploads a local file to the :class:`.Catalog`
        platform and ingests it, connecting this :class:`.Dataset` to that
        ingested data.

        Parameters
        ----------
        local_file_path : str
            The path to the file on disk. The file will be streamed from disk,
            rather than read into memory, to ensure large files upload
            successfully.
        connection : Optional[IngestionConnection]
            Optionally specify a file upload-type :class:`.IngestionConnection`
            to use. This :class:`.IngestionConnection` must reside within the
            existing :class:`.Dataset`\\ 's Source, and must be of the correct
            type (:class:`.ConnectionPortalType.IMPORT_LITE`). If not provided,
            the first available file upload Connection within the
            :class:`.Dataset`\\ 's Source will be used, or one will be created
            if none are available.
        target_warehouse : Optional[TargetWarehouse]
            Optionally specify a target warehouse to ingest to. If omitted, the
            :class:`.TargetWarehouse` specified by the
            :class:`.IngestionConnection` will be used, or the default
            :class:`.TargetWarehouse` for the :class:`.Organization` if the
            :class:`.IngestionConnection` does not specify a default
            :class:`.TargetWarehouse`.

        Returns
        -------
        ConnectedDataset
            The newly connected :class:`.Dataset`, if it was not connected
            previously, or an updated version of the existing
            :class:`.ConnectedDataset` if it was connected previously. Further
            :class:`.Dataset` operations should be performed on this returned
            object.

        Raises
        ------
        FileNotFoundError
            If the specified local_file_path does not exist
        CatalogPermissionDeniedException
            If the caller is not allowed to perform any of the steps involved
            in ingesting data from a file
        CatalogInvalidArgumentException
            If the given :class:`.IngestionConnection` cannot be used
        CatalogException
            If a call to the :class:`.Catalog` server fails, or the ingest
            process itself fails
        """
        file_name = os.path.basename(local_file_path)
        file_size = os.path.getsize(local_file_path)
        (mime_type, encoding) = mimetypes.guess_type(local_file_path)
        if mime_type is None:
            mime_type = 'text/csv'

        # TODO verify whether already connected and prevent illegal
        # connection-type switches

        # Use provided connection, or find a file upload connection on source,
        # or create one if one doesn't exist
        conn = connection
        if conn is not None and conn.source_id != self._d.source.id:
            raise CatalogInvalidArgumentException(
                message=
                "Provided Connection's source_id \"{csid}\" must match dataset source_id \"{dsid}\""
                .format(csid=connection.source_id, dsid=self._d.source_id))
        if conn is not None and conn.portal != ConnectionPortalType.IMPORT_LITE:
            raise CatalogInvalidArgumentException(
                message=
                "Provided Connection is not a file upload-type Connection")
        if conn is None:
            conn = next(
                iter([
                    c for c in self._d.source.list_connections()
                    if c.portal == ConnectionPortalType.IMPORT_LITE
                ]), None)
        if conn is None:
            conn = self._d.source.create_ingestion_connection(
                label="{source_name} - File Upload".format(
                    source_name=self._d.source.label),
                portal=ConnectionPortalType.IMPORT_LITE,
                url="upload://")

        # get a signed upload url for GCS
        upload_req = self._client._get_import_lite_signed_url(
            organization_id=self._d.organization_id,
            content_length=str(file_size),
            content_type=mime_type,
            filename=file_name,
            source_id=self._d.source_id,
            dataset_id=self._d.id,
        )

        async with ClientSession() as session:
            # snag an upload location from the signed url
            location_fetch_res = await session.post(
                upload_req["signed_url"],
                allow_redirects=True,
                headers={
                    'Content-Type': mime_type,
                    'x-goog-resumable': 'start',
                    'x-goog-meta-original-file': file_name,
                })
            if location_fetch_res.status != 201:
                raise CatalogException(
                    message=
                    'Unable to generate upload staging location from signed URL'
                )
            location = location_fetch_res.headers['location']

            # upload file to signed location (streaming)
            # https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads
            with open(local_file_path, 'rb') as f:
                upload_res = await session.put(
                    location, headers={'Content-Type': mime_type}, data=f)
                if upload_res.status != 200:
                    raise CatalogException(
                        message='Unable to upload file to staging location')

            # verify file is available in bucket (wait up to 3 seconds)
            upload_verified = False
            upload_check_url = upload_req["signed_url_get"]
            for _ in range(0, 15):
                try:
                    check_res = await session.head(
                        upload_check_url, headers={'Content-Type': mime_type})
                    if check_res.status == 200:
                        upload_verified = True
                        break
                except ServerDisconnectedError:
                    pass
                await sleep(0.2)
            if not upload_verified:
                raise CatalogException(
                    message=
                    'Unable to verify that file was successfully uploaded')

        # figure out target warehouse
        warehouse = self._r.warehouse
        if target_warehouse is not None:
            warehouse = target_warehouse.name
        if warehouse is None or len(warehouse) == 0:
            warehouse = conn.warehouse
        if warehouse is None or len(warehouse) == 0:
            warehouses = self._client._list_warehouses(
                organization_id=self._r.organization_id)
            warehouse = next(map(lambda w: w["name"], warehouses), None)
        if warehouse is None:
            raise CatalogException(
                message='Unable to determine warehouse for data import')

        # update reference ingest properties
        self._r.url = upload_req['data_reference_url']
        self._r.connection_id = conn.id
        self._r.warehouse = warehouse
        self._r.pipeline = "jabba"

        # force fail other ingests if reference exists already
        try:
            self._r.force_fail()
        except Exception:
            pass

        # call CreateReference or UpdateReference as appropriate
        self._r.save()

        # call IngestReference and poll on job until it is finished
        await self._r.ingest()

        # update dataset and reference fields, or fetch newly connected dataset
        if self._d.is_connected and self._d._reference is None:
            self._d._reference = self._r
            self._d._reference_id = self._r.id
            return self._d
        else:
            return self._d.get(self._client, self._d.id,
                               self._d._context_organization)

    async def virtualize(self):
        pass
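

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the library): connecting a
# Dataset to a local CSV file via DatasetConnector.ingest_from_file. How the
# Catalog client is constructed and how the Dataset is fetched are assumptions
# made for the example (the Dataset.get call mirrors the one used at the end
# of ingest_from_file above); substitute real credentials and ids.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _example():
        # assumed constructor; configure credentials as your deployment requires
        catalog = Catalog()
        # assumed fetch signature: Dataset.get(client, dataset_id, organization_id)
        ds = dataset.Dataset.get(catalog, "my-dataset-id", "my-organization-id")
        connected = await DatasetConnector(ds).ingest_from_file("./data.csv")
        print("Connected dataset:", connected.id)

    asyncio.run(_example())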