from copy import deepcopy, copy from collections import namedtuple from datetime import timedelta, datetime import json from six import string_types import sqlparse import requests import textwrap from dateutil.parser import parse from flask import flash from flask.ext.appbuilder import Model from flask.ext.appbuilder.models.mixins import AuditMixin import pandas as pd from pandas import read_sql_query from pydruid import client from pydruid.utils.filters import Dimension, Filter import sqlalchemy as sqla from sqlalchemy import ( Column, Integer, String, ForeignKey, Text, Boolean, DateTime, Table, create_engine, MetaData, desc, select, and_, func) from sqlalchemy.orm import relationship from sqlalchemy.sql import table, literal_column, text, column from sqlalchemy.sql.elements import ColumnClause from sqlalchemy_utils import EncryptedType from panoramix import app, db, get_session, utils from panoramix.viz import viz_types from sqlalchemy.ext.declarative import declared_attr config = app.config QueryResult = namedtuple('namedtuple', ['df', 'query', 'duration']) class AuditMixinNullable(AuditMixin): @declared_attr def created_by_fk(cls): return Column(Integer, ForeignKey('ab_user.id'), default=cls.get_user_id, nullable=True) @declared_attr def changed_by_fk(cls): return Column(Integer, ForeignKey('ab_user.id'), default=cls.get_user_id, onupdate=cls.get_user_id, nullable=True) @property def changed_on_(cls): return utils.datetime_f(cls.changed_on) class Slice(Model, AuditMixinNullable): """A slice is essentially a report or a view on data""" __tablename__ = 'slices' id = Column(Integer, primary_key=True) slice_name = Column(String(250)) druid_datasource_id = Column(Integer, ForeignKey('datasources.id')) table_id = Column(Integer, ForeignKey('tables.id')) datasource_type = Column(String(200)) datasource_name = Column(String(2000)) viz_type = Column(String(250)) params = Column(Text) table = relationship( 'SqlaTable', foreign_keys=[table_id], backref='slices') druid_datasource = relationship( 'Datasource', foreign_keys=[druid_datasource_id], backref='slices') def __repr__(self): return self.slice_name @property def datasource(self): return self.table or self.druid_datasource @property @utils.memoized def viz(self): d = json.loads(self.params) viz = viz_types[self.viz_type]( self.datasource, form_data=d) return viz @property def datasource_id(self): return self.table_id or self.druid_datasource_id @property def data(self): d = self.viz.data d['slice_id'] = self.id return d @property def json_data(self): return json.dumps(self.data) @property def slice_url(self): try: slice_params = json.loads(self.params) except Exception as e: slice_params = {} slice_params['slice_id'] = self.id slice_params['slice_name'] = self.slice_name from werkzeug.urls import Href href = Href( "/panoramix/explore/{self.datasource_type}/" "{self.datasource_id}/".format(self=self)) return href(slice_params) @property def edit_url(self): return "/slicemodelview/edit/{}".format(self.id) @property def slice_link(self): url = self.slice_url return '{self.slice_name}'.format(**locals()) @property def js_files(self): from panoramix.viz import viz_types return viz_types[self.viz_type].js_files @property def css_files(self): from panoramix.viz import viz_types return viz_types[self.viz_type].css_files def get_viz(self): pass dashboard_slices = Table('dashboard_slices', Model.metadata, Column('id', Integer, primary_key=True), Column('dashboard_id', Integer, ForeignKey('dashboards.id')), Column('slice_id', Integer, ForeignKey('slices.id')), ) class Dashboard(Model, AuditMixinNullable): """A dash to slash""" __tablename__ = 'dashboards' id = Column(Integer, primary_key=True) dashboard_title = Column(String(500)) position_json = Column(Text) description = Column(Text) css = Column(Text) slug = Column(String(255), unique=True) slices = relationship( 'Slice', secondary=dashboard_slices, backref='dashboards') def __repr__(self): return self.dashboard_title @property def url(self): return "/panoramix/dashboard/{}/".format(self.slug or self.id) def dashboard_link(self): return '{self.dashboard_title}'.format(self=self) @property def js_files(self): l = [] for o in self.slices: l += [f for f in o.js_files if f not in l] return l @property def css_files(self): l = [] for o in self.slices: l += o.css_files return list(set(l)) class Queryable(object): @property def column_names(self): return sorted([c.column_name for c in self.columns]) @property def main_dttm_col(self): return "timestamp" @property def groupby_column_names(self): return sorted([c.column_name for c in self.columns if c.groupby]) @property def filterable_column_names(self): return sorted([c.column_name for c in self.columns if c.filterable]) @property def dttm_cols(self): return [] class Database(Model, AuditMixinNullable): __tablename__ = 'dbs' id = Column(Integer, primary_key=True) database_name = Column(String(250), unique=True) sqlalchemy_uri = Column(String(1024)) password = Column(EncryptedType(String(1024), config.get('SECRET_KEY'))) def __repr__(self): return self.database_name def get_sqla_engine(self): return create_engine(self.sqlalchemy_uri_decrypted) def safe_sqlalchemy_uri(self): return self.sqlalchemy_uri def get_table(self, table_name): meta = MetaData() return Table( table_name, meta, autoload=True, autoload_with=self.get_sqla_engine()) @property def sqlalchemy_uri_decrypted(self): conn = sqla.engine.url.make_url(self.sqlalchemy_uri) conn.password = self.password return str(conn) class SqlaTable(Model, Queryable, AuditMixinNullable): type = "table" __tablename__ = 'tables' id = Column(Integer, primary_key=True) table_name = Column(String(250), unique=True) main_dttm_col = Column(String(250)) description = Column(Text) default_endpoint = Column(Text) database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False) is_featured = Column(Boolean, default=False) user_id = Column(Integer, ForeignKey('ab_user.id')) owner = relationship('User', backref='tables', foreign_keys=[user_id]) database = relationship( 'Database', backref='tables', foreign_keys=[database_id]) offset = Column(Integer, default=0) baselink = "tableview" def __repr__(self): return self.table_name @property def description_markeddown(self): return utils.markdown(self.description) @property def perm(self): return ( "[{self.database}].[{self.table_name}]" "(id:{self.id})").format(self=self) @property def full_name(self): return "[{self.database}].[{self.table_name}]".format(self=self) @property def dttm_cols(self): l = [c.column_name for c in self.columns if c.is_dttm] if self.main_dttm_col not in l: l.append(self.main_dttm_col) return l @property def name(self): return self.table_name @property def table_link(self): url = "/panoramix/explore/{self.type}/{self.id}/".format(self=self) return '{self.table_name}'.format(**locals()) @property def metrics_combo(self): return sorted( [ (m.metric_name, m.verbose_name) for m in self.metrics], key=lambda x: x[1]) def query_bkp( self, groupby, metrics, granularity, from_dttm, to_dttm, limit_spec=None, filter=None, is_timeseries=True, timeseries_limit=15, row_limit=None, extras=None): # pragma: no cover """ Unused, legacy way of querying by building a SQL string without using the sqlalchemy expression API (new approach which supports all dialects) """ from pandas import read_sql_query qry_start_dttm = datetime.now() metrics_exprs = [ "{} AS {}".format(m.expression, m.metric_name) for m in self.metrics if m.metric_name in metrics] from_dttm_iso = from_dttm.isoformat() to_dttm_iso = to_dttm.isoformat() if metrics: main_metric_expr = [ m.expression for m in self.metrics if m.metric_name == metrics[0]][0] else: main_metric_expr = "COUNT(*)" select_exprs = [] groupby_exprs = [] if groupby: select_exprs = copy(groupby) groupby_exprs = [s for s in groupby] inner_groupby_exprs = [s for s in groupby] select_exprs += metrics_exprs if granularity != "all": select_exprs += ['ds as timestamp'] groupby_exprs += ['ds'] select_exprs = ",\n".join(select_exprs) groupby_exprs = ",\n".join(groupby_exprs) where_clause = [ "ds >= '{from_dttm_iso}'", "ds < '{to_dttm_iso}'" ] for col, op, eq in filter: if op in ('in', 'not in'): l = ["'{}'".format(s) for s in eq.split(",")] l = ", ".join(l) op = op.upper() where_clause.append( "{col} {op} ({l})".format(**locals()) ) where_clause = " AND\n".join(where_clause).format(**locals()) on_clause = " AND ".join(["{g} = __{g}".format(g=g) for g in groupby]) limiting_join = "" if timeseries_limit and groupby: inner_select = ", ".join([ "{g} as __{g}".format(g=g) for g in inner_groupby_exprs]) inner_groupby_exprs = ", ".join(inner_groupby_exprs) limiting_join = ( "JOIN ( \n" " SELECT {inner_select} \n" " FROM {self.table_name} \n" " WHERE \n" " {where_clause}\n" " GROUP BY {inner_groupby_exprs}\n" " ORDER BY {main_metric_expr} DESC\n" " LIMIT {timeseries_limit}\n" ") z ON {on_clause}\n" ).format(**locals()) sql = ( "SELECT\n" " {select_exprs}\n" "FROM {self.table_name}\n" "{limiting_join}" "WHERE\n" " {where_clause}\n" "GROUP BY\n" " {groupby_exprs}\n" ).format(**locals()) df = read_sql_query( sql=sql, con=self.database.get_sqla_engine() ) textwrap.dedent(sql) return QueryResult( df=df, duration=datetime.now() - qry_start_dttm, query=sql) def query( self, groupby, metrics, granularity, from_dttm, to_dttm, limit_spec=None, filter=None, is_timeseries=True, timeseries_limit=15, row_limit=None, inner_from_dttm=None, inner_to_dttm=None, extras=None, columns=None): # For backward compatibility if granularity not in self.dttm_cols: granularity = self.main_dttm_col cols = {col.column_name: col for col in self.columns} qry_start_dttm = datetime.now() if not self.main_dttm_col: raise Exception( "Datetime column not provided as part table configuration") dttm_expr = cols[granularity].expression if dttm_expr: timestamp = ColumnClause(dttm_expr, is_literal=True).label('timestamp') else: timestamp = literal_column(granularity).label('timestamp') metrics_exprs = [ literal_column(m.expression).label(m.metric_name) for m in self.metrics if m.metric_name in metrics] if metrics: main_metric_expr = literal_column([ m.expression for m in self.metrics if m.metric_name == metrics[0]][0]) else: main_metric_expr = literal_column("COUNT(*)") select_exprs = [] groupby_exprs = [] if groupby: select_exprs = [] inner_select_exprs = [] inner_groupby_exprs = [] for s in groupby: col = cols[s] expr = col.expression if expr: outer = ColumnClause(expr, is_literal=True).label(s) inner = ColumnClause(expr, is_literal=True).label('__' + s) else: outer = column(s).label(s) inner = column(s).label('__' + s) groupby_exprs.append(outer) select_exprs.append(outer) inner_groupby_exprs.append(inner) inner_select_exprs.append(inner) elif columns: for s in columns: select_exprs.append(s) metrics_exprs = [] if is_timeseries: select_exprs += [timestamp] groupby_exprs += [timestamp] select_exprs += metrics_exprs qry = select(select_exprs) from_clause = table(self.table_name) if not columns: qry = qry.group_by(*groupby_exprs) time_filter = [ timestamp >= from_dttm.isoformat(), timestamp <= to_dttm.isoformat(), ] inner_time_filter = copy(time_filter) if inner_from_dttm: inner_time_filter[0] = timestamp >= inner_from_dttm.isoformat() if inner_to_dttm: inner_time_filter[1] = timestamp <= inner_to_dttm.isoformat() where_clause_and = [] having_clause_and = [] for col, op, eq in filter: col_obj = cols[col] if op in ('in', 'not in'): values = eq.split(",") if col_obj.expression: cond = ColumnClause( col_obj.expression, is_literal=True).in_(values) else: cond = column(col).in_(values) if op == 'not in': cond = ~cond where_clause_and.append(cond) if extras and 'where' in extras: where_clause_and += [text(extras['where'])] if extras and 'having' in extras: having_clause_and += [text(extras['having'])] qry = qry.where(and_(*(time_filter + where_clause_and))) qry = qry.having(and_(*having_clause_and)) if groupby: qry = qry.order_by(desc(main_metric_expr)) qry = qry.limit(row_limit) if timeseries_limit and groupby: subq = select(inner_select_exprs) subq = subq.select_from(table(self.table_name)) subq = subq.where(and_(*(where_clause_and + inner_time_filter))) subq = subq.group_by(*inner_groupby_exprs) subq = subq.order_by(desc(main_metric_expr)) subq = subq.limit(timeseries_limit) on_clause = [] for i, gb in enumerate(groupby): on_clause.append( groupby_exprs[i] == column("__" + gb)) from_clause = from_clause.join(subq.alias(), and_(*on_clause)) qry = qry.select_from(from_clause) engine = self.database.get_sqla_engine() sql = str(qry.compile(engine, compile_kwargs={"literal_binds": True})) df = read_sql_query( sql=sql, con=engine ) sql = sqlparse.format(sql, reindent=True) return QueryResult( df=df, duration=datetime.now() - qry_start_dttm, query=sql) def fetch_metadata(self): table = self.database.get_table(self.table_name) try: table = self.database.get_table(self.table_name) except Exception as e: flash(str(e)) flash( "Table doesn't seem to exist in the specified database, " "couldn't fetch column information", "danger") return TC = TableColumn M = SqlMetric metrics = [] any_date_col = None for col in table.columns: try: datatype = str(col.type) except Exception as e: datatype = "UNKNOWN" dbcol = ( db.session .query(TC) .filter(TC.table == self) .filter(TC.column_name == col.name) .first() ) db.session.flush() if not dbcol: dbcol = TableColumn(column_name=col.name) if ( str(datatype).startswith('VARCHAR') or str(datatype).startswith('STRING')): dbcol.groupby = True dbcol.filterable = True elif str(datatype).upper() in ('DOUBLE', 'FLOAT', 'INT', 'BIGINT'): dbcol.sum = True db.session.merge(self) self.columns.append(dbcol) if not any_date_col and 'date' in datatype.lower(): any_date_col = col.name quoted = "{}".format( column(dbcol.column_name).compile(dialect=db.engine.dialect)) if dbcol.sum: metrics.append(M( metric_name='sum__' + dbcol.column_name, verbose_name='sum__' + dbcol.column_name, metric_type='sum', expression="SUM({})".format(quoted) )) if dbcol.max: metrics.append(M( metric_name='max__' + dbcol.column_name, verbose_name='max__' + dbcol.column_name, metric_type='max', expression="MAX({})".format(quoted) )) if dbcol.min: metrics.append(M( metric_name='min__' + dbcol.column_name, verbose_name='min__' + dbcol.column_name, metric_type='min', expression="MIN({})".format(quoted) )) if dbcol.count_distinct: metrics.append(M( metric_name='count_distinct__' + dbcol.column_name, verbose_name='count_distinct__' + dbcol.column_name, metric_type='count_distinct', expression="COUNT(DISTINCT {})".format(quoted) )) dbcol.type = datatype db.session.merge(self) db.session.commit() metrics.append(M( metric_name='count', verbose_name='COUNT(*)', metric_type='count', expression="COUNT(*)" )) for metric in metrics: m = ( db.session.query(M) .filter(M.metric_name == metric.metric_name) .filter(M.table_id == self.id) .first() ) metric.table_id = self.id if not m: db.session.add(metric) db.session.commit() if not self.main_dttm_col: self.main_dttm_col = any_date_col class SqlMetric(Model, AuditMixinNullable): __tablename__ = 'sql_metrics' id = Column(Integer, primary_key=True) metric_name = Column(String(512)) verbose_name = Column(String(1024)) metric_type = Column(String(32)) table_id = Column(Integer, ForeignKey('tables.id')) table = relationship( 'SqlaTable', backref='metrics', foreign_keys=[table_id]) expression = Column(Text) description = Column(Text) class TableColumn(Model, AuditMixinNullable): __tablename__ = 'table_columns' id = Column(Integer, primary_key=True) table_id = Column(Integer, ForeignKey('tables.id')) table = relationship( 'SqlaTable', backref='columns', foreign_keys=[table_id]) column_name = Column(String(256)) is_dttm = Column(Boolean, default=False) is_active = Column(Boolean, default=True) type = Column(String(32), default='') groupby = Column(Boolean, default=False) count_distinct = Column(Boolean, default=False) sum = Column(Boolean, default=False) max = Column(Boolean, default=False) min = Column(Boolean, default=False) filterable = Column(Boolean, default=False) expression = Column(Text, default='') description = Column(Text, default='') def __repr__(self): return self.column_name @property def isnum(self): return self.type in ('LONG', 'DOUBLE', 'FLOAT') class Cluster(Model, AuditMixinNullable): __tablename__ = 'clusters' id = Column(Integer, primary_key=True) cluster_name = Column(String(250), unique=True) coordinator_host = Column(String(256)) coordinator_port = Column(Integer) coordinator_endpoint = Column( String(256), default='druid/coordinator/v1/metadata') broker_host = Column(String(256)) broker_port = Column(Integer) broker_endpoint = Column(String(256), default='druid/v2') metadata_last_refreshed = Column(DateTime) def __repr__(self): return self.cluster_name def get_pydruid_client(self): cli = client.PyDruid( "http://{0}:{1}/".format(self.broker_host, self.broker_port), self.broker_endpoint) return cli def refresh_datasources(self): endpoint = ( "http://{self.coordinator_host}:{self.coordinator_port}/" "{self.coordinator_endpoint}/datasources" ).format(self=self) datasources = json.loads(requests.get(endpoint).text) for datasource in datasources: Datasource.sync_to_db(datasource, self) class Datasource(Model, AuditMixinNullable, Queryable): type = "druid" baselink = "datasourcemodelview" __tablename__ = 'datasources' id = Column(Integer, primary_key=True) datasource_name = Column(String(250), unique=True) is_featured = Column(Boolean, default=False) is_hidden = Column(Boolean, default=False) description = Column(Text) default_endpoint = Column(Text) user_id = Column(Integer, ForeignKey('ab_user.id')) owner = relationship('User', backref='datasources', foreign_keys=[user_id]) cluster_name = Column( String(250), ForeignKey('clusters.cluster_name')) cluster = relationship( 'Cluster', backref='datasources', foreign_keys=[cluster_name]) offset = Column(Integer, default=0) @property def metrics_combo(self): return sorted( [(m.metric_name, m.verbose_name) for m in self.metrics], key=lambda x: x[1]) @property def name(self): return self.datasource_name @property def perm(self): return ( "[{self.cluster_name}].[{self.datasource_name}]" "(id:{self.id})").format(self=self) @property def full_name(self): return "[{self.cluster_name}].[{self.datasource_name}]".format(self=self) def __repr__(self): return self.datasource_name @property def datasource_link(self): url = "/panoramix/explore/{self.type}/{self.id}/".format(self=self) return '{self.datasource_name}'.format(**locals()) def get_metric_obj(self, metric_name): return [ m.json_obj for m in self.metrics if m.metric_name == metric_name ][0] def latest_metadata(self): client = self.cluster.get_pydruid_client() results = client.time_boundary(datasource=self.datasource_name) if not results: return max_time = results[0]['result']['minTime'] max_time = parse(max_time) intervals = (max_time - timedelta(seconds=1)).isoformat() + '/' intervals += (max_time + timedelta(seconds=1)).isoformat() segment_metadata = client.segment_metadata( datasource=self.datasource_name, intervals=intervals) if segment_metadata: return segment_metadata[-1]['columns'] def generate_metrics(self): for col in self.columns: col.generate_metrics() @classmethod def sync_to_db(cls, name, cluster): session = get_session() datasource = session.query(cls).filter_by(datasource_name=name).first() if not datasource: datasource = cls(datasource_name=name) session.add(datasource) datasource.cluster = cluster cols = datasource.latest_metadata() if not cols: return for col in cols: col_obj = ( session .query(Column) .filter_by(datasource_name=name, column_name=col) .first() ) datatype = cols[col]['type'] if not col_obj: col_obj = Column(datasource_name=name, column_name=col) session.add(col_obj) if datatype == "STRING": col_obj.groupby = True col_obj.filterable = True if col_obj: col_obj.type = cols[col]['type'] col_obj.datasource = datasource col_obj.generate_metrics() def query( self, groupby, metrics, granularity, from_dttm, to_dttm, limit_spec=None, filter=None, is_timeseries=True, timeseries_limit=None, row_limit=None, inner_from_dttm=None, inner_to_dttm=None, extras=None, select=None): qry_start_dttm = datetime.now() inner_from_dttm = inner_from_dttm or from_dttm inner_to_dttm = inner_to_dttm or to_dttm # add tzinfo to native datetime with config from_dttm = from_dttm.replace(tzinfo=config.get("DRUID_TZ")) to_dttm = to_dttm.replace(tzinfo=config.get("DRUID_TZ")) query_str = "" aggregations = { m.metric_name: m.json_obj for m in self.metrics if m.metric_name in metrics } granularity = granularity or "all" if granularity != "all": granularity = utils.parse_human_timedelta( granularity).total_seconds() * 1000 if not isinstance(granularity, string_types): granularity = {"type": "duration", "duration": granularity} qry = dict( datasource=self.datasource_name, dimensions=groupby, aggregations=aggregations, granularity=granularity, intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(), ) filters = None for col, op, eq in filter: cond = None if op == '==': cond = Dimension(col) == eq elif op == '!=': cond = ~(Dimension(col) == eq) elif op in ('in', 'not in'): fields = [] splitted = eq.split(',') if len(splitted) > 1: for s in eq.split(','): s = s.strip() fields.append(Filter.build_filter(Dimension(col) == s)) cond = Filter(type="or", fields=fields) else: cond = Dimension(col) == eq if op == 'not in': cond = ~cond if filters: filters = Filter(type="and", fields=[ Filter.build_filter(cond), Filter.build_filter(filters) ]) else: filters = cond if filters: qry['filter'] = filters client = self.cluster.get_pydruid_client() orig_filters = filters if timeseries_limit and is_timeseries: # Limit on the number of timeseries, doing a two-phases query pre_qry = deepcopy(qry) pre_qry['granularity'] = "all" pre_qry['limit_spec'] = { "type": "default", "limit": timeseries_limit, 'intervals': inner_from_dttm.isoformat() + '/' + inner_to_dttm.isoformat(), "columns": [{ "dimension": metrics[0] if metrics else self.metrics[0], "direction": "descending", }], } client.groupby(**pre_qry) query_str += "// Two phase query\n// Phase 1\n" query_str += json.dumps(client.query_dict, indent=2) + "\n" query_str += "//\nPhase 2 (built based on phase one's results)\n" df = client.export_pandas() if df is not None and not df.empty: dims = qry['dimensions'] filters = [] for index, row in df.iterrows(): fields = [] for dim in dims: f = Filter.build_filter(Dimension(dim) == row[dim]) fields.append(f) if len(fields) > 1: filt = Filter(type="and", fields=fields) filters.append(Filter.build_filter(filt)) elif fields: filters.append(fields[0]) if filters: ff = Filter(type="or", fields=filters) if not orig_filters: qry['filter'] = ff else: qry['filter'] = Filter(type="and", fields=[ Filter.build_filter(ff), Filter.build_filter(orig_filters)]) qry['limit_spec'] = None if row_limit: qry['limit_spec'] = { "type": "default", "limit": row_limit, "columns": [{ "dimension": metrics[0] if metrics else self.metrics[0], "direction": "descending", }], } client.groupby(**qry) query_str += json.dumps(client.query_dict, indent=2) df = client.export_pandas() return QueryResult( df=df, query=query_str, duration=datetime.now() - qry_start_dttm) class Log(Model): __tablename__ = 'logs' id = Column(Integer, primary_key=True) action = Column(String(512)) user_id = Column(Integer, ForeignKey('ab_user.id')) json = Column(Text) user = relationship('User', backref='logs', foreign_keys=[user_id]) dttm = Column(DateTime, default=func.now()) class Metric(Model): __tablename__ = 'metrics' id = Column(Integer, primary_key=True) metric_name = Column(String(512)) verbose_name = Column(String(1024)) metric_type = Column(String(32)) datasource_name = Column( String(250), ForeignKey('datasources.datasource_name')) datasource = relationship('Datasource', backref='metrics') json = Column(Text) description = Column(Text) @property def json_obj(self): try: obj = json.loads(self.json) except: obj = {} return obj class Column(Model, AuditMixinNullable): __tablename__ = 'columns' id = Column(Integer, primary_key=True) datasource_name = Column( String(250), ForeignKey('datasources.datasource_name')) datasource = relationship('Datasource', backref='columns') column_name = Column(String(256)) is_active = Column(Boolean, default=True) type = Column(String(32)) groupby = Column(Boolean, default=False) count_distinct = Column(Boolean, default=False) sum = Column(Boolean, default=False) max = Column(Boolean, default=False) min = Column(Boolean, default=False) filterable = Column(Boolean, default=False) description = Column(Text) def __repr__(self): return self.column_name @property def isnum(self): return self.type in ('LONG', 'DOUBLE', 'FLOAT') def generate_metrics(self): M = Metric metrics = [] metrics.append(Metric( metric_name='count', verbose_name='COUNT(*)', metric_type='count', json=json.dumps({'type': 'count', 'name': 'count'}) )) # Somehow we need to reassign this for UDAFs if self.type in ('DOUBLE', 'FLOAT'): corrected_type = 'DOUBLE' else: corrected_type = self.type if self.sum and self.isnum: mt = corrected_type.lower() + 'Sum' name = 'sum__' + self.column_name metrics.append(Metric( metric_name=name, metric_type='sum', verbose_name='SUM({})'.format(self.column_name), json=json.dumps({ 'type': mt, 'name': name, 'fieldName': self.column_name}) )) if self.min and self.isnum: mt = corrected_type.lower() + 'Min' name = 'min__' + self.column_name metrics.append(Metric( metric_name=name, metric_type='min', verbose_name='MIN({})'.format(self.column_name), json=json.dumps({ 'type': mt, 'name': name, 'fieldName': self.column_name}) )) if self.max and self.isnum: mt = corrected_type.lower() + 'Max' name = 'max__' + self.column_name metrics.append(Metric( metric_name=name, metric_type='max', verbose_name='MAX({})'.format(self.column_name), json=json.dumps({ 'type': mt, 'name': name, 'fieldName': self.column_name}) )) if self.count_distinct: mt = 'count_distinct' name = 'count_distinct__' + self.column_name metrics.append(Metric( metric_name=name, verbose_name='COUNT(DISTINCT {})'.format(self.column_name), metric_type='count_distinct', json=json.dumps({ 'type': 'cardinality', 'name': name, 'fieldNames': [self.column_name]}) )) session = get_session() for metric in metrics: m = ( session.query(M) .filter(M.metric_name == metric.metric_name) .filter(M.datasource_name == self.datasource_name) .filter(Cluster.cluster_name == self.datasource.cluster_name) .first() ) metric.datasource_name = self.datasource_name if not m: session.add(metric) session.commit()