import mimetypes
import os
from typing import Optional, Union
from asyncio import sleep
from aiohttp import ClientSession, ServerDisconnectedError
from tdw_catalog import Catalog
import tdw_catalog.dataset as dataset
from tdw_catalog.connection import IngestionConnection
import tdw_catalog._reference as _reference
from tdw_catalog.errors import CatalogException, CatalogInvalidArgumentException
from tdw_catalog.utils import ConnectionPortalType
from tdw_catalog.warehouse import TargetWarehouse
class DatasetConnector:
"""
A helper object for configuring a :class:`.Dataset`\\ 's connection to data.
Can either connect a :class:`.Dataset` for the first time, or reconnect
an already-connected :class:`.Dataset` to different data.
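
Example (illustrative sketch; assumes ``ds`` is a :class:`.Dataset`
already fetched from a :class:`.Catalog` client)::

    connector = DatasetConnector(ds)
    connected = await connector.ingest_from_file("./sales.csv")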
"""
_client: 'Catalog'
_d: 'Union[dataset.Dataset, dataset.ConnectedDataset]'
_r: '_reference._Reference'
def __init__(self, d: 'Union[dataset.Dataset,dataset.ConnectedDataset]'):
self._d = d
self._client = d._client
if d.is_connected:
self._r = d._reference
else:
self._r = _reference._Reference(dataset_id=d.id,
organization_id=d.organization_id,
client=self._client)
# TODO support ingest from csv string (probably some kind of StringIO)
async def ingest_from_file(
self,
local_file_path: str,
connection: Optional[IngestionConnection] = None,
target_warehouse: Optional[TargetWarehouse] = None,
) -> 'dataset.ConnectedDataset':
"""
Async function which uploads a local file to the :class:`.Catalog`
platform and ingests it, connecting this :class:`.Dataset` to
that ingested data.
Parameters
----------
local_file_path : str
The path to the file on disk. The file will be streamed
from disk, rather than read into memory, to ensure
large files upload successfully.
connection : Optional[IngestionConnection]
Optionally specify a file upload-type :class:`.IngestionConnection` to use.
This :class:`.IngestionConnection` must reside within the existing :class:`.Dataset`\\ 's
Source, and must be of the correct type (:class:`.ConnectionPortalType.IMPORT_LITE`).
If not provided, the first available file upload Connection
within the :class:`.Dataset`'s source will be used, or one will be
created if none are available.
target_warehouse : Optional[TargetWarehouse]
Optionally specify a target warehouse to ingest to. If omitted,
the :class:`.TargetWarehouse` specified by the :class:`.IngestionConnection` will be used,
or the default :class:`.TargetWarehouse` for the :class:`.Organization` if the
:class:`.IngestionConnection` does not specify a default :class:`.TargetWarehouse`.
Returns
-------
ConnectedDataset
The newly connected :class:`.Dataset`, if it was not connected previously,
or an updated version of the existing :class:`.ConnectedDataset` if it was
connected previously. Further :class:`.Dataset` operations should be performed
on this returned object.
Raises
------
FileNotFoundError
If the specified local_file_path does not exist
CatalogPermissionDeniedException
If the caller is not allowed to perform any of the steps involved in ingesting data from a file
CatalogInvalidArgumentException
If the given :class:`.IngestionConnection` cannot be used
CatalogException
If the call to the :class:`.Catalog` server fails, or the ingest process itself fails
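
Examples
--------
Illustrative sketch; assumes ``connector`` is a :class:`.DatasetConnector`
wrapping this :class:`.Dataset`, and that ``conn`` and ``wh`` are a
hypothetical file upload :class:`.IngestionConnection` and
:class:`.TargetWarehouse` obtained elsewhere from the same Source and
Organization::

    connected = await connector.ingest_from_file(
        "./sales.csv",
        connection=conn,
        target_warehouse=wh,
    )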
"""
file_name = os.path.basename(local_file_path)
file_size = os.path.getsize(local_file_path)
(mime_type, encoding) = mimetypes.guess_type(local_file_path)
if mime_type is None:
mime_type = 'text/csv'
# TODO verify whether already connected and prevent illegal
# connection-type switches
# Use provided connection, or find a file upload connection on source,
# or create one if one doesn't exist
conn = connection
if conn is not None and conn.source_id != self._d.source.id:
raise CatalogInvalidArgumentException(
message=
"Provided Connection's source_id \"{csid}\" must match dataset source_id \"{dsid}\""
.format(csid=connection.source_id, dsid=self._d.source_id))
if conn is not None and conn.portal != ConnectionPortalType.IMPORT_LITE:
raise CatalogInvalidArgumentException(
message=
"Provided Connection is not a file upload-type Connection")
if conn is None:
conn = next(
    (c for c in self._d.source.list_connections()
     if c.portal == ConnectionPortalType.IMPORT_LITE),
    None)
if conn is None:
conn = self._d.source.create_ingestion_connection(
label="{source_name} - File Upload".format(
source_name=self._d.source.label),
portal=ConnectionPortalType.IMPORT_LITE,
url="upload://")
# get a signed upload url for GCS
upload_req = self._client._get_import_lite_signed_url(
organization_id=self._d.organization_id,
content_length=str(file_size),
content_type=mime_type,
filename=file_name,
source_id=self._d.source_id,
dataset_id=self._d.id,
)
async with ClientSession() as session:
# snag an upload location from the signed url
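# (a POST with "x-goog-resumable: start" opens a GCS resumable upload
# session; the returned Location header is the actual upload target)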
location_fetch_res = await session.post(
upload_req["signed_url"],
allow_redirects=True,
headers={
'Content-Type': mime_type,
'x-goog-resumable': 'start',
'x-goog-meta-original-file': file_name,
})
if location_fetch_res.status != 201:
raise CatalogException(
message=
'Unable to generate upload staging location from signed URL'
)
location = location_fetch_res.headers['location']
# upload file to signed location (streaming)
# https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads
with open(local_file_path, 'rb') as f:
upload_res = await session.put(
location, headers={'Content-Type': mime_type}, data=f)
if upload_res.status != 200:
raise CatalogException(
message='Unable to upload file to staging location')
# verify file is available in bucket (wait up to 3 seconds)
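# by polling HEAD against the signed GET URL (15 attempts, 0.2s apart)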
upload_verified = False
upload_check_url = upload_req["signed_url_get"]
for i in range(0, 15):
try:
check_res = await session.head(
upload_check_url, headers={'Content-Type': mime_type})
if check_res.status == 200:
upload_verified = True
break
except ServerDisconnectedError:
pass
await sleep(0.2)
if not upload_verified:
raise CatalogException(
message=
'Unable to verify that file was successfully uploaded')
# figure out target warehouse
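# precedence: explicit target_warehouse argument, then the existing
# reference's warehouse, then the connection's default warehouse, then
# the organization's first available warehouse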
warehouse = self._r.warehouse
if target_warehouse is not None:
warehouse = target_warehouse.name
if warehouse is None or len(warehouse) == 0:
warehouse = conn.warehouse
if warehouse is None or len(warehouse) == 0:
warehouses = self._client._list_warehouses(
organization_id=self._r.organization_id)
warehouse = next(map(lambda w: w["name"], warehouses), None)
if warehouse is None:
raise CatalogException(
message='Unable to determine warehouse for data import')
# update reference ingest properties
self._r.url = upload_req['data_reference_url']
self._r.connection_id = conn.id
self._r.warehouse = warehouse
self._r.pipeline = "jabba"
# force fail other ingests if reference exists already
try:
self._r.force_fail()
except Exception:
pass
# call CreateReference or UpdateReference as appropriate
self._r.save()
# call IngestReference and poll on job until it is finished
await self._r.ingest()
# update dataset and reference fields, or fetch newly connected dataset
if self._d.is_connected and self._d._reference is None:
self._d._reference = self._r
self._d._reference_id = self._r.id
return self._d
else:
return self._d.get(self._client, self._d.id,
self._d._context_organization)
async def virtualize(self):
pass