# Source code for intake_odbc.intake_odbc

from intake.source import base
import numpy as np
from . import __version__


class ODBCSource(base.DataSource):
    """
    One-shot ODBC to dataframe reader

    Parameters
    ----------
    uri: str or None
        Full connection string for TurbODBC. If using keyword parameters,
        this should be ``None``
    sql_expr: str
        Query expression to pass to the DB backend

    Further connection arguments, such as username/password, and may also
    include the following:

    head_rows: int (10)
        Number of rows that are read from the start of the data to infer
        data types upon discovery
    mssql: bool (False)
        Whether to use MS SQL Server syntax - depends on the backend
        target of the connection
    """
    name = 'odbc'
    version = __version__
    container = 'dataframe'
    partition_access = False

    def __init__(self, uri, sql_expr, metadata=None, **odbc_kwargs):
        # Copy so that pop() below does not mutate the caller's dict.
        odbc_kwargs = odbc_kwargs.copy()
        self._uri = uri
        self._sql_expr = sql_expr
        self._head_rows = odbc_kwargs.pop('head_rows', 10)
        self._ms = odbc_kwargs.pop('mssql', False)
        # Remaining kwargs are forwarded verbatim to turbodbc.connect().
        self._odbc_kwargs = odbc_kwargs
        self._dataframe = None
        self._connection = None
        self._cursor = None
        super(ODBCSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        """Infer the schema by fetching only the first ``head_rows`` rows."""
        # Lazy import: turbodbc is only required once a connection is made.
        from turbodbc import connect
        if self._dataframe is None:
            self._connection = connect(connection_string=self._uri,
                                       **self._odbc_kwargs)
            cursor = self._connection.cursor()
            self._cursor = cursor
            # Wrap the user query in a LIMIT/TOP subquery so only the
            # sample rows travel over the wire.
            if self._ms:
                q = ms_limit(self._sql_expr, self._head_rows)
            else:
                q = limit(self._sql_expr, self._head_rows)
            cursor.execute(q)
            head = cursor.fetchallarrow().to_pandas()
            # Empty slice of the sample carries the column dtypes.
            dtype = head[:0]
            shape = (None, head.shape[1])  # total row count unknown here
        else:
            # Data already loaded: report the real dtypes and shape.
            dtype = {k: str(v)
                     for k, v in self._dataframe.dtypes.to_dict().items()}
            shape = self._dataframe.shape
        return base.Schema(datashape=None,
                           dtype=dtype,
                           shape=shape,
                           npartitions=1,
                           extra_metadata={})

    def _get_partition(self, _):
        """Run the full query once and cache the whole result.

        The partition index is ignored: this source always has exactly one
        partition.
        """
        self._get_schema()
        if self._dataframe is None:
            self._cursor.execute(self._sql_expr)
            self._dataframe = self._cursor.fetchallarrow().to_pandas()
            # Invalidate the cached schema so the next discovery reports
            # the real dtypes/shape from the loaded dataframe.
            self._schema = None
        return self._dataframe

    def _close(self):
        # BUGFIX: the connection was previously dropped without being
        # closed, leaking the underlying ODBC handle. Close it explicitly,
        # mirroring ODBCPartitionedSource._close.
        if self._connection is not None:
            self._connection.close()
        self._dataframe = None
        self._connection = None
        self._cursor = None
def ms_limit(q, lim):
    """MS SQL Server implementation of 'limit'"""
    # T-SQL has no LIMIT clause; SELECT TOP n on a subquery is the
    # equivalent row-capping construct.
    return "SELECT TOP %s sq.* FROM (%s) sq" % (lim, q)


def limit(q, lim):
    """Non-MS SQL Server implementation of 'limit'"""
    # Standard dialects (PostgreSQL, MySQL, SQLite, ...) append LIMIT
    # after the wrapped subquery.
    return "SELECT sq.* FROM (%s) sq LIMIT %s" % (q, lim)
class ODBCPartitionedSource(base.DataSource):
    """
    ODBC partitioned reader

    This source produces new queries for each partition, where an index
    column is used to select rows belonging to each partition

    Parameters
    ----------
    uri: str or None
        Full connection string for TurbODBC. If using keyword parameters,
        this should be ``None``
    sql_expr: str
        Query expression to pass to the DB backend

    Further connection arguments, such as username/password, and may also
    include the following:

    head_rows: int (10)
        Number of rows that are read from the start of the data to infer
        data types upon discovery
    mssql: bool (False)
        Whether to use MS SQL Server syntax - depends on the backend
        target of the connection
    index: str
        Column to use for partitioning
    max, min: str
        Range of values in index to consider (will query DB if not given)
    npartitions: int
        Number of partitions to assume
    divisions: list of values
        If given, use these as partition boundaries - and therefore ignore
        max/min and npartitions
    """
    # NOTE(review): same registry name as ODBCSource above — confirm both
    # plugins are meant to register under 'odbc'.
    name = 'odbc'
    version = __version__
    container = 'dataframe'
    partition_access = True

    def __init__(self, uri, sql_expr, metadata=None, **odbc_kwargs):
        # Copy so that pop() below does not mutate the caller's dict.
        odbc_kwargs = odbc_kwargs.copy()
        self._uri = uri
        self._sql_expr = sql_expr
        self._head_rows = odbc_kwargs.pop('head_rows', 10)
        self._ms = odbc_kwargs.pop('mssql', False)
        self._index = odbc_kwargs.pop('index')  # required
        self._max = odbc_kwargs.pop('max', None)
        self._min = odbc_kwargs.pop('min', None)
        self._npartitions = odbc_kwargs.pop('npartitions', None)
        self._divisions = odbc_kwargs.pop('divisions', None)
        # Remaining kwargs are forwarded verbatim to turbodbc.connect().
        self._odbc_kwargs = odbc_kwargs
        self._connection = None
        self._cursor = None
        super(ODBCPartitionedSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        """Connect and infer the schema from the first ``head_rows`` rows."""
        # Lazy import: turbodbc is only required once a connection is made.
        from turbodbc import connect
        self._connection = connect(connection_string=self._uri,
                                   **self._odbc_kwargs)
        cursor = self._connection.cursor()
        self._cursor = cursor
        # Wrap the user query so only the sample rows travel over the wire.
        if self._ms:
            q = ms_limit(self._sql_expr, self._head_rows)
        else:
            q = limit(self._sql_expr, self._head_rows)
        cursor.execute(q)
        head = cursor.fetchallarrow().to_pandas().set_index(self._index)
        # Empty slice of the sample carries the column dtypes.
        dtype = head[:0]
        shape = (None, head.shape[1])
        # could have called COUNT()
        # NOTE(review): raises TypeError if neither npartitions nor
        # divisions was supplied — presumably one is required; confirm.
        nparts = self._npartitions or len(self._divisions)
        return base.Schema(datashape=None,
                           dtype=dtype,
                           shape=shape,
                           npartitions=nparts,
                           extra_metadata={})

    def _get_partition(self, i):
        """Query and return the rows whose index falls in division ``i``.

        Relies on ``self._cursor`` having been set up by a prior
        ``_get_schema`` call.
        """
        if self._divisions is None:
            # compute divisions
            if self._max is None:
                # get data boundaries from DB
                q = "SELECT MAX(sq.{ind}) as ma, MIN(sq.{ind}) as mi " \
                    "FROM ({exp}) sq".format(ind=self._index,
                                             exp=self._sql_expr)
                self._cursor.execute(q)
                self._max, self._min = self._cursor.fetchone()
                # Pad the upper bound so the max row passes the strict
                # "< ma" filter below; assumes a numeric (float-capable)
                # index column — TODO confirm for int/date indexes.
                self._max += 0.001
            self._divisions = np.linspace(self._min, self._max,
                                          self._npartitions + 1)
        # Half-open interval [mi, ma) keeps partitions disjoint.
        mi, ma = self._divisions[i:i+2]
        q = "SELECT sq.* FROM ({exp}) as sq WHERE " \
            "sq.{ind} >= {mi} AND sq.{ind} < {ma}".format(
                exp=self._sql_expr, ind=self._index, mi=mi, ma=ma)
        self._cursor.execute(q)
        df = self._cursor.fetchallarrow().to_pandas()
        return df.set_index(self._index)

    def _close(self):
        # Close the live connection (if any) before dropping references.
        if self._connection is not None:
            self._connection.close()
        self._connection = None
        self._cursor = None