Source code for tdw_catalog.query

from datetime import datetime, timezone
from itertools import count
import threading
from typing import Dict, List, Optional


[docs]class QueryCursor: """ :class:`.QueryCursor` is a Python DB API-style Cursor object (PEP 249) for query results from the :class:`.Catalog`. Attributes __________ arraysize: number Read/write attribute that controls the number of rows returned by fetchmany(). The default value is 1 which means a single row would be fetched per call. description: List[tuple] Read-only attribute that provides the column names of the last query. To remain compatible with the Python DB API, it returns a 7-tuple for each column where the last five items of each tuple are None. """ _rows: List[tuple] _current_row: 0 _rowcount: count _array_size: int _description: List[tuple] def __init__(self, res: Dict[str, any]): def __format_cell(c: Dict[str, any], index: int) -> str: coltype = res['column_metadata'][index]['type'] if 'type' in res[ 'column_metadata'][index] else None jdbctype = res['column_metadata'][index][ 'jdbc_type'] if 'jdbc_type' in res['column_metadata'][ index] else None try: if 'is_null' in c: return None elif coltype == 'integer' or jdbctype == 'BIGINT' or jdbctype == 'SMALLINT' or jdbctype == 'TINYINT': return int(c['value']) elif coltype == 'decimal' or jdbctype == 'REAL' or jdbctype == 'FLOAT' or jdbctype == 'DOUBLE': return float(c['value']) elif coltype == 'date' and jdbctype == 'DATE': return datetime.strptime( c['value'], "%Y-%m-%d").replace(tzinfo=timezone.utc) elif coltype == 'date' and jdbctype == 'TIMESTAMP': # e.g. '1986-08-20 00:57:42.000' return datetime.strptime( c['value'], "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc) else: return c['value'] except: return c['value'] self._description = [(h['key'], h['type'] if 'type' in h else None, None, None, None, None, None) for h in res['column_metadata']] self._rows = list( map( lambda row: tuple( [__format_cell(c, i) for i, c in enumerate(row['cells'])]), res['rows'] if 'rows' in res else [], )) self._rowcount = len(self._rows) self._current_row = count() self._lock = threading.Lock() self._array_size = 1 @property def arraysize(self) -> int: return self._array_size @arraysize.setter def set_arraysize(self, new_size: int): with self._lock: self._array_size = new_size @property def description(self) -> List[tuple]: return self._description
[docs] def fetchone(self) -> Optional[tuple]: """ Returns the next row query result set as a tuple. Return None if no more data is available. """ # lock fetchone so it is threadsafe with self._lock: next_index = next(self._current_row) try: return self._rows[next_index] except IndexError: return None
[docs] def fetchmany(self, size=None) -> List[tuple]: """ Return the next set of rows of a query result as a list. Return an empty list if no more rows are available. The number of rows to fetch per call is specified by the size parameter. If size is not given, arraysize determines the number of rows to be fetched. If fewer than size rows are available, as many rows as are available are returned. Note there are performance considerations involved with the size parameter. For optimal performance, it is usually best to use the arraysize attribute. If the size parameter is used, then it is best for it to retain the same value from one ``fetchmany()`` call to the next. """ with self._lock: tofetch = self._array_size if size == None else size # only keep indices that are within bounds indices = filter(lambda x: x < len(self._rows), [next(self._current_row) for i in range(tofetch)]) return [self._rows[i] for i in indices]
[docs] def fetchall(self) -> List[tuple]: """ Return all (remaining) rows of a query result as a list. Return an empty list if no rows are available. """ return self.fetchmany(len(self._rows))
[docs] def close(self): """ A no-op which is included to increase API-compability with Python DB API """ pass