From 90b08fa0958c6d93bec5cee1f1566bb86047fe11 Mon Sep 17 00:00:00 2001 From: Jesse Yang Date: Fri, 8 Apr 2022 21:45:06 -0700 Subject: [PATCH] chore: skip SIP-68 shadow writing for LTS --- superset/connectors/sqla/models.py | 501 +------------- .../b8d3a24d9131_new_dataset_models.py | 610 +----------------- superset/tables/models.py | 13 +- tests/integration_tests/sqla_models_tests.py | 1 + .../commands/importers/v1/import_test.py | 8 +- tests/unit_tests/datasets/test_models.py | 15 +- tests/unit_tests/tables/test_models.py | 2 + 7 files changed, 31 insertions(+), 1119 deletions(-) diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index b8d3a7d0914..30e60a78c3d 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# pylint: disable=too-many-lines, redefined-outer-name +# pylint: disable=too-many-lines import dataclasses import json import logging @@ -72,16 +72,13 @@ from sqlalchemy.sql.expression import Label, Select, TextAsFrom from sqlalchemy.sql.selectable import Alias, TableClause from superset import app, db, is_feature_enabled, security_manager -from superset.columns.models import Column as NewColumn from superset.common.db_query_status import QueryStatus from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric from superset.connectors.sqla.utils import ( get_physical_table_metadata, get_virtual_table_metadata, - load_or_create_tables, validate_adhoc_subquery, ) -from superset.datasets.models import Dataset as NewDataset from superset.db_engine_specs.base import BaseEngineSpec, CTE_ALIAS, TimestampExpression from superset.exceptions import ( QueryClauseValidationException, @@ -94,12 +91,7 @@ from superset.jinja_context import ( ) from superset.models.annotations import Annotation from superset.models.core import Database -from superset.models.helpers import ( - AuditMixinNullable, - CertificationMixin, - clone_model, - QueryResult, -) +from superset.models.helpers import AuditMixinNullable, CertificationMixin, QueryResult from superset.sql_parse import ParsedQuery, sanitize_clause from superset.superset_typing import ( AdhocColumn, @@ -108,7 +100,6 @@ from superset.superset_typing import ( OrderBy, QueryObjectDict, ) -from superset.tables.models import Table as NewTable from superset.utils import core as utils from superset.utils.core import ( GenericDataType, @@ -1940,496 +1931,12 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho """ inspector = inspect(target) session = inspector.session - - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.table.database - or session.query(Database).filter_by(id=target.database_id).one() - ) - engine = database.get_sqla_engine(schema=target.table.schema) - conditional_quote = engine.dialect.identifier_preparer.quote - session.execute(update(SqlaTable).where(SqlaTable.id == target.table.id)) - dataset = ( - session.query(NewDataset) - .filter_by(sqlatable_id=target.table.id) - .one_or_none() - ) - - if not dataset: - # if dataset is not found create a new copy - # of the dataset instead of updating the existing - - SqlaTable.write_shadow_dataset(target.table, database, session) - return - - # update ``Column`` model as well - if isinstance(target, TableColumn): - columns = [ - column - for column in 
dataset.columns - if column.name == target.column_name - ] - if not columns: - return - - column = columns[0] - extra_json = json.loads(target.extra or "{}") - for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}: - value = getattr(target, attr) - if value: - extra_json[attr] = value - - column.name = target.column_name - column.type = target.type or "Unknown" - column.expression = target.expression or conditional_quote( - target.column_name - ) - column.description = target.description - column.is_temporal = target.is_dttm - column.is_physical = target.expression is None - column.extra_json = json.dumps(extra_json) if extra_json else None - - else: # SqlMetric - columns = [ - column - for column in dataset.columns - if column.name == target.metric_name - ] - if not columns: - return - - column = columns[0] - extra_json = json.loads(target.extra or "{}") - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(target, attr) - if value: - extra_json[attr] = value - - is_additive = ( - target.metric_type - and target.metric_type.lower() in ADDITIVE_METRIC_TYPES - ) - - column.name = target.metric_name - column.expression = target.expression - column.warning_text = target.warning_text - column.description = target.description - column.is_additive = is_additive - column.extra_json = json.dumps(extra_json) if extra_json else None - - @staticmethod - def after_insert( - mapper: Mapper, - connection: Connection, - target: "SqlaTable", - ) -> None: - """ - Shadow write the dataset to new models. - - The ``SqlaTable`` model is currently being migrated to two new models, ``Table`` - and ``Dataset``. In the first phase of the migration the new models are populated - whenever ``SqlaTable`` is modified (created, updated, or deleted). - - In the second phase of the migration reads will be done from the new models. - Finally, in the third phase of the migration the old models will be removed. - - For more context: https://github.com/apache/superset/issues/14909 - """ - session = inspect(target).session - # set permissions - security_manager.set_perm(mapper, connection, target) - - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.database - or session.query(Database).filter_by(id=target.database_id).one() - ) - - SqlaTable.write_shadow_dataset(target, database, session) - - @staticmethod - def after_delete( # pylint: disable=unused-argument - mapper: Mapper, - connection: Connection, - target: "SqlaTable", - ) -> None: - """ - Shadow write the dataset to new models. - - The ``SqlaTable`` model is currently being migrated to two new models, ``Table`` - and ``Dataset``. In the first phase of the migration the new models are populated - whenever ``SqlaTable`` is modified (created, updated, or deleted). - - In the second phase of the migration reads will be done from the new models. - Finally, in the third phase of the migration the old models will be removed. - - For more context: https://github.com/apache/superset/issues/14909 - """ - session = inspect(target).session - dataset = ( - session.query(NewDataset).filter_by(sqlatable_id=target.id).one_or_none() - ) - if dataset: - session.delete(dataset) - - @staticmethod - def after_update( # pylint: disable=too-many-branches, too-many-locals, too-many-statements - mapper: Mapper, - connection: Connection, - target: "SqlaTable", - ) -> None: - """ - Shadow write the dataset to new models. 
- - The ``SqlaTable`` model is currently being migrated to two new models, ``Table`` - and ``Dataset``. In the first phase of the migration the new models are populated - whenever ``SqlaTable`` is modified (created, updated, or deleted). - - In the second phase of the migration reads will be done from the new models. - Finally, in the third phase of the migration the old models will be removed. - - For more context: https://github.com/apache/superset/issues/14909 - """ - inspector = inspect(target) - session = inspector.session - - # double-check that ``UPDATE``s are actually pending (this method is called even - # for instances that have no net changes to their column-based attributes) - if not session.is_modified(target, include_collections=True): - return - - # set permissions - security_manager.set_perm(mapper, connection, target) - - dataset = ( - session.query(NewDataset).filter_by(sqlatable_id=target.id).one_or_none() - ) - if not dataset: - return - - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.database - or session.query(Database).filter_by(id=target.database_id).one() - ) - engine = database.get_sqla_engine(schema=target.schema) - conditional_quote = engine.dialect.identifier_preparer.quote - - # update columns - if inspector.attrs.columns.history.has_changes(): - # handle deleted columns - if inspector.attrs.columns.history.deleted: - column_names = { - column.column_name - for column in inspector.attrs.columns.history.deleted - } - dataset.columns = [ - column - for column in dataset.columns - if column.name not in column_names - ] - - # handle inserted columns - for column in inspector.attrs.columns.history.added: - # ``is_active`` might be ``None``, but it defaults to ``True``. 
- if column.is_active is False: - continue - - extra_json = json.loads(column.extra or "{}") - for attr in { - "groupby", - "filterable", - "verbose_name", - "python_date_format", - }: - value = getattr(column, attr) - if value: - extra_json[attr] = value - - dataset.columns.append( - NewColumn( - name=column.column_name, - type=column.type or "Unknown", - expression=column.expression - or conditional_quote(column.column_name), - description=column.description, - is_temporal=column.is_dttm, - is_aggregation=False, - is_physical=column.expression is None, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - ) - - # update metrics - if inspector.attrs.metrics.history.has_changes(): - # handle deleted metrics - if inspector.attrs.metrics.history.deleted: - column_names = { - metric.metric_name - for metric in inspector.attrs.metrics.history.deleted - } - dataset.columns = [ - column - for column in dataset.columns - if column.name not in column_names - ] - - # handle inserted metrics - for metric in inspector.attrs.metrics.history.added: - extra_json = json.loads(metric.extra or "{}") - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(metric, attr) - if value: - extra_json[attr] = value - - is_additive = ( - metric.metric_type - and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES - ) - - dataset.columns.append( - NewColumn( - name=metric.metric_name, - type="Unknown", - expression=metric.expression, - warning_text=metric.warning_text, - description=metric.description, - is_aggregation=True, - is_additive=is_additive, - is_physical=False, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - ) - - # physical dataset - if target.sql is None: - physical_columns = [ - column for column in dataset.columns if column.is_physical - ] - - # if the table name changed we should create a new table instance, instead - # of reusing the original one - if ( - inspector.attrs.table_name.history.has_changes() - or inspector.attrs.schema.history.has_changes() - or inspector.attrs.database_id.history.has_changes() - ): - # does the dataset point to an existing table? 
- table = ( - session.query(NewTable) - .filter_by( - database_id=target.database_id, - schema=target.schema, - name=target.table_name, - ) - .first() - ) - if not table: - # create new columns - physical_columns = [ - clone_model(column, ignore=["uuid"]) - for column in physical_columns - ] - - # create new table - table = NewTable( - name=target.table_name, - schema=target.schema, - catalog=None, - database_id=target.database_id, - columns=physical_columns, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - dataset.tables = [table] - elif dataset.tables: - table = dataset.tables[0] - table.columns = physical_columns - - # virtual dataset - else: - # mark all columns as virtual (not physical) - for column in dataset.columns: - column.is_physical = False - - # update referenced tables if SQL changed - if inspector.attrs.sql.history.has_changes(): - parsed = ParsedQuery(target.sql) - referenced_tables = parsed.tables - - predicate = or_( - *[ - and_( - NewTable.schema == (table.schema or target.schema), - NewTable.name == table.table, - ) - for table in referenced_tables - ] - ) - dataset.tables = session.query(NewTable).filter(predicate).all() - - # update other attributes - dataset.name = target.table_name - dataset.expression = target.sql or conditional_quote(target.table_name) - dataset.is_physical = target.sql is None - - @staticmethod - def write_shadow_dataset( # pylint: disable=too-many-locals - dataset: "SqlaTable", database: Database, session: Session - ) -> None: - """ - Shadow write the dataset to new models. - - The ``SqlaTable`` model is currently being migrated to two new models, ``Table`` - and ``Dataset``. In the first phase of the migration the new models are populated - whenever ``SqlaTable`` is modified (created, updated, or deleted). - - In the second phase of the migration reads will be done from the new models. - Finally, in the third phase of the migration the old models will be removed. - - For more context: https://github.com/apache/superset/issues/14909 - """ - - engine = database.get_sqla_engine(schema=dataset.schema) - conditional_quote = engine.dialect.identifier_preparer.quote - - # create columns - columns = [] - for column in dataset.columns: - # ``is_active`` might be ``None`` at this point, but it defaults to ``True``. 
- if column.is_active is False: - continue - - try: - extra_json = json.loads(column.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}: - value = getattr(column, attr) - if value: - extra_json[attr] = value - - columns.append( - NewColumn( - name=column.column_name, - type=column.type or "Unknown", - expression=column.expression - or conditional_quote(column.column_name), - description=column.description, - is_temporal=column.is_dttm, - is_aggregation=False, - is_physical=column.expression is None, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=dataset.is_managed_externally, - external_url=dataset.external_url, - ), - ) - - # create metrics - for metric in dataset.metrics: - try: - extra_json = json.loads(metric.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(metric, attr) - if value: - extra_json[attr] = value - - is_additive = ( - metric.metric_type - and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES - ) - - columns.append( - NewColumn( - name=metric.metric_name, - type="Unknown", # figuring this out would require a type inferrer - expression=metric.expression, - warning_text=metric.warning_text, - description=metric.description, - is_aggregation=True, - is_additive=is_additive, - is_physical=False, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=dataset.is_managed_externally, - external_url=dataset.external_url, - ), - ) - - # physical dataset - if not dataset.sql: - physical_columns = [column for column in columns if column.is_physical] - - # create table - table = NewTable( - name=dataset.table_name, - schema=dataset.schema, - catalog=None, # currently not supported - database_id=dataset.database_id, - columns=physical_columns, - is_managed_externally=dataset.is_managed_externally, - external_url=dataset.external_url, - ) - tables = [table] - - # virtual dataset - else: - # mark all columns as virtual (not physical) - for column in columns: - column.is_physical = False - - # find referenced tables - parsed = ParsedQuery(dataset.sql) - referenced_tables = parsed.tables - tables = load_or_create_tables( - session, - dataset.database_id, - dataset.schema, - referenced_tables, - conditional_quote, - engine, - ) - - # create the new dataset - new_dataset = NewDataset( - sqlatable_id=dataset.id, - name=dataset.table_name, - expression=dataset.sql or conditional_quote(dataset.table_name), - tables=tables, - columns=columns, - is_physical=not dataset.sql, - is_managed_externally=dataset.is_managed_externally, - external_url=dataset.external_url, - ) - session.add(new_dataset) - +sa.event.listen(SqlaTable, "after_insert", security_manager.set_perm) +sa.event.listen(SqlaTable, "after_update", security_manager.set_perm) sa.event.listen(SqlaTable, "before_update", SqlaTable.before_update) -sa.event.listen(SqlaTable, "after_insert", SqlaTable.after_insert) -sa.event.listen(SqlaTable, "after_delete", SqlaTable.after_delete) -sa.event.listen(SqlaTable, "after_update", SqlaTable.after_update) sa.event.listen(SqlMetric, "after_update", SqlaTable.update_table) sa.event.listen(TableColumn, "after_update", SqlaTable.update_table) diff --git 
a/superset/migrations/versions/b8d3a24d9131_new_dataset_models.py b/superset/migrations/versions/b8d3a24d9131_new_dataset_models.py index 22aa2f451c8..50df3e1f37b 100644 --- a/superset/migrations/versions/b8d3a24d9131_new_dataset_models.py +++ b/superset/migrations/versions/b8d3a24d9131_new_dataset_models.py @@ -16,7 +16,7 @@ # under the License. # pylint: disable=too-few-public-methods -"""New dataset models +"""New dataset models (skipped) Revision ID: b8d3a24d9131 Revises: 5afbb1a5849b @@ -24,618 +24,14 @@ Create Date: 2021-11-11 16:41:53.266965 """ -import json -from datetime import date, datetime, time, timedelta -from typing import Callable, List, Optional, Set -from uuid import uuid4 - -import sqlalchemy as sa -from alembic import op -from sqlalchemy import and_, inspect, or_ -from sqlalchemy.engine.url import make_url -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import backref, relationship, Session -from sqlalchemy.schema import UniqueConstraint -from sqlalchemy.sql.type_api import TypeEngine -from sqlalchemy_utils import UUIDType - -from superset import app, db -from superset.connectors.sqla.models import ADDITIVE_METRIC_TYPES -from superset.extensions import encrypted_field_factory -from superset.migrations.shared.utils import extract_table_references -from superset.models.core import Database as OriginalDatabase -from superset.sql_parse import Table - # revision identifiers, used by Alembic. revision = "b8d3a24d9131" down_revision = "5afbb1a5849b" -Base = declarative_base() -custom_password_store = app.config["SQLALCHEMY_CUSTOM_PASSWORD_STORE"] -DB_CONNECTION_MUTATOR = app.config["DB_CONNECTION_MUTATOR"] - - -class Database(Base): - - __tablename__ = "dbs" - __table_args__ = (UniqueConstraint("database_name"),) - - id = sa.Column(sa.Integer, primary_key=True) - database_name = sa.Column(sa.String(250), unique=True, nullable=False) - sqlalchemy_uri = sa.Column(sa.String(1024), nullable=False) - password = sa.Column(encrypted_field_factory.create(sa.String(1024))) - impersonate_user = sa.Column(sa.Boolean, default=False) - encrypted_extra = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True) - extra = sa.Column( - sa.Text, - default=json.dumps( - dict( - metadata_params={}, - engine_params={}, - metadata_cache_timeout={}, - schemas_allowed_for_file_upload=[], - ) - ), - ) - server_cert = sa.Column(encrypted_field_factory.create(sa.Text), nullable=True) - - -class TableColumn(Base): - - __tablename__ = "table_columns" - __table_args__ = (UniqueConstraint("table_id", "column_name"),) - - id = sa.Column(sa.Integer, primary_key=True) - table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id")) - is_active = sa.Column(sa.Boolean, default=True) - extra = sa.Column(sa.Text) - column_name = sa.Column(sa.String(255), nullable=False) - type = sa.Column(sa.String(32)) - expression = sa.Column(sa.Text) - description = sa.Column(sa.Text) - is_dttm = sa.Column(sa.Boolean, default=False) - filterable = sa.Column(sa.Boolean, default=True) - groupby = sa.Column(sa.Boolean, default=True) - verbose_name = sa.Column(sa.String(1024)) - python_date_format = sa.Column(sa.String(255)) - - -class SqlMetric(Base): - - __tablename__ = "sql_metrics" - __table_args__ = (UniqueConstraint("table_id", "metric_name"),) - - id = sa.Column(sa.Integer, primary_key=True) - table_id = sa.Column(sa.Integer, sa.ForeignKey("tables.id")) - extra = sa.Column(sa.Text) - metric_type = sa.Column(sa.String(32)) - metric_name = sa.Column(sa.String(255), nullable=False) - 
expression = sa.Column(sa.Text, nullable=False) - warning_text = sa.Column(sa.Text) - description = sa.Column(sa.Text) - d3format = sa.Column(sa.String(128)) - verbose_name = sa.Column(sa.String(1024)) - - -class SqlaTable(Base): - - __tablename__ = "tables" - __table_args__ = (UniqueConstraint("database_id", "schema", "table_name"),) - - def fetch_columns_and_metrics(self, session: Session) -> None: - self.columns = session.query(TableColumn).filter( - TableColumn.table_id == self.id - ) - self.metrics = session.query(SqlMetric).filter(TableColumn.table_id == self.id) - - id = sa.Column(sa.Integer, primary_key=True) - columns: List[TableColumn] = [] - column_class = TableColumn - metrics: List[SqlMetric] = [] - metric_class = SqlMetric - - database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False) - database: Database = relationship( - "Database", - backref=backref("tables", cascade="all, delete-orphan"), - foreign_keys=[database_id], - ) - schema = sa.Column(sa.String(255)) - table_name = sa.Column(sa.String(250), nullable=False) - sql = sa.Column(sa.Text) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) - - -table_column_association_table = sa.Table( - "sl_table_columns", - Base.metadata, - sa.Column("table_id", sa.ForeignKey("sl_tables.id")), - sa.Column("column_id", sa.ForeignKey("sl_columns.id")), -) - -dataset_column_association_table = sa.Table( - "sl_dataset_columns", - Base.metadata, - sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")), - sa.Column("column_id", sa.ForeignKey("sl_columns.id")), -) - -dataset_table_association_table = sa.Table( - "sl_dataset_tables", - Base.metadata, - sa.Column("dataset_id", sa.ForeignKey("sl_datasets.id")), - sa.Column("table_id", sa.ForeignKey("sl_tables.id")), -) - - -class NewColumn(Base): - - __tablename__ = "sl_columns" - - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.Text) - type = sa.Column(sa.Text) - expression = sa.Column(sa.Text) - is_physical = sa.Column(sa.Boolean, default=True) - description = sa.Column(sa.Text) - warning_text = sa.Column(sa.Text) - is_temporal = sa.Column(sa.Boolean, default=False) - is_aggregation = sa.Column(sa.Boolean, default=False) - is_additive = sa.Column(sa.Boolean, default=False) - is_spatial = sa.Column(sa.Boolean, default=False) - is_partition = sa.Column(sa.Boolean, default=False) - is_increase_desired = sa.Column(sa.Boolean, default=True) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) - extra_json = sa.Column(sa.Text, default="{}") - - -class NewTable(Base): - - __tablename__ = "sl_tables" - __table_args__ = (UniqueConstraint("database_id", "catalog", "schema", "name"),) - - id = sa.Column(sa.Integer, primary_key=True) - name = sa.Column(sa.Text) - schema = sa.Column(sa.Text) - catalog = sa.Column(sa.Text) - database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False) - database: Database = relationship( - "Database", - backref=backref("new_tables", cascade="all, delete-orphan"), - foreign_keys=[database_id], - ) - columns: List[NewColumn] = relationship( - "NewColumn", secondary=table_column_association_table, cascade="all, delete" - ) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) - - -class NewDataset(Base): - - __tablename__ = "sl_datasets" - - id = sa.Column(sa.Integer, primary_key=True) - 
sqlatable_id = sa.Column(sa.Integer, nullable=True, unique=True) - name = sa.Column(sa.Text) - expression = sa.Column(sa.Text) - tables: List[NewTable] = relationship( - "NewTable", secondary=dataset_table_association_table - ) - columns: List[NewColumn] = relationship( - "NewColumn", secondary=dataset_column_association_table, cascade="all, delete" - ) - is_physical = sa.Column(sa.Boolean, default=False) - is_managed_externally = sa.Column(sa.Boolean, nullable=False, default=False) - external_url = sa.Column(sa.Text, nullable=True) - - -TEMPORAL_TYPES = {date, datetime, time, timedelta} - - -def is_column_type_temporal(column_type: TypeEngine) -> bool: - try: - return column_type.python_type in TEMPORAL_TYPES - except NotImplementedError: - return False - - -def load_or_create_tables( - session: Session, - database_id: int, - default_schema: Optional[str], - tables: Set[Table], - conditional_quote: Callable[[str], str], -) -> List[NewTable]: - """ - Load or create new table model instances. - """ - if not tables: - return [] - - # set the default schema in tables that don't have it - if default_schema: - tables = list(tables) - for i, table in enumerate(tables): - if table.schema is None: - tables[i] = Table(table.table, default_schema, table.catalog) - - # load existing tables - predicate = or_( - *[ - and_( - NewTable.database_id == database_id, - NewTable.schema == table.schema, - NewTable.name == table.table, - ) - for table in tables - ] - ) - new_tables = session.query(NewTable).filter(predicate).all() - - # use original database model to get the engine - engine = ( - session.query(OriginalDatabase) - .filter_by(id=database_id) - .one() - .get_sqla_engine(default_schema) - ) - inspector = inspect(engine) - - # add missing tables - existing = {(table.schema, table.name) for table in new_tables} - for table in tables: - if (table.schema, table.table) not in existing: - column_metadata = inspector.get_columns(table.table, schema=table.schema) - columns = [ - NewColumn( - name=column["name"], - type=str(column["type"]), - expression=conditional_quote(column["name"]), - is_temporal=is_column_type_temporal(column["type"]), - is_aggregation=False, - is_physical=True, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - ) - for column in column_metadata - ] - new_tables.append( - NewTable( - name=table.table, - schema=table.schema, - catalog=None, - database_id=database_id, - columns=columns, - ) - ) - existing.add((table.schema, table.table)) - - return new_tables - - -def after_insert(target: SqlaTable) -> None: # pylint: disable=too-many-locals - """ - Copy old datasets to the new models. - """ - session = inspect(target).session - - # get DB-specific conditional quoter for expressions that point to columns or - # table names - database = ( - target.database - or session.query(Database).filter_by(id=target.database_id).first() - ) - if not database: - return - url = make_url(database.sqlalchemy_uri) - dialect_class = url.get_dialect() - conditional_quote = dialect_class().identifier_preparer.quote - - # create columns - columns = [] - for column in target.columns: - # ``is_active`` might be ``None`` at this point, but it defaults to ``True``. 
- if column.is_active is False: - continue - - try: - extra_json = json.loads(column.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"groupby", "filterable", "verbose_name", "python_date_format"}: - value = getattr(column, attr) - if value: - extra_json[attr] = value - - columns.append( - NewColumn( - name=column.column_name, - type=column.type or "Unknown", - expression=column.expression or conditional_quote(column.column_name), - description=column.description, - is_temporal=column.is_dttm, - is_aggregation=False, - is_physical=column.expression is None or column.expression == "", - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - - # create metrics - for metric in target.metrics: - try: - extra_json = json.loads(metric.extra or "{}") - except json.decoder.JSONDecodeError: - extra_json = {} - for attr in {"verbose_name", "metric_type", "d3format"}: - value = getattr(metric, attr) - if value: - extra_json[attr] = value - - is_additive = ( - metric.metric_type and metric.metric_type.lower() in ADDITIVE_METRIC_TYPES - ) - - columns.append( - NewColumn( - name=metric.metric_name, - type="Unknown", # figuring this out would require a type inferrer - expression=metric.expression, - warning_text=metric.warning_text, - description=metric.description, - is_aggregation=True, - is_additive=is_additive, - is_physical=False, - is_spatial=False, - is_partition=False, - is_increase_desired=True, - extra_json=json.dumps(extra_json) if extra_json else None, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ), - ) - - # physical dataset - if not target.sql: - physical_columns = [column for column in columns if column.is_physical] - - # create table - table = NewTable( - name=target.table_name, - schema=target.schema, - catalog=None, # currently not supported - database_id=target.database_id, - columns=physical_columns, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - tables = [table] - - # virtual dataset - else: - # mark all columns as virtual (not physical) - for column in columns: - column.is_physical = False - - # find referenced tables - referenced_tables = extract_table_references(target.sql, dialect_class.name) - tables = load_or_create_tables( - session, - target.database_id, - target.schema, - referenced_tables, - conditional_quote, - ) - - # create the new dataset - dataset = NewDataset( - sqlatable_id=target.id, - name=target.table_name, - expression=target.sql or conditional_quote(target.table_name), - tables=tables, - columns=columns, - is_physical=not target.sql, - is_managed_externally=target.is_managed_externally, - external_url=target.external_url, - ) - session.add(dataset) - def upgrade(): - # Create tables for the new models. 
- op.create_table( - "sl_columns", - # AuditMixinNullable - sa.Column("created_on", sa.DateTime(), nullable=True), - sa.Column("changed_on", sa.DateTime(), nullable=True), - sa.Column("created_by_fk", sa.Integer(), nullable=True), - sa.Column("changed_by_fk", sa.Integer(), nullable=True), - # ExtraJSONMixin - sa.Column("extra_json", sa.Text(), nullable=True), - # ImportExportMixin - sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4), - # Column - sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False), - sa.Column("name", sa.TEXT(), nullable=False), - sa.Column("type", sa.TEXT(), nullable=False), - sa.Column("expression", sa.TEXT(), nullable=False), - sa.Column( - "is_physical", - sa.BOOLEAN(), - nullable=False, - default=True, - ), - sa.Column("description", sa.TEXT(), nullable=True), - sa.Column("warning_text", sa.TEXT(), nullable=True), - sa.Column("unit", sa.TEXT(), nullable=True), - sa.Column("is_temporal", sa.BOOLEAN(), nullable=False), - sa.Column( - "is_spatial", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_partition", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_aggregation", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_additive", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_increase_desired", - sa.BOOLEAN(), - nullable=False, - default=True, - ), - sa.Column( - "is_managed_externally", - sa.Boolean(), - nullable=False, - server_default=sa.false(), - ), - sa.Column("external_url", sa.Text(), nullable=True), - sa.PrimaryKeyConstraint("id"), - ) - with op.batch_alter_table("sl_columns") as batch_op: - batch_op.create_unique_constraint("uq_sl_columns_uuid", ["uuid"]) - - op.create_table( - "sl_tables", - # AuditMixinNullable - sa.Column("created_on", sa.DateTime(), nullable=True), - sa.Column("changed_on", sa.DateTime(), nullable=True), - sa.Column("created_by_fk", sa.Integer(), nullable=True), - sa.Column("changed_by_fk", sa.Integer(), nullable=True), - # ExtraJSONMixin - sa.Column("extra_json", sa.Text(), nullable=True), - # ImportExportMixin - sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4), - # Table - sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False), - sa.Column("database_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.Column("catalog", sa.TEXT(), nullable=True), - sa.Column("schema", sa.TEXT(), nullable=True), - sa.Column("name", sa.TEXT(), nullable=False), - sa.Column( - "is_managed_externally", - sa.Boolean(), - nullable=False, - server_default=sa.false(), - ), - sa.Column("external_url", sa.Text(), nullable=True), - sa.ForeignKeyConstraint(["database_id"], ["dbs.id"], name="sl_tables_ibfk_1"), - sa.PrimaryKeyConstraint("id"), - ) - with op.batch_alter_table("sl_tables") as batch_op: - batch_op.create_unique_constraint("uq_sl_tables_uuid", ["uuid"]) - - op.create_table( - "sl_table_columns", - sa.Column("table_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.Column("column_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.ForeignKeyConstraint( - ["column_id"], ["sl_columns.id"], name="sl_table_columns_ibfk_2" - ), - sa.ForeignKeyConstraint( - ["table_id"], ["sl_tables.id"], name="sl_table_columns_ibfk_1" - ), - ) - - op.create_table( - "sl_datasets", - # AuditMixinNullable - sa.Column("created_on", sa.DateTime(), nullable=True), - sa.Column("changed_on", sa.DateTime(), nullable=True), - sa.Column("created_by_fk", sa.Integer(), nullable=True), 
- sa.Column("changed_by_fk", sa.Integer(), nullable=True), - # ExtraJSONMixin - sa.Column("extra_json", sa.Text(), nullable=True), - # ImportExportMixin - sa.Column("uuid", UUIDType(binary=True), primary_key=False, default=uuid4), - # Dataset - sa.Column("id", sa.INTEGER(), autoincrement=True, nullable=False), - sa.Column("sqlatable_id", sa.INTEGER(), nullable=True), - sa.Column("name", sa.TEXT(), nullable=False), - sa.Column("expression", sa.TEXT(), nullable=False), - sa.Column( - "is_physical", - sa.BOOLEAN(), - nullable=False, - default=False, - ), - sa.Column( - "is_managed_externally", - sa.Boolean(), - nullable=False, - server_default=sa.false(), - ), - sa.Column("external_url", sa.Text(), nullable=True), - sa.PrimaryKeyConstraint("id"), - ) - with op.batch_alter_table("sl_datasets") as batch_op: - batch_op.create_unique_constraint("uq_sl_datasets_uuid", ["uuid"]) - batch_op.create_unique_constraint( - "uq_sl_datasets_sqlatable_id", ["sqlatable_id"] - ) - - op.create_table( - "sl_dataset_columns", - sa.Column("dataset_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.Column("column_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.ForeignKeyConstraint( - ["column_id"], ["sl_columns.id"], name="sl_dataset_columns_ibfk_2" - ), - sa.ForeignKeyConstraint( - ["dataset_id"], ["sl_datasets.id"], name="sl_dataset_columns_ibfk_1" - ), - ) - - op.create_table( - "sl_dataset_tables", - sa.Column("dataset_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.Column("table_id", sa.INTEGER(), autoincrement=False, nullable=False), - sa.ForeignKeyConstraint( - ["dataset_id"], ["sl_datasets.id"], name="sl_dataset_tables_ibfk_1" - ), - sa.ForeignKeyConstraint( - ["table_id"], ["sl_tables.id"], name="sl_dataset_tables_ibfk_2" - ), - ) - - # migrate existing datasets to the new models - bind = op.get_bind() - session = db.Session(bind=bind) # pylint: disable=no-member - - datasets = session.query(SqlaTable).all() - for dataset in datasets: - dataset.fetch_columns_and_metrics(session) - after_insert(target=dataset) + pass def downgrade(): - op.drop_table("sl_dataset_columns") - op.drop_table("sl_dataset_tables") - op.drop_table("sl_datasets") - op.drop_table("sl_table_columns") - op.drop_table("sl_tables") - op.drop_table("sl_columns") + pass diff --git a/superset/tables/models.py b/superset/tables/models.py index e2489445c68..97009b8bd0c 100644 --- a/superset/tables/models.py +++ b/superset/tables/models.py @@ -28,11 +28,10 @@ from typing import List import sqlalchemy as sa from flask_appbuilder import Model -from sqlalchemy.orm import backref, relationship +from sqlalchemy.orm import relationship from sqlalchemy.schema import UniqueConstraint from superset.columns.models import Column -from superset.models.core import Database from superset.models.helpers import ( AuditMixinNullable, ExtraJSONMixin, @@ -62,14 +61,8 @@ class Table(Model, AuditMixinNullable, ExtraJSONMixin, ImportExportMixin): id = sa.Column(sa.Integer, primary_key=True) - database_id = sa.Column(sa.Integer, sa.ForeignKey("dbs.id"), nullable=False) - database: Database = relationship( - "Database", - # TODO (betodealmeida): rename the backref to ``tables`` once we get rid of the - # old models. 
-        backref=backref("new_tables", cascade="all, delete-orphan"),
-        foreign_keys=[database_id],
-    )
+    # TODO: add foreign key constraint when this model is actually in use
+    database_id = sa.Column(sa.Integer, nullable=False)
 
     # We use ``sa.Text`` for these attributes because (1) in modern databases the
     # performance is the same as ``VARCHAR``[1] and (2) because some table names can be
diff --git a/tests/integration_tests/sqla_models_tests.py b/tests/integration_tests/sqla_models_tests.py
index 5f3d3cfe815..8990243c6b4 100644
--- a/tests/integration_tests/sqla_models_tests.py
+++ b/tests/integration_tests/sqla_models_tests.py
@@ -37,6 +37,7 @@ from superset.db_engine_specs.druid import DruidEngineSpec
 from superset.exceptions import QueryObjectValidationError, SupersetSecurityException
 from superset.models.core import Database
 from superset.utils.core import (
+    backend,
     AdhocMetricExpressionType,
     FilterOperator,
     GenericDataType,
diff --git a/tests/unit_tests/datasets/commands/importers/v1/import_test.py b/tests/unit_tests/datasets/commands/importers/v1/import_test.py
index 07ea8c49d04..c5e31196777 100644
--- a/tests/unit_tests/datasets/commands/importers/v1/import_test.py
+++ b/tests/unit_tests/datasets/commands/importers/v1/import_test.py
@@ -124,11 +124,11 @@ def test_import_dataset(app_context: None, session: Session) -> None:
     assert len(sqla_table.columns) == 1
     assert sqla_table.columns[0].column_name == "profit"
     assert sqla_table.columns[0].verbose_name is None
-    assert sqla_table.columns[0].is_dttm is None
-    assert sqla_table.columns[0].is_active is None
+    assert sqla_table.columns[0].is_dttm is False
+    assert sqla_table.columns[0].is_active is True
     assert sqla_table.columns[0].type == "INTEGER"
-    assert sqla_table.columns[0].groupby is None
-    assert sqla_table.columns[0].filterable is None
+    assert sqla_table.columns[0].groupby is True
+    assert sqla_table.columns[0].filterable is True
     assert sqla_table.columns[0].expression == "revenue-expenses"
     assert sqla_table.columns[0].description is None
     assert sqla_table.columns[0].python_date_format is None
diff --git a/tests/unit_tests/datasets/test_models.py b/tests/unit_tests/datasets/test_models.py
index d21ef8ea60a..901ed33149e 100644
--- a/tests/unit_tests/datasets/test_models.py
+++ b/tests/unit_tests/datasets/test_models.py
@@ -18,12 +18,13 @@
 # pylint: disable=import-outside-toplevel, unused-argument, unused-import, too-many-locals, invalid-name, too-many-lines
 
 import json
-from datetime import datetime, timezone
+import pytest
 
 from pytest_mock import MockFixture
 from sqlalchemy.orm.session import Session
 
 
+@pytest.mark.skip(reason="SIP-68 not ready yet")
 def test_dataset_model(app_context: None, session: Session) -> None:
     """
     Test basic attributes of a ``Dataset``.
@@ -82,6 +83,7 @@ FROM my_catalog.my_schema.my_table
     assert [column.name for column in dataset.columns] == ["position"]
 
 
+@pytest.mark.skip(reason="SIP-68 not ready yet")
 def test_cascade_delete_table(app_context: None, session: Session) -> None:
     """
     Test that deleting ``Table`` also deletes its columns.
@@ -117,6 +119,7 @@ def test_cascade_delete_table(app_context: None, session: Session) -> None:
     assert len(columns) == 0
 
 
+@pytest.mark.skip(reason="SIP-68 not ready yet")
 def test_cascade_delete_dataset(app_context: None, session: Session) -> None:
     """
     Test that deleting ``Dataset`` also deletes its columns.
@@ -265,6 +268,7 @@ def test_dataset_attributes(app_context: None, session: Session) -> None: ] +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_create_physical_sqlatable(app_context: None, session: Session) -> None: """ Test shadow write when creating a new ``SqlaTable``. @@ -512,6 +516,7 @@ def test_create_physical_sqlatable(app_context: None, session: Session) -> None: ] +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_create_virtual_sqlatable( mocker: MockFixture, app_context: None, session: Session ) -> None: @@ -861,6 +866,7 @@ FROM ] +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_delete_sqlatable(app_context: None, session: Session) -> None: """ Test that deleting a ``SqlaTable`` also deletes the corresponding ``Dataset``. @@ -897,6 +903,7 @@ def test_delete_sqlatable(app_context: None, session: Session) -> None: assert len(datasets) == 0 +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_update_sqlatable( mocker: MockFixture, app_context: None, session: Session ) -> None: @@ -957,6 +964,7 @@ def test_update_sqlatable( assert dataset.columns[0].is_temporal is True +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_update_sqlatable_schema( mocker: MockFixture, app_context: None, session: Session ) -> None: @@ -1003,6 +1011,7 @@ def test_update_sqlatable_schema( assert new_dataset.tables[0].id == 2 +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_update_sqlatable_metric( mocker: MockFixture, app_context: None, session: Session ) -> None: @@ -1052,6 +1061,7 @@ def test_update_sqlatable_metric( assert column.expression == "MAX(ds)" +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_update_virtual_sqlatable_references( mocker: MockFixture, app_context: None, session: Session ) -> None: @@ -1121,6 +1131,7 @@ def test_update_virtual_sqlatable_references( assert new_dataset.expression == "SELECT a, b FROM table_a JOIN table_b" +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_quote_expressions(app_context: None, session: Session) -> None: """ Test that expressions are quoted appropriately in columns and datasets. @@ -1154,6 +1165,7 @@ def test_quote_expressions(app_context: None, session: Session) -> None: assert dataset.columns[1].expression == "no_need" +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_update_physical_sqlatable( mocker: MockFixture, app_context: None, session: Session ) -> None: @@ -1262,6 +1274,7 @@ def test_update_physical_sqlatable( assert dataset.tables[0].database_id == 1 +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_update_physical_sqlatable_no_dataset( mocker: MockFixture, app_context: None, session: Session ) -> None: diff --git a/tests/unit_tests/tables/test_models.py b/tests/unit_tests/tables/test_models.py index 56ca5ba82fb..8177fe6549d 100644 --- a/tests/unit_tests/tables/test_models.py +++ b/tests/unit_tests/tables/test_models.py @@ -17,9 +17,11 @@ # pylint: disable=import-outside-toplevel, unused-argument +import pytest from sqlalchemy.orm.session import Session +@pytest.mark.skip(reason="SIP-68 not ready yet") def test_table_model(app_context: None, session: Session) -> None: """ Test basic attributes of a ``Table``.
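
Note on the listener rewiring at the bottom of superset/connectors/sqla/models.py: SQLAlchemy mapper events invoke listeners with ``(mapper, connection, target)``, which is exactly the signature of ``security_manager.set_perm``, so once the shadow-writing ``after_insert``/``after_update``/``after_delete`` wrappers are removed the permission hook can be registered directly with ``sa.event.listen``. A minimal, self-contained sketch of that pattern, assuming nothing beyond SQLAlchemy itself (``Widget`` and ``log_insert`` are illustrative stand-ins, not Superset code):

    import sqlalchemy as sa
    from sqlalchemy.engine import Connection
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import Mapper

    Base = declarative_base()


    class Widget(Base):
        __tablename__ = "widgets"

        id = sa.Column(sa.Integer, primary_key=True)
        name = sa.Column(sa.String(50))


    def log_insert(mapper: Mapper, connection: Connection, target: Widget) -> None:
        # Mapper-level "after_insert" listeners always receive
        # (mapper, connection, target); any callable with this signature,
        # including a bound method such as security_manager.set_perm, can be
        # registered directly, without a wrapper.
        print(f"inserted widget {target.name!r}")


    sa.event.listen(Widget, "after_insert", log_insert)

Registering the callable directly keeps the event wiring declarative; the removed wrappers existed only to piggyback the SIP-68 shadow writes onto the same events, and those writes are what this patch backs out.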