# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # pylint: disable=C,R,W """A collection of ORM sqlalchemy models for Superset""" from contextlib import closing from copy import copy, deepcopy from datetime import datetime import functools import json import logging import textwrap from flask import escape, g, Markup, request from flask_appbuilder import Model from flask_appbuilder.models.decorators import renders from flask_appbuilder.security.sqla.models import User import numpy import pandas as pd import sqlalchemy as sqla from sqlalchemy import ( Boolean, Column, create_engine, DateTime, ForeignKey, Integer, MetaData, String, Table, Text, ) from sqlalchemy.engine import url from sqlalchemy.engine.url import make_url from sqlalchemy.orm import relationship, sessionmaker, subqueryload from sqlalchemy.orm.session import make_transient from sqlalchemy.pool import NullPool from sqlalchemy.schema import UniqueConstraint from sqlalchemy_utils import EncryptedType import sqlparse from superset import app, db, db_engine_specs, security_manager from superset.connectors.connector_registry import ConnectorRegistry from superset.legacy import update_time_range from superset.models.helpers import AuditMixinNullable, ImportMixin from superset.models.tags import ChartUpdater, DashboardUpdater, FavStarUpdater from superset.models.user_attributes import UserAttribute from superset.utils import ( cache as cache_util, core as utils, ) from superset.viz import viz_types from urllib import parse # noqa config = app.config custom_password_store = config.get('SQLALCHEMY_CUSTOM_PASSWORD_STORE') stats_logger = config.get('STATS_LOGGER') log_query = config.get('QUERY_LOGGER') metadata = Model.metadata # pylint: disable=no-member PASSWORD_MASK = 'X' * 10 def set_related_perm(mapper, connection, target): # noqa src_class = target.cls_model id_ = target.datasource_id if id_: ds = db.session.query(src_class).filter_by(id=int(id_)).first() if ds: target.perm = ds.perm def copy_dashboard(mapper, connection, target): dashboard_id = config.get('DASHBOARD_TEMPLATE_ID') if dashboard_id is None: return Session = sessionmaker(autoflush=False) session = Session(bind=connection) new_user = session.query(User).filter_by(id=target.id).first() # copy template dashboard to user template = session.query(Dashboard).filter_by(id=int(dashboard_id)).first() dashboard = Dashboard( dashboard_title=template.dashboard_title, position_json=template.position_json, description=template.description, css=template.css, json_metadata=template.json_metadata, slices=template.slices, owners=[new_user], ) session.add(dashboard) session.commit() # set dashboard as the welcome dashboard extra_attributes = UserAttribute( user_id=target.id, welcome_dashboard_id=dashboard.id, ) session.add(extra_attributes) session.commit() sqla.event.listen(User, 'after_insert', copy_dashboard) class Url(Model, AuditMixinNullable): """Used for the short url feature""" __tablename__ = 'url' id = Column(Integer, primary_key=True) url = Column(Text) class KeyValue(Model): """Used for any type of key-value store""" __tablename__ = 'keyvalue' id = Column(Integer, primary_key=True) value = Column(Text, nullable=False) class CssTemplate(Model, AuditMixinNullable): """CSS templates for dashboards""" __tablename__ = 'css_templates' id = Column(Integer, primary_key=True) template_name = Column(String(250)) css = Column(Text, default='') slice_user = Table('slice_user', metadata, Column('id', Integer, primary_key=True), Column('user_id', Integer, ForeignKey('ab_user.id')), Column('slice_id', Integer, ForeignKey('slices.id'))) class Slice(Model, AuditMixinNullable, ImportMixin): """A slice is essentially a report or a view on data""" __tablename__ = 'slices' id = Column(Integer, primary_key=True) slice_name = Column(String(250)) datasource_id = Column(Integer) datasource_type = Column(String(200)) datasource_name = Column(String(2000)) viz_type = Column(String(250)) params = Column(Text) description = Column(Text) cache_timeout = Column(Integer) perm = Column(String(1000)) owners = relationship(security_manager.user_model, secondary=slice_user) export_fields = ('slice_name', 'datasource_type', 'datasource_name', 'viz_type', 'params', 'cache_timeout') def __repr__(self): return self.slice_name @property def cls_model(self): return ConnectorRegistry.sources[self.datasource_type] @property def datasource(self): return self.get_datasource def clone(self): return Slice( slice_name=self.slice_name, datasource_id=self.datasource_id, datasource_type=self.datasource_type, datasource_name=self.datasource_name, viz_type=self.viz_type, params=self.params, description=self.description, cache_timeout=self.cache_timeout) @datasource.getter @utils.memoized def get_datasource(self): return ( db.session.query(self.cls_model) .filter_by(id=self.datasource_id) .first() ) @renders('datasource_name') def datasource_link(self): # pylint: disable=no-member datasource = self.datasource return datasource.link if datasource else None def datasource_name_text(self): # pylint: disable=no-member datasource = self.datasource return datasource.name if datasource else None @property def datasource_edit_url(self): # pylint: disable=no-member datasource = self.datasource return datasource.url if datasource else None @property @utils.memoized def viz(self): d = json.loads(self.params) viz_class = viz_types[self.viz_type] # pylint: disable=no-member return viz_class(datasource=self.datasource, form_data=d) @property def description_markeddown(self): return utils.markdown(self.description) @property def data(self): """Data used to render slice in templates""" d = {} self.token = '' try: d = self.viz.data self.token = d.get('token') except Exception as e: logging.exception(e) d['error'] = str(e) return { 'datasource': self.datasource_name, 'description': self.description, 'description_markeddown': self.description_markeddown, 'edit_url': self.edit_url, 'form_data': self.form_data, 'slice_id': self.id, 'slice_name': self.slice_name, 'slice_url': self.slice_url, 'modified': self.modified(), 'changed_on': self.changed_on.isoformat(), } @property def json_data(self): return json.dumps(self.data) @property def form_data(self): form_data = {} try: form_data = json.loads(self.params) except Exception as e: logging.error("Malformed json in slice's params") logging.exception(e) form_data.update({ 'slice_id': self.id, 'viz_type': self.viz_type, 'datasource': '{}__{}'.format( self.datasource_id, self.datasource_type), }) if self.cache_timeout: form_data['cache_timeout'] = self.cache_timeout update_time_range(form_data) return form_data def get_explore_url(self, base_url='/superset/explore', overrides=None): overrides = overrides or {} form_data = {'slice_id': self.id} form_data.update(overrides) params = parse.quote(json.dumps(form_data)) return f'{base_url}/?form_data={params}' @property def slice_url(self): """Defines the url to access the slice""" return self.get_explore_url() @property def explore_json_url(self): """Defines the url to access the slice""" return self.get_explore_url('/superset/explore_json') @property def edit_url(self): return '/chart/edit/{}'.format(self.id) @property def slice_link(self): url = self.slice_url name = escape(self.slice_name) return Markup(f'{name}') def get_viz(self, force=False): """Creates :py:class:viz.BaseViz object from the url_params_multidict. :return: object of the 'viz_type' type that is taken from the url_params_multidict or self.params. :rtype: :py:class:viz.BaseViz """ slice_params = json.loads(self.params) slice_params['slice_id'] = self.id slice_params['json'] = 'false' slice_params['slice_name'] = self.slice_name slice_params['viz_type'] = self.viz_type if self.viz_type else 'table' return viz_types[slice_params.get('viz_type')]( self.datasource, form_data=slice_params, force=force, ) @property def icons(self): return f""" """ @classmethod def import_obj(cls, slc_to_import, slc_to_override, import_time=None): """Inserts or overrides slc in the database. remote_id and import_time fields in params_dict are set to track the slice origin and ensure correct overrides for multiple imports. Slice.perm is used to find the datasources and connect them. :param Slice slc_to_import: Slice object to import :param Slice slc_to_override: Slice to replace, id matches remote_id :returns: The resulting id for the imported slice :rtype: int """ session = db.session make_transient(slc_to_import) slc_to_import.dashboards = [] slc_to_import.alter_params( remote_id=slc_to_import.id, import_time=import_time) slc_to_import = slc_to_import.copy() params = slc_to_import.params_dict slc_to_import.datasource_id = ConnectorRegistry.get_datasource_by_name( session, slc_to_import.datasource_type, params['datasource_name'], params['schema'], params['database_name']).id if slc_to_override: slc_to_override.override(slc_to_import) session.flush() return slc_to_override.id session.add(slc_to_import) logging.info('Final slice: {}'.format(slc_to_import.to_json())) session.flush() return slc_to_import.id @property def url(self): return ( '/superset/explore/?form_data=%7B%22slice_id%22%3A%20{0}%7D' .format(self.id) ) sqla.event.listen(Slice, 'before_insert', set_related_perm) sqla.event.listen(Slice, 'before_update', set_related_perm) dashboard_slices = Table( 'dashboard_slices', metadata, Column('id', Integer, primary_key=True), Column('dashboard_id', Integer, ForeignKey('dashboards.id')), Column('slice_id', Integer, ForeignKey('slices.id')), ) dashboard_user = Table( 'dashboard_user', metadata, Column('id', Integer, primary_key=True), Column('user_id', Integer, ForeignKey('ab_user.id')), Column('dashboard_id', Integer, ForeignKey('dashboards.id')), ) class Dashboard(Model, AuditMixinNullable, ImportMixin): """The dashboard object!""" __tablename__ = 'dashboards' id = Column(Integer, primary_key=True) dashboard_title = Column(String(500)) position_json = Column(utils.MediumText()) description = Column(Text) css = Column(Text) json_metadata = Column(Text) slug = Column(String(255), unique=True) slices = relationship( 'Slice', secondary=dashboard_slices, backref='dashboards') owners = relationship(security_manager.user_model, secondary=dashboard_user) export_fields = ('dashboard_title', 'position_json', 'json_metadata', 'description', 'css', 'slug') def __repr__(self): return self.dashboard_title @property def table_names(self): # pylint: disable=no-member return ', '.join( {'{}'.format(s.datasource.full_name) for s in self.slices}) @property def url(self): if self.json_metadata: # add default_filters to the preselect_filters of dashboard json_metadata = json.loads(self.json_metadata) default_filters = json_metadata.get('default_filters') # make sure default_filters is not empty and is valid if default_filters and default_filters != '{}': try: if json.loads(default_filters): filters = parse.quote(default_filters.encode('utf8')) return '/superset/dashboard/{}/?preselect_filters={}'.format( self.slug or self.id, filters) except Exception: pass return '/superset/dashboard/{}/'.format(self.slug or self.id) @property def datasources(self): return {slc.datasource for slc in self.slices} @property def sqla_metadata(self): # pylint: disable=no-member metadata = MetaData(bind=self.get_sqla_engine()) return metadata.reflect() def dashboard_link(self): title = escape(self.dashboard_title) return Markup(f'{title}') @property def data(self): positions = self.position_json if positions: positions = json.loads(positions) return { 'id': self.id, 'metadata': self.params_dict, 'css': self.css, 'dashboard_title': self.dashboard_title, 'slug': self.slug, 'slices': [slc.data for slc in self.slices], 'position_json': positions, } @property def params(self): return self.json_metadata @params.setter def params(self, value): self.json_metadata = value @property def position(self): if self.position_json: return json.loads(self.position_json) return {} @classmethod def import_obj(cls, dashboard_to_import, import_time=None): """Imports the dashboard from the object to the database. Once dashboard is imported, json_metadata field is extended and stores remote_id and import_time. It helps to decide if the dashboard has to be overridden or just copies over. Slices that belong to this dashboard will be wired to existing tables. This function can be used to import/export dashboards between multiple superset instances. Audit metadata isn't copied over. """ def alter_positions(dashboard, old_to_new_slc_id_dict): """ Updates slice_ids in the position json. Sample position_json data: { "DASHBOARD_VERSION_KEY": "v2", "DASHBOARD_ROOT_ID": { "type": "DASHBOARD_ROOT_TYPE", "id": "DASHBOARD_ROOT_ID", "children": ["DASHBOARD_GRID_ID"] }, "DASHBOARD_GRID_ID": { "type": "DASHBOARD_GRID_TYPE", "id": "DASHBOARD_GRID_ID", "children": ["DASHBOARD_CHART_TYPE-2"] }, "DASHBOARD_CHART_TYPE-2": { "type": "DASHBOARD_CHART_TYPE", "id": "DASHBOARD_CHART_TYPE-2", "children": [], "meta": { "width": 4, "height": 50, "chartId": 118 } }, } """ position_data = json.loads(dashboard.position_json) position_json = position_data.values() for value in position_json: if (isinstance(value, dict) and value.get('meta') and value.get('meta').get('chartId')): old_slice_id = value.get('meta').get('chartId') if old_slice_id in old_to_new_slc_id_dict: value['meta']['chartId'] = ( old_to_new_slc_id_dict[old_slice_id] ) dashboard.position_json = json.dumps(position_data) logging.info('Started import of the dashboard: {}' .format(dashboard_to_import.to_json())) session = db.session logging.info('Dashboard has {} slices' .format(len(dashboard_to_import.slices))) # copy slices object as Slice.import_slice will mutate the slice # and will remove the existing dashboard - slice association slices = copy(dashboard_to_import.slices) old_to_new_slc_id_dict = {} new_filter_immune_slices = [] new_timed_refresh_immune_slices = [] new_expanded_slices = {} i_params_dict = dashboard_to_import.params_dict remote_id_slice_map = { slc.params_dict['remote_id']: slc for slc in session.query(Slice).all() if 'remote_id' in slc.params_dict } for slc in slices: logging.info('Importing slice {} from the dashboard: {}'.format( slc.to_json(), dashboard_to_import.dashboard_title)) remote_slc = remote_id_slice_map.get(slc.id) new_slc_id = Slice.import_obj(slc, remote_slc, import_time=import_time) old_to_new_slc_id_dict[slc.id] = new_slc_id # update json metadata that deals with slice ids new_slc_id_str = '{}'.format(new_slc_id) old_slc_id_str = '{}'.format(slc.id) if ('filter_immune_slices' in i_params_dict and old_slc_id_str in i_params_dict['filter_immune_slices']): new_filter_immune_slices.append(new_slc_id_str) if ('timed_refresh_immune_slices' in i_params_dict and old_slc_id_str in i_params_dict['timed_refresh_immune_slices']): new_timed_refresh_immune_slices.append(new_slc_id_str) if ('expanded_slices' in i_params_dict and old_slc_id_str in i_params_dict['expanded_slices']): new_expanded_slices[new_slc_id_str] = ( i_params_dict['expanded_slices'][old_slc_id_str]) # override the dashboard existing_dashboard = None for dash in session.query(Dashboard).all(): if ('remote_id' in dash.params_dict and dash.params_dict['remote_id'] == dashboard_to_import.id): existing_dashboard = dash dashboard_to_import.id = None alter_positions(dashboard_to_import, old_to_new_slc_id_dict) dashboard_to_import.alter_params(import_time=import_time) if new_expanded_slices: dashboard_to_import.alter_params( expanded_slices=new_expanded_slices) if new_filter_immune_slices: dashboard_to_import.alter_params( filter_immune_slices=new_filter_immune_slices) if new_timed_refresh_immune_slices: dashboard_to_import.alter_params( timed_refresh_immune_slices=new_timed_refresh_immune_slices) new_slices = session.query(Slice).filter( Slice.id.in_(old_to_new_slc_id_dict.values())).all() if existing_dashboard: existing_dashboard.override(dashboard_to_import) existing_dashboard.slices = new_slices session.flush() return existing_dashboard.id else: # session.add(dashboard_to_import) causes sqlachemy failures # related to the attached users / slices. Creating new object # allows to avoid conflicts in the sql alchemy state. copied_dash = dashboard_to_import.copy() copied_dash.slices = new_slices session.add(copied_dash) session.flush() return copied_dash.id @classmethod def export_dashboards(cls, dashboard_ids): copied_dashboards = [] datasource_ids = set() for dashboard_id in dashboard_ids: # make sure that dashboard_id is an integer dashboard_id = int(dashboard_id) copied_dashboard = ( db.session.query(Dashboard) .options(subqueryload(Dashboard.slices)) .filter_by(id=dashboard_id).first() ) make_transient(copied_dashboard) for slc in copied_dashboard.slices: datasource_ids.add((slc.datasource_id, slc.datasource_type)) # add extra params for the import slc.alter_params( remote_id=slc.id, datasource_name=slc.datasource.name, schema=slc.datasource.name, database_name=slc.datasource.database.name, ) copied_dashboard.alter_params(remote_id=dashboard_id) copied_dashboards.append(copied_dashboard) eager_datasources = [] for dashboard_id, dashboard_type in datasource_ids: eager_datasource = ConnectorRegistry.get_eager_datasource( db.session, dashboard_type, dashboard_id) eager_datasource.alter_params( remote_id=eager_datasource.id, database_name=eager_datasource.database.name, ) make_transient(eager_datasource) eager_datasources.append(eager_datasource) return json.dumps({ 'dashboards': copied_dashboards, 'datasources': eager_datasources, }, cls=utils.DashboardEncoder, indent=4) class Database(Model, AuditMixinNullable, ImportMixin): """An ORM object that stores Database related information""" __tablename__ = 'dbs' type = 'table' __table_args__ = (UniqueConstraint('database_name'),) id = Column(Integer, primary_key=True) verbose_name = Column(String(250), unique=True) # short unique name, used in permissions database_name = Column(String(250), unique=True) sqlalchemy_uri = Column(String(1024)) password = Column(EncryptedType(String(1024), config.get('SECRET_KEY'))) cache_timeout = Column(Integer) select_as_create_table_as = Column(Boolean, default=False) expose_in_sqllab = Column(Boolean, default=True) allow_run_async = Column(Boolean, default=False) allow_csv_upload = Column(Boolean, default=False) allow_ctas = Column(Boolean, default=False) allow_dml = Column(Boolean, default=False) force_ctas_schema = Column(String(250)) allow_multi_schema_metadata_fetch = Column(Boolean, default=False) extra = Column(Text, default=textwrap.dedent("""\ { "metadata_params": {}, "engine_params": {}, "metadata_cache_timeout": {}, "schemas_allowed_for_csv_upload": [] } """)) perm = Column(String(1000)) impersonate_user = Column(Boolean, default=False) export_fields = ('database_name', 'sqlalchemy_uri', 'cache_timeout', 'expose_in_sqllab', 'allow_run_async', 'allow_ctas', 'allow_csv_upload', 'extra') export_children = ['tables'] def __repr__(self): return self.verbose_name if self.verbose_name else self.database_name @property def name(self): return self.verbose_name if self.verbose_name else self.database_name @property def allows_subquery(self): return self.db_engine_spec.allows_subquery @property def data(self): return { 'id': self.id, 'name': self.database_name, 'backend': self.backend, 'allow_multi_schema_metadata_fetch': self.allow_multi_schema_metadata_fetch, 'allows_subquery': self.allows_subquery, } @property def unique_name(self): return self.database_name @property def url_object(self): return make_url(self.sqlalchemy_uri_decrypted) @property def backend(self): url = make_url(self.sqlalchemy_uri_decrypted) return url.get_backend_name() @property def metadata_cache_timeout(self): return self.get_extra().get('metadata_cache_timeout', {}) @property def schema_cache_enabled(self): return 'schema_cache_timeout' in self.metadata_cache_timeout @property def schema_cache_timeout(self): return self.metadata_cache_timeout.get('schema_cache_timeout') @property def table_cache_enabled(self): return 'table_cache_timeout' in self.metadata_cache_timeout @property def table_cache_timeout(self): return self.metadata_cache_timeout.get('table_cache_timeout') @classmethod def get_password_masked_url_from_uri(cls, uri): url = make_url(uri) return cls.get_password_masked_url(url) @classmethod def get_password_masked_url(cls, url): url_copy = deepcopy(url) if url_copy.password is not None and url_copy.password != PASSWORD_MASK: url_copy.password = PASSWORD_MASK return url_copy def set_sqlalchemy_uri(self, uri): conn = sqla.engine.url.make_url(uri.strip()) if conn.password != PASSWORD_MASK and not custom_password_store: # do not over-write the password with the password mask self.password = conn.password conn.password = PASSWORD_MASK if conn.password else None self.sqlalchemy_uri = str(conn) # hides the password def get_effective_user(self, url, user_name=None): """ Get the effective user, especially during impersonation. :param url: SQL Alchemy URL object :param user_name: Default username :return: The effective username """ effective_username = None if self.impersonate_user: effective_username = url.username if user_name: effective_username = user_name elif ( hasattr(g, 'user') and hasattr(g.user, 'username') and g.user.username is not None ): effective_username = g.user.username return effective_username @utils.memoized( watch=('impersonate_user', 'sqlalchemy_uri_decrypted', 'extra')) def get_sqla_engine(self, schema=None, nullpool=True, user_name=None, source=None): extra = self.get_extra() url = make_url(self.sqlalchemy_uri_decrypted) url = self.db_engine_spec.adjust_database_uri(url, schema) effective_username = self.get_effective_user(url, user_name) # If using MySQL or Presto for example, will set url.username # If using Hive, will not do anything yet since that relies on a # configuration parameter instead. self.db_engine_spec.modify_url_for_impersonation( url, self.impersonate_user, effective_username) masked_url = self.get_password_masked_url(url) logging.info('Database.get_sqla_engine(). Masked URL: {0}'.format(masked_url)) params = extra.get('engine_params', {}) if nullpool: params['poolclass'] = NullPool # If using Hive, this will set hive.server2.proxy.user=$effective_username configuration = {} configuration.update( self.db_engine_spec.get_configuration_for_impersonation( str(url), self.impersonate_user, effective_username)) if configuration: d = params.get('connect_args', {}) d['configuration'] = configuration params['connect_args'] = d DB_CONNECTION_MUTATOR = config.get('DB_CONNECTION_MUTATOR') if DB_CONNECTION_MUTATOR: url, params = DB_CONNECTION_MUTATOR( url, params, effective_username, security_manager, source) return create_engine(url, **params) def get_reserved_words(self): return self.get_dialect().preparer.reserved_words def get_quoter(self): return self.get_dialect().identifier_preparer.quote def get_df(self, sql, schema): sqls = [str(s).strip().strip(';') for s in sqlparse.parse(sql)] source_key = None if request and request.referrer: if '/superset/dashboard/' in request.referrer: source_key = 'dashboard' elif '/superset/explore/' in request.referrer: source_key = 'chart' engine = self.get_sqla_engine( schema=schema, source=utils.sources.get(source_key, None)) username = utils.get_username() def needs_conversion(df_series): if df_series.empty: return False if isinstance(df_series[0], (list, dict)): return True return False def _log_query(sql): if log_query: log_query(engine.url, sql, schema, username, __name__, security_manager) with closing(engine.raw_connection()) as conn: with closing(conn.cursor()) as cursor: for sql in sqls[:-1]: _log_query(sql) self.db_engine_spec.execute(cursor, sql) cursor.fetchall() _log_query(sqls[-1]) self.db_engine_spec.execute(cursor, sqls[-1]) if cursor.description is not None: columns = [col_desc[0] for col_desc in cursor.description] else: columns = [] df = pd.DataFrame.from_records( data=list(cursor.fetchall()), columns=columns, coerce_float=True, ) for k, v in df.dtypes.items(): if v.type == numpy.object_ and needs_conversion(df[k]): df[k] = df[k].apply(utils.json_dumps_w_dates) return df def compile_sqla_query(self, qry, schema=None): engine = self.get_sqla_engine(schema=schema) sql = str( qry.compile( engine, compile_kwargs={'literal_binds': True}, ), ) if engine.dialect.identifier_preparer._double_percents: sql = sql.replace('%%', '%') return sql def select_star( self, table_name, schema=None, limit=100, show_cols=False, indent=True, latest_partition=False, cols=None): """Generates a ``select *`` statement in the proper dialect""" eng = self.get_sqla_engine( schema=schema, source=utils.sources.get('sql_lab', None)) return self.db_engine_spec.select_star( self, table_name, schema=schema, engine=eng, limit=limit, show_cols=show_cols, indent=indent, latest_partition=latest_partition, cols=cols) def apply_limit_to_sql(self, sql, limit=1000): return self.db_engine_spec.apply_limit_to_sql(sql, limit, self) def safe_sqlalchemy_uri(self): return self.sqlalchemy_uri @property def inspector(self): engine = self.get_sqla_engine() return sqla.inspect(engine) @cache_util.memoized_func( key=lambda *args, **kwargs: 'db:{}:schema:None:table_list', attribute_in_key='id') def all_table_names_in_database(self, cache=False, cache_timeout=None, force=False): """Parameters need to be passed as keyword arguments.""" if not self.allow_multi_schema_metadata_fetch: return [] return self.db_engine_spec.fetch_result_sets(self, 'table') @cache_util.memoized_func( key=lambda *args, **kwargs: 'db:{}:schema:None:view_list', attribute_in_key='id') def all_view_names_in_database(self, cache=False, cache_timeout=None, force=False): """Parameters need to be passed as keyword arguments.""" if not self.allow_multi_schema_metadata_fetch: return [] return self.db_engine_spec.fetch_result_sets(self, 'view') @cache_util.memoized_func( key=lambda *args, **kwargs: 'db:{{}}:schema:{}:table_list'.format( kwargs.get('schema')), attribute_in_key='id') def all_table_names_in_schema(self, schema, cache=False, cache_timeout=None, force=False): """Parameters need to be passed as keyword arguments. For unused parameters, they are referenced in cache_util.memoized_func decorator. :param schema: schema name :type schema: str :param cache: whether cache is enabled for the function :type cache: bool :param cache_timeout: timeout in seconds for the cache :type cache_timeout: int :param force: whether to force refresh the cache :type force: bool :return: table list :rtype: list """ tables = [] try: tables = self.db_engine_spec.get_table_names( inspector=self.inspector, schema=schema) except Exception as e: logging.exception(e) return tables @cache_util.memoized_func( key=lambda *args, **kwargs: 'db:{{}}:schema:{}:view_list'.format( kwargs.get('schema')), attribute_in_key='id') def all_view_names_in_schema(self, schema, cache=False, cache_timeout=None, force=False): """Parameters need to be passed as keyword arguments. For unused parameters, they are referenced in cache_util.memoized_func decorator. :param schema: schema name :type schema: str :param cache: whether cache is enabled for the function :type cache: bool :param cache_timeout: timeout in seconds for the cache :type cache_timeout: int :param force: whether to force refresh the cache :type force: bool :return: view list :rtype: list """ views = [] try: views = self.db_engine_spec.get_view_names( inspector=self.inspector, schema=schema) except Exception as e: logging.exception(e) return views @cache_util.memoized_func( key=lambda *args, **kwargs: 'db:{}:schema_list', attribute_in_key='id') def all_schema_names(self, cache=False, cache_timeout=None, force=False): """Parameters need to be passed as keyword arguments. For unused parameters, they are referenced in cache_util.memoized_func decorator. :param cache: whether cache is enabled for the function :type cache: bool :param cache_timeout: timeout in seconds for the cache :type cache_timeout: int :param force: whether to force refresh the cache :type force: bool :return: schema list :rtype: list """ return self.db_engine_spec.get_schema_names(self.inspector) @property def db_engine_spec(self): return db_engine_specs.engines.get( self.backend, db_engine_specs.BaseEngineSpec) @classmethod def get_db_engine_spec_for_backend(cls, backend): return db_engine_specs.engines.get(backend, db_engine_specs.BaseEngineSpec) def grains(self): """Defines time granularity database-specific expressions. The idea here is to make it easy for users to change the time grain form a datetime (maybe the source grain is arbitrary timestamps, daily or 5 minutes increments) to another, "truncated" datetime. Since each database has slightly different but similar datetime functions, this allows a mapping between database engines and actual functions. """ return self.db_engine_spec.get_time_grains() def grains_dict(self): """Allowing to lookup grain by either label or duration For backward compatibility""" d = {grain.duration: grain for grain in self.grains()} d.update({grain.label: grain for grain in self.grains()}) return d def get_extra(self): extra = {} if self.extra: try: extra = json.loads(self.extra) except Exception as e: logging.error(e) raise e return extra def get_table(self, table_name, schema=None): extra = self.get_extra() meta = MetaData(**extra.get('metadata_params', {})) return Table( table_name, meta, schema=schema or None, autoload=True, autoload_with=self.get_sqla_engine()) def get_columns(self, table_name, schema=None): return self.inspector.get_columns(table_name, schema) def get_indexes(self, table_name, schema=None): return self.inspector.get_indexes(table_name, schema) def get_pk_constraint(self, table_name, schema=None): return self.inspector.get_pk_constraint(table_name, schema) def get_foreign_keys(self, table_name, schema=None): return self.inspector.get_foreign_keys(table_name, schema) def get_schema_access_for_csv_upload(self): return self.get_extra().get('schemas_allowed_for_csv_upload', []) @property def sqlalchemy_uri_decrypted(self): conn = sqla.engine.url.make_url(self.sqlalchemy_uri) if custom_password_store: conn.password = custom_password_store(conn) else: conn.password = self.password return str(conn) @property def sql_url(self): return '/superset/sql/{}/'.format(self.id) def get_perm(self): return ( '[{obj.database_name}].(id:{obj.id})').format(obj=self) def has_table(self, table): engine = self.get_sqla_engine() return engine.has_table( table.table_name, table.schema or None) @utils.memoized def get_dialect(self): sqla_url = url.make_url(self.sqlalchemy_uri_decrypted) return sqla_url.get_dialect()() sqla.event.listen(Database, 'after_insert', security_manager.set_perm) sqla.event.listen(Database, 'after_update', security_manager.set_perm) class Log(Model): """ORM object used to log Superset actions to the database""" __tablename__ = 'logs' id = Column(Integer, primary_key=True) action = Column(String(512)) user_id = Column(Integer, ForeignKey('ab_user.id')) dashboard_id = Column(Integer) slice_id = Column(Integer) json = Column(Text) user = relationship( security_manager.user_model, backref='logs', foreign_keys=[user_id]) dttm = Column(DateTime, default=datetime.utcnow) duration_ms = Column(Integer) referrer = Column(String(1024)) @classmethod def log_this(cls, f): """Decorator to log user actions""" @functools.wraps(f) def wrapper(*args, **kwargs): user_id = None if g.user: user_id = g.user.get_id() d = request.form.to_dict() or {} # request parameters can overwrite post body request_params = request.args.to_dict() d.update(request_params) d.update(kwargs) slice_id = d.get('slice_id') dashboard_id = d.get('dashboard_id') try: slice_id = int( slice_id or json.loads(d.get('form_data')).get('slice_id')) except (ValueError, TypeError): slice_id = 0 stats_logger.incr(f.__name__) start_dttm = datetime.now() value = f(*args, **kwargs) duration_ms = (datetime.now() - start_dttm).total_seconds() * 1000 # bulk insert try: explode_by = d.get('explode') records = json.loads(d.get(explode_by)) except Exception: records = [d] referrer = request.referrer[:1000] if request.referrer else None logs = [] for record in records: try: json_string = json.dumps(record) except Exception: json_string = None log = cls( action=f.__name__, json=json_string, dashboard_id=dashboard_id, slice_id=slice_id, duration_ms=duration_ms, referrer=referrer, user_id=user_id) logs.append(log) sesh = db.session() sesh.bulk_save_objects(logs) sesh.commit() return value return wrapper class FavStar(Model): __tablename__ = 'favstar' id = Column(Integer, primary_key=True) user_id = Column(Integer, ForeignKey('ab_user.id')) class_name = Column(String(50)) obj_id = Column(Integer) dttm = Column(DateTime, default=datetime.utcnow) class DatasourceAccessRequest(Model, AuditMixinNullable): """ORM model for the access requests for datasources and dbs.""" __tablename__ = 'access_request' id = Column(Integer, primary_key=True) datasource_id = Column(Integer) datasource_type = Column(String(200)) ROLES_BLACKLIST = set(config.get('ROBOT_PERMISSION_ROLES', [])) @property def cls_model(self): return ConnectorRegistry.sources[self.datasource_type] @property def username(self): return self.creator() @property def datasource(self): return self.get_datasource @datasource.getter @utils.memoized def get_datasource(self): # pylint: disable=no-member ds = db.session.query(self.cls_model).filter_by( id=self.datasource_id).first() return ds @property def datasource_link(self): return self.datasource.link # pylint: disable=no-member @property def roles_with_datasource(self): action_list = '' perm = self.datasource.perm # pylint: disable=no-member pv = security_manager.find_permission_view_menu('datasource_access', perm) for r in pv.role: if r.name in self.ROLES_BLACKLIST: continue # pylint: disable=no-member url = ( f'/superset/approve?datasource_type={self.datasource_type}&' f'datasource_id={self.datasource_id}&' f'created_by={self.created_by.username}&role_to_grant={r.name}' ) href = 'Grant {} Role'.format(url, r.name) action_list = action_list + '
  • ' + href + '
  • ' return '' @property def user_roles(self): action_list = '' for r in self.created_by.roles: # pylint: disable=no-member # pylint: disable=no-member url = ( f'/superset/approve?datasource_type={self.datasource_type}&' f'datasource_id={self.datasource_id}&' f'created_by={self.created_by.username}&role_to_extend={r.name}' ) href = 'Extend {} Role'.format(url, r.name) if r.name in self.ROLES_BLACKLIST: href = '{} Role'.format(r.name) action_list = action_list + '
  • ' + href + '
  • ' return '' # events for updating tags sqla.event.listen(Slice, 'after_insert', ChartUpdater.after_insert) sqla.event.listen(Slice, 'after_update', ChartUpdater.after_update) sqla.event.listen(Slice, 'after_delete', ChartUpdater.after_delete) sqla.event.listen(Dashboard, 'after_insert', DashboardUpdater.after_insert) sqla.event.listen(Dashboard, 'after_update', DashboardUpdater.after_update) sqla.event.listen(Dashboard, 'after_delete', DashboardUpdater.after_delete) sqla.event.listen(FavStar, 'after_insert', FavStarUpdater.after_insert) sqla.event.listen(FavStar, 'after_delete', FavStarUpdater.after_delete)