import threading
from datetime import datetime, timezone
from itertools import count
from typing import Any, Dict, List, Optional
class QueryCursor:
    """
    :class:`.QueryCursor` is a Python DB API-style Cursor object (PEP 249) for query
    results from the :class:`.Catalog`.

    Every row of the result is converted to a tuple of Python values when the
    cursor is constructed; the fetch methods then hand those tuples out in
    order. Fetches are serialized with a lock so that concurrent callers never
    receive the same row twice.

    Attributes
    ----------
    arraysize: number
        Read/write attribute that controls the number of rows returned by fetchmany().
        The default value is 1 which means a single row would be fetched per call.
    description: List[tuple]
        Read-only attribute that provides the column names of the last query.
        To remain compatible with the Python DB API, it returns a 7-tuple for each
        column where the last five items of each tuple are None.
    rowcount: number
        Read-only attribute holding the total number of rows in the result.
    """

    # JDBC type names whose values are converted with int() / float().
    _INTEGER_JDBC_TYPES = ('BIGINT', 'SMALLINT', 'TINYINT')
    _FLOAT_JDBC_TYPES = ('REAL', 'FLOAT', 'DOUBLE')

    _rows: List[tuple]
    _current_row: int
    _rowcount: int
    _array_size: int
    _description: List[tuple]

    def __init__(self, res: Dict[str, Any]):
        """
        Build a cursor from a raw query response.

        :param res: response dictionary. ``res['column_metadata']`` is a list
            of per-column dicts (``key`` is required, ``type`` and
            ``jdbc_type`` are optional). ``res['rows']`` is an optional list
            of dicts, each with a ``cells`` list aligned with the columns.
        :raises KeyError: if ``column_metadata``, a column ``key``, a row's
            ``cells`` or a non-null cell's ``value`` is missing.
        :raises IndexError: if a row has more cells than there are columns.
        """
        columns = res['column_metadata']
        self._description = [
            (col['key'], col.get('type'), None, None, None, None, None)
            for col in columns
        ]
        self._rows = [
            tuple(
                self._convert_cell(cell, columns[index])
                for index, cell in enumerate(row['cells'])
            )
            for row in (res.get('rows') or [])
        ]
        self._rowcount = len(self._rows)
        # Index of the next row to hand out; never exceeds len(self._rows).
        self._current_row = 0
        self._lock = threading.Lock()
        self._array_size = 1

    @classmethod
    def _convert_cell(cls, cell: Dict[str, Any], column: Dict[str, Any]) -> Any:
        """
        Convert one raw cell to a Python value according to its column metadata.

        A cell carrying an ``is_null`` key becomes ``None``. Integer, decimal,
        date and timestamp columns are parsed into ``int``, ``float`` and
        UTC-aware ``datetime`` values. Anything else, and any value that fails
        to parse, is returned unchanged.
        """
        if 'is_null' in cell:
            return None
        raw = cell['value']
        coltype = column.get('type')
        jdbctype = column.get('jdbc_type')
        try:
            if coltype == 'integer' or jdbctype in cls._INTEGER_JDBC_TYPES:
                return int(raw)
            if coltype == 'decimal' or jdbctype in cls._FLOAT_JDBC_TYPES:
                return float(raw)
            if coltype == 'date' and jdbctype == 'DATE':
                return datetime.strptime(
                    raw, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            if coltype == 'date' and jdbctype == 'TIMESTAMP':
                # e.g. '1986-08-20 00:57:42.000'
                return datetime.strptime(
                    raw, "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc)
        except (TypeError, ValueError):
            # Best effort: an unparseable value is passed through as-is
            # rather than failing the whole result set.
            pass
        return raw

    @property
    def arraysize(self) -> int:
        """Number of rows fetchmany() returns when no size is given."""
        return self._array_size

    @arraysize.setter
    def arraysize(self, new_size: int):
        with self._lock:
            self._array_size = new_size

    # The setter used to be (mis)registered under this name, which left
    # ``arraysize`` read-only. The alias keeps ``cursor.set_arraysize = n``
    # working for existing callers.
    set_arraysize = arraysize

    @property
    def description(self) -> List[tuple]:
        """One 7-tuple ``(name, type, None, None, None, None, None)`` per column."""
        return self._description

    @property
    def rowcount(self) -> int:
        """Total number of rows held by this cursor."""
        return self._rowcount

    def fetchone(self) -> Optional[tuple]:
        """
        Returns the next row query result set as a tuple.
        Return None if no more data is available.
        """
        # lock fetchone so it is threadsafe
        with self._lock:
            if self._current_row >= len(self._rows):
                return None
            row = self._rows[self._current_row]
            self._current_row += 1
            return row

    def fetchmany(self, size=None) -> List[tuple]:
        """
        Return the next set of rows of a query result as a list.
        Return an empty list if no more rows are available.
        The number of rows to fetch per call is specified by the size parameter.
        If size is not given, arraysize determines the number of rows to be fetched.
        If fewer than size rows are available, as many rows as are available are returned.
        A size of zero or less returns an empty list without consuming any rows.
        """
        with self._lock:
            wanted = self._array_size if size is None else size
            start = self._current_row
            # Clamp to the available rows so that an oversized request costs
            # no more than the rows actually returned.
            stop = min(start + max(wanted, 0), len(self._rows))
            self._current_row = stop
            return self._rows[start:stop]

    def fetchall(self) -> List[tuple]:
        """
        Return all (remaining) rows of a query result as a list. Return an empty list
        if no rows are available.
        """
        return self.fetchmany(len(self._rows))

    def close(self):
        """
        A no-op which is included to increase API-compatibility with Python DB API
        """
        pass