"""Compatibility layer for different database engines This modules stores logic specific to different database engines. Things like time-related functions that are similar but not identical, or information as to expose certain features or not and how to expose them. For instance, Hive/Presto supports partitions and have a specific API to list partitions. Other databases like Vertica also support partitions but have different API to get to them. Other databases don't support partitions at all. The classes here will use a common interface to specify all this. The general idea is to use static classes and an inheritance scheme. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals from collections import namedtuple import inspect import textwrap import time from flask_babel import lazy_gettext as _ Grain = namedtuple('Grain', 'name label function') class LimitMethod(object): """Enum the ways that limits can be applied""" FETCH_MANY = 'fetch_many' WRAP_SQL = 'wrap_sql' class BaseEngineSpec(object): engine = 'base' # str as defined in sqlalchemy.engine.engine time_grains = tuple() limit_method = LimitMethod.FETCH_MANY @classmethod def epoch_to_dttm(cls): raise NotImplementedError() @classmethod def epoch_ms_to_dttm(cls): return cls.epoch_to_dttm().replace('{col}', '({col}/1000.0)') @classmethod def extra_table_metadata(cls, database, table_name, schema_name): """Returns engine-specific table metadata""" return {} @classmethod def convert_dttm(cls, target_type, dttm): return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S')) @classmethod def handle_cursor(cls, cursor, query, session): """Handle a live cursor between the execute and fetchall calls The flow works without this method doing anything, but it allows for handling the cursor and updating progress information in the query object""" pass class PostgresEngineSpec(BaseEngineSpec): engine = 'postgresql' time_grains = ( Grain("Time Column", _('Time Column'), "{col}"), Grain("second", _('second'), "DATE_TRUNC('second', {col})"), Grain("minute", _('minute'), "DATE_TRUNC('minute', {col})"), Grain("hour", _('hour'), "DATE_TRUNC('hour', {col})"), Grain("day", _('day'), "DATE_TRUNC('day', {col})"), Grain("week", _('week'), "DATE_TRUNC('week', {col})"), Grain("month", _('month'), "DATE_TRUNC('month', {col})"), Grain("quarter", _('quarter'), "DATE_TRUNC('quarter', {col})"), Grain("year", _('year'), "DATE_TRUNC('year', {col})"), ) @classmethod def epoch_to_dttm(cls): return "(timestamp 'epoch' + {col} * interval '1 second')" @classmethod def convert_dttm(cls, target_type, dttm): return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S')) class SqliteEngineSpec(BaseEngineSpec): engine = 'sqlite' time_grains = ( Grain('Time Column', _('Time Column'), '{col}'), Grain('day', _('day'), 'DATE({col})'), Grain("week", _('week'), "DATE({col}, -strftime('%w', {col}) || ' days')"), Grain("month", _('month'), "DATE({col}, -strftime('%d', {col}) || ' days')"), ) @classmethod def epoch_to_dttm(cls): return "datetime({col}, 'unixepoch')" @classmethod def convert_dttm(cls, target_type, dttm): iso = dttm.isoformat().replace('T', ' ') if '.' not in iso: iso += '.000000' return "'{}'".format(iso) class MySQLEngineSpec(BaseEngineSpec): engine = 'mysql' time_grains = ( Grain('Time Column', _('Time Column'), '{col}'), Grain("second", _('second'), "DATE_ADD(DATE({col}), " "INTERVAL (HOUR({col})*60*60 + MINUTE({col})*60" " + SECOND({col})) SECOND)"), Grain("minute", _('minute'), "DATE_ADD(DATE({col}), " "INTERVAL (HOUR({col})*60 + MINUTE({col})) MINUTE)"), Grain("hour", _('hour'), "DATE_ADD(DATE({col}), " "INTERVAL HOUR({col}) HOUR)"), Grain('day', _('day'), 'DATE({col})'), Grain("week", _('week'), "DATE(DATE_SUB({col}, " "INTERVAL DAYOFWEEK({col}) - 1 DAY))"), Grain("month", _('month'), "DATE(DATE_SUB({col}, " "INTERVAL DAYOFMONTH({col}) - 1 DAY))"), Grain("quarter", _('quarter'), "MAKEDATE(YEAR({col}), 1) " "+ INTERVAL QUARTER({col}) QUARTER - INTERVAL 1 QUARTER"), Grain("year", _('year'), "DATE(DATE_SUB({col}, " "INTERVAL DAYOFYEAR({col}) - 1 DAY))"), ) @classmethod def convert_dttm(cls, target_type, dttm): if target_type.upper() in ('DATETIME', 'DATE'): return "STR_TO_DATE('{}', '%Y-%m-%d %H:%i:%s')".format( dttm.strftime('%Y-%m-%d %H:%M:%S')) return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S')) @classmethod def epoch_to_dttm(cls): return "from_unixtime({col})" class PrestoEngineSpec(BaseEngineSpec): engine = 'presto' time_grains = ( Grain('Time Column', _('Time Column'), '{col}'), Grain('second', _('second'), "date_trunc('second', CAST({col} AS TIMESTAMP))"), Grain('minute', _('minute'), "date_trunc('minute', CAST({col} AS TIMESTAMP))"), Grain('hour', _('hour'), "date_trunc('hour', CAST({col} AS TIMESTAMP))"), Grain('day', _('day'), "date_trunc('day', CAST({col} AS TIMESTAMP))"), Grain('week', _('week'), "date_trunc('week', CAST({col} AS TIMESTAMP))"), Grain('month', _('month'), "date_trunc('month', CAST({col} AS TIMESTAMP))"), Grain('quarter', _('quarter'), "date_trunc('quarter', CAST({col} AS TIMESTAMP))"), Grain("week_ending_saturday", _('week_ending_saturday'), "date_add('day', 5, date_trunc('week', date_add('day', 1, " "CAST({col} AS TIMESTAMP))))"), Grain("week_start_sunday", _('week_start_sunday'), "date_add('day', -1, date_trunc('week', " "date_add('day', 1, CAST({col} AS TIMESTAMP))))"), ) @classmethod def convert_dttm(cls, target_type, dttm): tt = target_type.upper() if tt == 'DATE': return "from_iso8601_date('{}')".format(dttm.isoformat()[:10]) if tt == 'TIMESTAMP': return "from_iso8601_timestamp('{}')".format(dttm.isoformat()) return "'{}'".format(dttm.strftime('%Y-%m-%d %H:%M:%S')) @classmethod def epoch_to_dttm(cls): return "from_unixtime({col})" @staticmethod def show_partition_pql( table_name, schema_name=None, order_by=None, limit=100): if schema_name: table_name = schema_name + '.' + table_name order_by = order_by or [] order_by_clause = '' if order_by: order_by_clause = "ORDER BY " + ', '.join(order_by) + " DESC" limit_clause = '' if limit: limit_clause = "LIMIT {}".format(limit) return textwrap.dedent("""\ SHOW PARTITIONS FROM {table_name} {order_by_clause} {limit_clause} """).format(**locals()) @classmethod def extra_table_metadata(cls, database, table_name, schema_name): indexes = database.get_indexes(table_name, schema_name) if not indexes: return {} cols = indexes[0].get('column_names', []) pql = cls.show_partition_pql(table_name, schema_name, cols) df = database.get_df(pql, schema_name) latest_part = df.to_dict(orient='records')[0] if not df.empty else None partition_query = cls.show_partition_pql(table_name, schema_name, cols) return { 'partitions': { 'cols': cols, 'latest': latest_part, 'partitionQuery': partition_query, } } @classmethod def handle_cursor(cls, cursor, query, session): """Updates progress information""" polled = cursor.poll() # poll returns dict -- JSON status information or ``None`` # if the query is done # https://github.com/dropbox/PyHive/blob/ # b34bdbf51378b3979eaf5eca9e956f06ddc36ca0/pyhive/presto.py#L178 while polled: # Update the object and wait for the kill signal. stats = polled.get('stats', {}) if stats: completed_splits = float(stats.get('completedSplits')) total_splits = float(stats.get('totalSplits')) if total_splits and completed_splits: progress = 100 * (completed_splits / total_splits) if progress > query.progress: query.progress = progress session.commit() time.sleep(1) polled = cursor.poll() class MssqlEngineSpec(BaseEngineSpec): engine = 'mssql' epoch_to_dttm = "dateadd(S, {col}, '1970-01-01')" time_grains = ( Grain("Time Column", _('Time Column'), "{col}"), Grain("second", _('second'), "DATEADD(second, " "DATEDIFF(second, '2000-01-01', {col}), '2000-01-01')"), Grain("minute", _('minute'), "DATEADD(minute, " "DATEDIFF(minute, 0, {col}), 0)"), Grain("5 minute", _('5 minute'), "DATEADD(minute, " "DATEDIFF(minute, 0, {col}) / 5 * 5, 0)"), Grain("half hour", _('half hour'), "DATEADD(minute, " "DATEDIFF(minute, 0, {col}) / 30 * 30, 0)"), Grain("hour", _('hour'), "DATEADD(hour, " "DATEDIFF(hour, 0, {col}), 0)"), Grain("day", _('day'), "DATEADD(day, " "DATEDIFF(day, 0, {col}), 0)"), Grain("week", _('week'), "DATEADD(week, " "DATEDIFF(week, 0, {col}), 0)"), Grain("month", _('month'), "DATEADD(month, " "DATEDIFF(month, 0, {col}), 0)"), Grain("quarter", _('quarter'), "DATEADD(quarter, " "DATEDIFF(quarter, 0, {col}), 0)"), Grain("year", _('year'), "DATEADD(year, " "DATEDIFF(year, 0, {col}), 0)"), ) @classmethod def convert_dttm(cls, target_type, dttm): return "CONVERT(DATETIME, '{}', 126)".format(dttm.isoformat()) class RedshiftEngineSpec(PostgresEngineSpec): engine = 'redshift' class OracleEngineSpec(PostgresEngineSpec): engine = 'oracle' time_grains = ( Grain('Time Column', _('Time Column'), '{col}'), Grain('minute', _('minute'), "TRUNC(TO_DATE({col}), 'MI')"), Grain('hour', _('hour'), "TRUNC(TO_DATE({col}), 'HH')"), Grain('day', _('day'), "TRUNC(TO_DATE({col}), 'DDD')"), Grain('week', _('week'), "TRUNC(TO_DATE({col}), 'WW')"), Grain('month', _('month'), "TRUNC(TO_DATE({col}), 'MONTH')"), Grain('quarter', _('quarter'), "TRUNC(TO_DATE({col}), 'Q')"), Grain('year', _('year'), "TRUNC(TO_DATE({col}), 'YEAR')"), ) @classmethod def convert_dttm(cls, target_type, dttm): return ( """TO_TIMESTAMP('{}', 'YYYY-MM-DD"T"HH24:MI:SS.ff6')""" ).format(dttm.isoformat()) class VerticaEngineSpec(PostgresEngineSpec): engine = 'vertica' engines = { o.engine: o for o in globals().values() if inspect.isclass(o) and issubclass(o, BaseEngineSpec)}