mirror of
https://github.com/apache/superset.git
synced 2026-04-20 16:44:46 +00:00
chore: remove shadow write of new sip 68 dataset models (#21986)
This commit is contained in:
committed by
GitHub
parent
824dc7188b
commit
86d52fcbc4
@@ -14,7 +14,7 @@
|
|||||||
# KIND, either express or implied. See the License for the
|
# KIND, either express or implied. See the License for the
|
||||||
# specific language governing permissions and limitations
|
# specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
# pylint: disable=too-many-lines, redefined-outer-name
|
# pylint: disable=too-many-lines
|
||||||
import dataclasses
|
import dataclasses
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
@@ -36,7 +36,6 @@ from typing import (
|
|||||||
Type,
|
Type,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
import dateutil.parser
|
import dateutil.parser
|
||||||
import numpy as np
|
import numpy as np
|
||||||
@@ -68,7 +67,6 @@ from sqlalchemy import (
|
|||||||
from sqlalchemy.engine.base import Connection
|
from sqlalchemy.engine.base import Connection
|
||||||
from sqlalchemy.ext.hybrid import hybrid_property
|
from sqlalchemy.ext.hybrid import hybrid_property
|
||||||
from sqlalchemy.orm import backref, Query, relationship, RelationshipProperty, Session
|
from sqlalchemy.orm import backref, Query, relationship, RelationshipProperty, Session
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
|
||||||
from sqlalchemy.orm.mapper import Mapper
|
from sqlalchemy.orm.mapper import Mapper
|
||||||
from sqlalchemy.schema import UniqueConstraint
|
from sqlalchemy.schema import UniqueConstraint
|
||||||
from sqlalchemy.sql import column, ColumnElement, literal_column, table
|
from sqlalchemy.sql import column, ColumnElement, literal_column, table
|
||||||
@@ -78,18 +76,15 @@ from sqlalchemy.sql.selectable import Alias, TableClause
|
|||||||
|
|
||||||
from superset import app, db, is_feature_enabled, security_manager
|
from superset import app, db, is_feature_enabled, security_manager
|
||||||
from superset.advanced_data_type.types import AdvancedDataTypeResponse
|
from superset.advanced_data_type.types import AdvancedDataTypeResponse
|
||||||
from superset.columns.models import Column as NewColumn, UNKOWN_TYPE
|
|
||||||
from superset.common.db_query_status import QueryStatus
|
from superset.common.db_query_status import QueryStatus
|
||||||
from superset.common.utils.time_range_utils import get_since_until_from_time_range
|
from superset.common.utils.time_range_utils import get_since_until_from_time_range
|
||||||
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
|
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
|
||||||
from superset.connectors.sqla.utils import (
|
from superset.connectors.sqla.utils import (
|
||||||
find_cached_objects_in_session,
|
|
||||||
get_columns_description,
|
get_columns_description,
|
||||||
get_physical_table_metadata,
|
get_physical_table_metadata,
|
||||||
get_virtual_table_metadata,
|
get_virtual_table_metadata,
|
||||||
validate_adhoc_subquery,
|
validate_adhoc_subquery,
|
||||||
)
|
)
|
||||||
from superset.datasets.models import Dataset as NewDataset
|
|
||||||
from superset.db_engine_specs.base import BaseEngineSpec, CTE_ALIAS, TimestampExpression
|
from superset.db_engine_specs.base import BaseEngineSpec, CTE_ALIAS, TimestampExpression
|
||||||
from superset.exceptions import (
|
from superset.exceptions import (
|
||||||
AdvancedDataTypeResponseError,
|
AdvancedDataTypeResponseError,
|
||||||
@@ -106,18 +101,8 @@ from superset.jinja_context import (
|
|||||||
)
|
)
|
||||||
from superset.models.annotations import Annotation
|
from superset.models.annotations import Annotation
|
||||||
from superset.models.core import Database
|
from superset.models.core import Database
|
||||||
from superset.models.helpers import (
|
from superset.models.helpers import AuditMixinNullable, CertificationMixin, QueryResult
|
||||||
AuditMixinNullable,
|
from superset.sql_parse import ParsedQuery, sanitize_clause
|
||||||
CertificationMixin,
|
|
||||||
clone_model,
|
|
||||||
QueryResult,
|
|
||||||
)
|
|
||||||
from superset.sql_parse import (
|
|
||||||
extract_table_references,
|
|
||||||
ParsedQuery,
|
|
||||||
sanitize_clause,
|
|
||||||
Table as TableName,
|
|
||||||
)
|
|
||||||
from superset.superset_typing import (
|
from superset.superset_typing import (
|
||||||
AdhocColumn,
|
AdhocColumn,
|
||||||
AdhocMetric,
|
AdhocMetric,
|
||||||
@@ -126,7 +111,6 @@ from superset.superset_typing import (
|
|||||||
OrderBy,
|
OrderBy,
|
||||||
QueryObjectDict,
|
QueryObjectDict,
|
||||||
)
|
)
|
||||||
from superset.tables.models import Table as NewTable
|
|
||||||
from superset.utils import core as utils
|
from superset.utils import core as utils
|
||||||
from superset.utils.core import (
|
from superset.utils.core import (
|
||||||
GenericDataType,
|
GenericDataType,
|
||||||
@@ -439,76 +423,6 @@ class TableColumn(Model, BaseColumn, CertificationMixin):
|
|||||||
|
|
||||||
return attr_dict
|
return attr_dict
|
||||||
|
|
||||||
def to_sl_column(
|
|
||||||
self, known_columns: Optional[Dict[str, NewColumn]] = None
|
|
||||||
) -> NewColumn:
|
|
||||||
"""Convert a TableColumn to NewColumn"""
|
|
||||||
session: Session = inspect(self).session
|
|
||||||
column = known_columns.get(self.uuid) if known_columns else None
|
|
||||||
if not column:
|
|
||||||
column = NewColumn()
|
|
||||||
|
|
||||||
extra_json = self.get_extra_dict()
|
|
||||||
for attr in {
|
|
||||||
"verbose_name",
|
|
||||||
"python_date_format",
|
|
||||||
}:
|
|
||||||
value = getattr(self, attr)
|
|
||||||
if value:
|
|
||||||
extra_json[attr] = value
|
|
||||||
|
|
||||||
# column id is primary key, so make sure that we check uuid against
|
|
||||||
# the id as well
|
|
||||||
if not column.id:
|
|
||||||
with session.no_autoflush:
|
|
||||||
saved_column: NewColumn = (
|
|
||||||
session.query(NewColumn).filter_by(uuid=self.uuid).one_or_none()
|
|
||||||
)
|
|
||||||
if saved_column is not None:
|
|
||||||
logger.warning(
|
|
||||||
"sl_column already exists. Using this row for db update %s",
|
|
||||||
self,
|
|
||||||
)
|
|
||||||
|
|
||||||
# overwrite the existing column instead of creating a new one
|
|
||||||
column = saved_column
|
|
||||||
|
|
||||||
column.uuid = self.uuid
|
|
||||||
column.created_on = self.created_on
|
|
||||||
column.changed_on = self.changed_on
|
|
||||||
column.created_by = self.created_by
|
|
||||||
column.changed_by = self.changed_by
|
|
||||||
column.name = self.column_name
|
|
||||||
column.type = self.type or UNKOWN_TYPE
|
|
||||||
column.expression = self.expression or self.table.quote_identifier(
|
|
||||||
self.column_name
|
|
||||||
)
|
|
||||||
column.description = self.description
|
|
||||||
column.is_aggregation = False
|
|
||||||
column.is_dimensional = self.groupby
|
|
||||||
column.is_filterable = self.filterable
|
|
||||||
column.is_increase_desired = True
|
|
||||||
column.is_managed_externally = self.table.is_managed_externally
|
|
||||||
column.is_partition = False
|
|
||||||
column.is_physical = not self.expression
|
|
||||||
column.is_spatial = False
|
|
||||||
column.is_temporal = self.is_dttm
|
|
||||||
column.extra_json = json.dumps(extra_json) if extra_json else None
|
|
||||||
column.external_url = self.table.external_url
|
|
||||||
|
|
||||||
return column
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def after_delete( # pylint: disable=unused-argument
|
|
||||||
mapper: Mapper,
|
|
||||||
connection: Connection,
|
|
||||||
target: "TableColumn",
|
|
||||||
) -> None:
|
|
||||||
session = inspect(target).session
|
|
||||||
column = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
|
|
||||||
if column:
|
|
||||||
session.delete(column)
|
|
||||||
|
|
||||||
|
|
||||||
class SqlMetric(Model, BaseMetric, CertificationMixin):
|
class SqlMetric(Model, BaseMetric, CertificationMixin):
|
||||||
|
|
||||||
@@ -574,76 +488,6 @@ class SqlMetric(Model, BaseMetric, CertificationMixin):
|
|||||||
attr_dict.update(super().data)
|
attr_dict.update(super().data)
|
||||||
return attr_dict
|
return attr_dict
|
||||||
|
|
||||||
def to_sl_column(
|
|
||||||
self, known_columns: Optional[Dict[str, NewColumn]] = None
|
|
||||||
) -> NewColumn:
|
|
||||||
"""Convert a SqlMetric to NewColumn. Find and update existing or
|
|
||||||
create a new one."""
|
|
||||||
session: Session = inspect(self).session
|
|
||||||
column = known_columns.get(self.uuid) if known_columns else None
|
|
||||||
if not column:
|
|
||||||
column = NewColumn()
|
|
||||||
|
|
||||||
extra_json = self.get_extra_dict()
|
|
||||||
for attr in {"verbose_name", "metric_type", "d3format"}:
|
|
||||||
value = getattr(self, attr)
|
|
||||||
if value is not None:
|
|
||||||
extra_json[attr] = value
|
|
||||||
is_additive = (
|
|
||||||
self.metric_type and self.metric_type.lower() in ADDITIVE_METRIC_TYPES_LOWER
|
|
||||||
)
|
|
||||||
|
|
||||||
# column id is primary key, so make sure that we check uuid against
|
|
||||||
# the id as well
|
|
||||||
if not column.id:
|
|
||||||
with session.no_autoflush:
|
|
||||||
saved_column: NewColumn = (
|
|
||||||
session.query(NewColumn).filter_by(uuid=self.uuid).one_or_none()
|
|
||||||
)
|
|
||||||
|
|
||||||
if saved_column is not None:
|
|
||||||
logger.warning(
|
|
||||||
"sl_column already exists. Using this row for db update %s",
|
|
||||||
self,
|
|
||||||
)
|
|
||||||
|
|
||||||
# overwrite the existing column instead of creating a new one
|
|
||||||
column = saved_column
|
|
||||||
|
|
||||||
column.uuid = self.uuid
|
|
||||||
column.name = self.metric_name
|
|
||||||
column.created_on = self.created_on
|
|
||||||
column.changed_on = self.changed_on
|
|
||||||
column.created_by = self.created_by
|
|
||||||
column.changed_by = self.changed_by
|
|
||||||
column.type = UNKOWN_TYPE
|
|
||||||
column.expression = self.expression
|
|
||||||
column.warning_text = self.warning_text
|
|
||||||
column.description = self.description
|
|
||||||
column.is_aggregation = True
|
|
||||||
column.is_additive = is_additive
|
|
||||||
column.is_filterable = False
|
|
||||||
column.is_increase_desired = True
|
|
||||||
column.is_managed_externally = self.table.is_managed_externally
|
|
||||||
column.is_partition = False
|
|
||||||
column.is_physical = False
|
|
||||||
column.is_spatial = False
|
|
||||||
column.extra_json = json.dumps(extra_json) if extra_json else None
|
|
||||||
column.external_url = self.table.external_url
|
|
||||||
|
|
||||||
return column
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def after_delete( # pylint: disable=unused-argument
|
|
||||||
mapper: Mapper,
|
|
||||||
connection: Connection,
|
|
||||||
target: "SqlMetric",
|
|
||||||
) -> None:
|
|
||||||
session = inspect(target).session
|
|
||||||
column = session.query(NewColumn).filter_by(uuid=target.uuid).one_or_none()
|
|
||||||
if column:
|
|
||||||
session.delete(column)
|
|
||||||
|
|
||||||
|
|
||||||
sqlatable_user = Table(
|
sqlatable_user = Table(
|
||||||
"sqlatable_user",
|
"sqlatable_user",
|
||||||
@@ -2228,40 +2072,6 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho
|
|||||||
):
|
):
|
||||||
raise Exception(get_dataset_exist_error_msg(target.full_name))
|
raise Exception(get_dataset_exist_error_msg(target.full_name))
|
||||||
|
|
||||||
def get_sl_columns(self) -> List[NewColumn]:
|
|
||||||
"""
|
|
||||||
Convert `SqlaTable.columns` and `SqlaTable.metrics` to the new Column model
|
|
||||||
"""
|
|
||||||
session: Session = inspect(self).session
|
|
||||||
|
|
||||||
uuids = set()
|
|
||||||
for column_or_metric in self.columns + self.metrics:
|
|
||||||
# pre-assign uuid after new columns or metrics are inserted so
|
|
||||||
# the related `NewColumn` can have a deterministic uuid, too
|
|
||||||
if not column_or_metric.uuid:
|
|
||||||
column_or_metric.uuid = uuid4()
|
|
||||||
else:
|
|
||||||
uuids.add(column_or_metric.uuid)
|
|
||||||
|
|
||||||
# load existing columns from cached session states first
|
|
||||||
existing_columns = set(
|
|
||||||
find_cached_objects_in_session(session, NewColumn, uuids=uuids)
|
|
||||||
)
|
|
||||||
for column in existing_columns:
|
|
||||||
uuids.remove(column.uuid)
|
|
||||||
|
|
||||||
if uuids:
|
|
||||||
with session.no_autoflush:
|
|
||||||
# load those not found from db
|
|
||||||
existing_columns |= set(
|
|
||||||
session.query(NewColumn).filter(NewColumn.uuid.in_(uuids))
|
|
||||||
)
|
|
||||||
|
|
||||||
known_columns = {column.uuid: column for column in existing_columns}
|
|
||||||
return [
|
|
||||||
item.to_sl_column(known_columns) for item in self.columns + self.metrics
|
|
||||||
]
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def update_column( # pylint: disable=unused-argument
|
def update_column( # pylint: disable=unused-argument
|
||||||
mapper: Mapper, connection: Connection, target: Union[SqlMetric, TableColumn]
|
mapper: Mapper, connection: Connection, target: Union[SqlMetric, TableColumn]
|
||||||
@@ -2278,46 +2088,6 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho
|
|||||||
# table is updated. This busts the cache key for all charts that use the table.
|
# table is updated. This busts the cache key for all charts that use the table.
|
||||||
session.execute(update(SqlaTable).where(SqlaTable.id == target.table.id))
|
session.execute(update(SqlaTable).where(SqlaTable.id == target.table.id))
|
||||||
|
|
||||||
# if table itself has changed, shadow-writing will happen in `after_update` anyway
|
|
||||||
if target.table not in session.dirty:
|
|
||||||
dataset: NewDataset = (
|
|
||||||
session.query(NewDataset)
|
|
||||||
.filter_by(uuid=target.table.uuid)
|
|
||||||
.one_or_none()
|
|
||||||
)
|
|
||||||
# Update shadow dataset and columns
|
|
||||||
# did we find the dataset?
|
|
||||||
if not dataset:
|
|
||||||
# if dataset is not found create a new copy
|
|
||||||
target.table.write_shadow_dataset()
|
|
||||||
return
|
|
||||||
|
|
||||||
# update changed_on timestamp
|
|
||||||
session.execute(update(NewDataset).where(NewDataset.id == dataset.id))
|
|
||||||
try:
|
|
||||||
with session.no_autoflush:
|
|
||||||
column = session.query(NewColumn).filter_by(uuid=target.uuid).one()
|
|
||||||
# update `Column` model as well
|
|
||||||
session.merge(target.to_sl_column({target.uuid: column}))
|
|
||||||
except NoResultFound:
|
|
||||||
logger.warning("No column was found for %s", target)
|
|
||||||
# see if the column is in cache
|
|
||||||
column = next(
|
|
||||||
find_cached_objects_in_session(
|
|
||||||
session, NewColumn, uuids=[target.uuid]
|
|
||||||
),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
if column:
|
|
||||||
logger.warning("New column was found in cache: %s", column)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# to be safe, use a different uuid and create a new column
|
|
||||||
uuid = uuid4()
|
|
||||||
target.uuid = uuid
|
|
||||||
|
|
||||||
session.add(target.to_sl_column())
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def after_insert(
|
def after_insert(
|
||||||
mapper: Mapper,
|
mapper: Mapper,
|
||||||
@@ -2325,19 +2095,9 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho
|
|||||||
sqla_table: "SqlaTable",
|
sqla_table: "SqlaTable",
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Shadow write the dataset to new models.
|
Update dataset permissions after insert
|
||||||
|
|
||||||
The ``SqlaTable`` model is currently being migrated to two new models, ``Table``
|
|
||||||
and ``Dataset``. In the first phase of the migration the new models are populated
|
|
||||||
whenever ``SqlaTable`` is modified (created, updated, or deleted).
|
|
||||||
|
|
||||||
In the second phase of the migration reads will be done from the new models.
|
|
||||||
Finally, in the third phase of the migration the old models will be removed.
|
|
||||||
|
|
||||||
For more context: https://github.com/apache/superset/issues/14909
|
|
||||||
"""
|
"""
|
||||||
security_manager.dataset_after_insert(mapper, connection, sqla_table)
|
security_manager.dataset_after_insert(mapper, connection, sqla_table)
|
||||||
sqla_table.write_shadow_dataset()
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def after_delete(
|
def after_delete(
|
||||||
@@ -2346,24 +2106,9 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho
|
|||||||
sqla_table: "SqlaTable",
|
sqla_table: "SqlaTable",
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Shadow write the dataset to new models.
|
Update dataset permissions after delete
|
||||||
|
|
||||||
The ``SqlaTable`` model is currently being migrated to two new models, ``Table``
|
|
||||||
and ``Dataset``. In the first phase of the migration the new models are populated
|
|
||||||
whenever ``SqlaTable`` is modified (created, updated, or deleted).
|
|
||||||
|
|
||||||
In the second phase of the migration reads will be done from the new models.
|
|
||||||
Finally, in the third phase of the migration the old models will be removed.
|
|
||||||
|
|
||||||
For more context: https://github.com/apache/superset/issues/14909
|
|
||||||
"""
|
"""
|
||||||
security_manager.dataset_after_delete(mapper, connection, sqla_table)
|
security_manager.dataset_after_delete(mapper, connection, sqla_table)
|
||||||
session = inspect(sqla_table).session
|
|
||||||
dataset = (
|
|
||||||
session.query(NewDataset).filter_by(uuid=sqla_table.uuid).one_or_none()
|
|
||||||
)
|
|
||||||
if dataset:
|
|
||||||
session.delete(dataset)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def after_update(
|
def after_update(
|
||||||
@@ -2372,240 +2117,18 @@ class SqlaTable(Model, BaseDatasource): # pylint: disable=too-many-public-metho
|
|||||||
sqla_table: "SqlaTable",
|
sqla_table: "SqlaTable",
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Shadow write the dataset to new models.
|
Update dataset permissions after update
|
||||||
|
|
||||||
The ``SqlaTable`` model is currently being migrated to two new models, ``Table``
|
|
||||||
and ``Dataset``. In the first phase of the migration the new models are populated
|
|
||||||
whenever ``SqlaTable`` is modified (created, updated, or deleted).
|
|
||||||
|
|
||||||
In the second phase of the migration reads will be done from the new models.
|
|
||||||
Finally, in the third phase of the migration the old models will be removed.
|
|
||||||
|
|
||||||
For more context: https://github.com/apache/superset/issues/14909
|
|
||||||
"""
|
"""
|
||||||
# set permissions
|
# set permissions
|
||||||
security_manager.dataset_after_update(mapper, connection, sqla_table)
|
security_manager.dataset_after_update(mapper, connection, sqla_table)
|
||||||
|
|
||||||
inspector = inspect(sqla_table)
|
|
||||||
session = inspector.session
|
|
||||||
|
|
||||||
# double-check that ``UPDATE``s are actually pending (this method is called even
|
|
||||||
# for instances that have no net changes to their column-based attributes)
|
|
||||||
if not session.is_modified(sqla_table, include_collections=True):
|
|
||||||
return
|
|
||||||
|
|
||||||
# find the dataset from the known instance list first
|
|
||||||
# (it could be either from a previous query or newly created)
|
|
||||||
dataset = next(
|
|
||||||
find_cached_objects_in_session(
|
|
||||||
session, NewDataset, uuids=[sqla_table.uuid]
|
|
||||||
),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
# if not found, pull from database
|
|
||||||
if not dataset:
|
|
||||||
dataset = (
|
|
||||||
session.query(NewDataset).filter_by(uuid=sqla_table.uuid).one_or_none()
|
|
||||||
)
|
|
||||||
if not dataset:
|
|
||||||
sqla_table.write_shadow_dataset()
|
|
||||||
return
|
|
||||||
|
|
||||||
# sync column list and delete removed columns
|
|
||||||
if (
|
|
||||||
inspector.attrs.columns.history.has_changes()
|
|
||||||
or inspector.attrs.metrics.history.has_changes()
|
|
||||||
):
|
|
||||||
# add pending new columns to known columns list, too, so if calling
|
|
||||||
# `after_update` twice before changes are persisted will not create
|
|
||||||
# two duplicate columns with the same uuids.
|
|
||||||
dataset.columns = sqla_table.get_sl_columns()
|
|
||||||
|
|
||||||
# physical dataset
|
|
||||||
if not sqla_table.sql:
|
|
||||||
# if the table name changed we should relink the dataset to another table
|
|
||||||
# (and create one if necessary)
|
|
||||||
if (
|
|
||||||
inspector.attrs.table_name.history.has_changes()
|
|
||||||
or inspector.attrs.schema.history.has_changes()
|
|
||||||
or inspector.attrs.database.history.has_changes()
|
|
||||||
):
|
|
||||||
tables = NewTable.bulk_load_or_create(
|
|
||||||
sqla_table.database,
|
|
||||||
[TableName(schema=sqla_table.schema, table=sqla_table.table_name)],
|
|
||||||
sync_columns=False,
|
|
||||||
default_props=dict(
|
|
||||||
changed_by=sqla_table.changed_by,
|
|
||||||
created_by=sqla_table.created_by,
|
|
||||||
is_managed_externally=sqla_table.is_managed_externally,
|
|
||||||
external_url=sqla_table.external_url,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
if not tables[0].id:
|
|
||||||
# dataset columns will only be assigned to newly created tables
|
|
||||||
# existing tables should manage column syncing in another process
|
|
||||||
physical_columns = [
|
|
||||||
clone_model(
|
|
||||||
column, ignore=["uuid"], keep_relations=["changed_by"]
|
|
||||||
)
|
|
||||||
for column in dataset.columns
|
|
||||||
if column.is_physical
|
|
||||||
]
|
|
||||||
tables[0].columns = physical_columns
|
|
||||||
dataset.tables = tables
|
|
||||||
|
|
||||||
# virtual dataset
|
|
||||||
else:
|
|
||||||
# mark all columns as virtual (not physical)
|
|
||||||
for column in dataset.columns:
|
|
||||||
column.is_physical = False
|
|
||||||
|
|
||||||
# update referenced tables if SQL changed
|
|
||||||
if sqla_table.sql and inspector.attrs.sql.history.has_changes():
|
|
||||||
referenced_tables = extract_table_references(
|
|
||||||
sqla_table.sql, sqla_table.database.get_dialect().name
|
|
||||||
)
|
|
||||||
dataset.tables = NewTable.bulk_load_or_create(
|
|
||||||
sqla_table.database,
|
|
||||||
referenced_tables,
|
|
||||||
default_schema=sqla_table.schema,
|
|
||||||
# sync metadata is expensive, we'll do it in another process
|
|
||||||
# e.g. when users open a Table page
|
|
||||||
sync_columns=False,
|
|
||||||
default_props=dict(
|
|
||||||
changed_by=sqla_table.changed_by,
|
|
||||||
created_by=sqla_table.created_by,
|
|
||||||
is_managed_externally=sqla_table.is_managed_externally,
|
|
||||||
external_url=sqla_table.external_url,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# update other attributes
|
|
||||||
dataset.name = sqla_table.table_name
|
|
||||||
dataset.expression = sqla_table.sql or sqla_table.quote_identifier(
|
|
||||||
sqla_table.table_name
|
|
||||||
)
|
|
||||||
dataset.is_physical = not sqla_table.sql
|
|
||||||
|
|
||||||
def write_shadow_dataset(
|
|
||||||
self: "SqlaTable",
|
|
||||||
) -> None:
|
|
||||||
"""
|
|
||||||
Shadow write the dataset to new models.
|
|
||||||
|
|
||||||
The ``SqlaTable`` model is currently being migrated to two new models, ``Table``
|
|
||||||
and ``Dataset``. In the first phase of the migration the new models are populated
|
|
||||||
whenever ``SqlaTable`` is modified (created, updated, or deleted).
|
|
||||||
|
|
||||||
In the second phase of the migration reads will be done from the new models.
|
|
||||||
Finally, in the third phase of the migration the old models will be removed.
|
|
||||||
|
|
||||||
For more context: https://github.com/apache/superset/issues/14909
|
|
||||||
"""
|
|
||||||
session = inspect(self).session
|
|
||||||
# make sure database points to the right instance, in case only
|
|
||||||
# `table.database_id` is updated and the changes haven't been
|
|
||||||
# consolidated by SQLA
|
|
||||||
if self.database_id and (
|
|
||||||
not self.database or self.database.id != self.database_id
|
|
||||||
):
|
|
||||||
self.database = session.query(Database).filter_by(id=self.database_id).one()
|
|
||||||
|
|
||||||
# create columns
|
|
||||||
columns = []
|
|
||||||
for item in self.columns + self.metrics:
|
|
||||||
item.created_by = self.created_by
|
|
||||||
item.changed_by = self.changed_by
|
|
||||||
# on `SqlaTable.after_insert`` event, although the table itself
|
|
||||||
# already has a `uuid`, the associated columns will not.
|
|
||||||
# Here we pre-assign a uuid so they can still be matched to the new
|
|
||||||
# Column after creation.
|
|
||||||
if not item.uuid:
|
|
||||||
item.uuid = uuid4()
|
|
||||||
columns.append(item.to_sl_column())
|
|
||||||
|
|
||||||
# physical dataset
|
|
||||||
if not self.sql:
|
|
||||||
# always create separate column entries for Dataset and Table
|
|
||||||
# so updating a dataset would not update columns in the related table
|
|
||||||
physical_columns = [
|
|
||||||
clone_model(
|
|
||||||
column,
|
|
||||||
ignore=["uuid"],
|
|
||||||
# `created_by` will always be left empty because it'd always
|
|
||||||
# be created via some sort of automated system.
|
|
||||||
# But keep `changed_by` in case someone manually changes
|
|
||||||
# column attributes such as `is_dttm`.
|
|
||||||
keep_relations=["changed_by"],
|
|
||||||
)
|
|
||||||
for column in columns
|
|
||||||
if column.is_physical
|
|
||||||
]
|
|
||||||
tables = NewTable.bulk_load_or_create(
|
|
||||||
self.database,
|
|
||||||
[TableName(schema=self.schema, table=self.table_name)],
|
|
||||||
sync_columns=False,
|
|
||||||
default_props=dict(
|
|
||||||
created_by=self.created_by,
|
|
||||||
changed_by=self.changed_by,
|
|
||||||
is_managed_externally=self.is_managed_externally,
|
|
||||||
external_url=self.external_url,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
tables[0].columns = physical_columns
|
|
||||||
|
|
||||||
# virtual dataset
|
|
||||||
else:
|
|
||||||
# mark all columns as virtual (not physical)
|
|
||||||
for column in columns:
|
|
||||||
column.is_physical = False
|
|
||||||
|
|
||||||
# find referenced tables
|
|
||||||
referenced_tables = extract_table_references(
|
|
||||||
self.sql, self.database.get_dialect().name
|
|
||||||
)
|
|
||||||
tables = NewTable.bulk_load_or_create(
|
|
||||||
self.database,
|
|
||||||
referenced_tables,
|
|
||||||
default_schema=self.schema,
|
|
||||||
# syncing table columns can be slow so we are not doing it here
|
|
||||||
sync_columns=False,
|
|
||||||
default_props=dict(
|
|
||||||
created_by=self.created_by,
|
|
||||||
changed_by=self.changed_by,
|
|
||||||
is_managed_externally=self.is_managed_externally,
|
|
||||||
external_url=self.external_url,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# create the new dataset
|
|
||||||
new_dataset = NewDataset(
|
|
||||||
uuid=self.uuid,
|
|
||||||
database_id=self.database_id,
|
|
||||||
created_on=self.created_on,
|
|
||||||
created_by=self.created_by,
|
|
||||||
changed_by=self.changed_by,
|
|
||||||
changed_on=self.changed_on,
|
|
||||||
owners=self.owners,
|
|
||||||
name=self.table_name,
|
|
||||||
expression=self.sql or self.quote_identifier(self.table_name),
|
|
||||||
tables=tables,
|
|
||||||
columns=columns,
|
|
||||||
is_physical=not self.sql,
|
|
||||||
is_managed_externally=self.is_managed_externally,
|
|
||||||
external_url=self.external_url,
|
|
||||||
)
|
|
||||||
session.add(new_dataset)
|
|
||||||
|
|
||||||
|
|
||||||
sa.event.listen(SqlaTable, "before_update", SqlaTable.before_update)
|
sa.event.listen(SqlaTable, "before_update", SqlaTable.before_update)
|
||||||
|
sa.event.listen(SqlaTable, "after_update", SqlaTable.after_update)
|
||||||
sa.event.listen(SqlaTable, "after_insert", SqlaTable.after_insert)
|
sa.event.listen(SqlaTable, "after_insert", SqlaTable.after_insert)
|
||||||
sa.event.listen(SqlaTable, "after_delete", SqlaTable.after_delete)
|
sa.event.listen(SqlaTable, "after_delete", SqlaTable.after_delete)
|
||||||
sa.event.listen(SqlaTable, "after_update", SqlaTable.after_update)
|
|
||||||
sa.event.listen(SqlMetric, "after_update", SqlaTable.update_column)
|
sa.event.listen(SqlMetric, "after_update", SqlaTable.update_column)
|
||||||
sa.event.listen(SqlMetric, "after_delete", SqlMetric.after_delete)
|
|
||||||
sa.event.listen(TableColumn, "after_update", SqlaTable.update_column)
|
sa.event.listen(TableColumn, "after_update", SqlaTable.update_column)
|
||||||
sa.event.listen(TableColumn, "after_delete", TableColumn.after_delete)
|
|
||||||
|
|
||||||
RLSFilterRoles = Table(
|
RLSFilterRoles = Table(
|
||||||
"rls_filter_roles",
|
"rls_filter_roles",
|
||||||
|
|||||||
@@ -15,9 +15,11 @@
|
|||||||
# specific language governing permissions and limitations
|
# specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
import logging
|
import logging
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional, Union
|
||||||
|
|
||||||
|
from flask_appbuilder.models.sqla.interface import SQLAInterface
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
from sqlalchemy.orm import joinedload
|
||||||
|
|
||||||
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
|
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
|
||||||
from superset.dao.base import BaseDAO
|
from superset.dao.base import BaseDAO
|
||||||
@@ -35,6 +37,26 @@ class DatasetDAO(BaseDAO): # pylint: disable=too-many-public-methods
|
|||||||
model_cls = SqlaTable
|
model_cls = SqlaTable
|
||||||
base_filter = DatasourceFilter
|
base_filter = DatasourceFilter
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def find_by_ids(cls, model_ids: Union[List[str], List[int]]) -> List[SqlaTable]:
|
||||||
|
"""
|
||||||
|
Find a List of models by a list of ids, if defined applies `base_filter`
|
||||||
|
"""
|
||||||
|
id_col = getattr(SqlaTable, cls.id_column_name, None)
|
||||||
|
if id_col is None:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# the joinedload option ensures that the database is
|
||||||
|
# available in the session later and not lazy loaded
|
||||||
|
query = (
|
||||||
|
db.session.query(SqlaTable)
|
||||||
|
.options(joinedload(SqlaTable.database))
|
||||||
|
.filter(id_col.in_(model_ids))
|
||||||
|
)
|
||||||
|
data_model = SQLAInterface(SqlaTable, db.session)
|
||||||
|
query = DatasourceFilter(cls.id_column_name, data_model).apply(query, None)
|
||||||
|
return query.all()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_database_by_id(database_id: int) -> Optional[Database]:
|
def get_database_by_id(database_id: int) -> Optional[Database]:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ from zipfile import is_zipfile, ZipFile
|
|||||||
import prison
|
import prison
|
||||||
import pytest
|
import pytest
|
||||||
import yaml
|
import yaml
|
||||||
|
from sqlalchemy.orm import joinedload
|
||||||
from sqlalchemy.sql import func
|
from sqlalchemy.sql import func
|
||||||
|
|
||||||
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
|
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
|
||||||
@@ -95,6 +96,7 @@ class TestDatasetApi(SupersetTestCase):
|
|||||||
def get_fixture_datasets(self) -> List[SqlaTable]:
|
def get_fixture_datasets(self) -> List[SqlaTable]:
|
||||||
return (
|
return (
|
||||||
db.session.query(SqlaTable)
|
db.session.query(SqlaTable)
|
||||||
|
.options(joinedload(SqlaTable.database))
|
||||||
.filter(SqlaTable.table_name.in_(self.fixture_tables_names))
|
.filter(SqlaTable.table_name.in_(self.fixture_tables_names))
|
||||||
.all()
|
.all()
|
||||||
)
|
)
|
||||||
@@ -1973,21 +1975,17 @@ class TestDatasetApi(SupersetTestCase):
|
|||||||
database = (
|
database = (
|
||||||
db.session.query(Database).filter_by(uuid=database_config["uuid"]).one()
|
db.session.query(Database).filter_by(uuid=database_config["uuid"]).one()
|
||||||
)
|
)
|
||||||
shadow_dataset = (
|
|
||||||
db.session.query(Dataset).filter_by(uuid=dataset_config["uuid"]).one()
|
|
||||||
)
|
|
||||||
assert database.database_name == "imported_database"
|
assert database.database_name == "imported_database"
|
||||||
|
|
||||||
assert len(database.tables) == 1
|
assert len(database.tables) == 1
|
||||||
dataset = database.tables[0]
|
dataset = database.tables[0]
|
||||||
assert dataset.table_name == "imported_dataset"
|
assert dataset.table_name == "imported_dataset"
|
||||||
assert str(dataset.uuid) == dataset_config["uuid"]
|
assert str(dataset.uuid) == dataset_config["uuid"]
|
||||||
assert str(shadow_dataset.uuid) == dataset_config["uuid"]
|
|
||||||
|
|
||||||
dataset.owners = []
|
dataset.owners = []
|
||||||
database.owners = []
|
database.owners = []
|
||||||
db.session.delete(dataset)
|
db.session.delete(dataset)
|
||||||
db.session.delete(shadow_dataset)
|
|
||||||
db.session.delete(database)
|
db.session.delete(database)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|||||||
@@ -1,87 +0,0 @@
|
|||||||
# Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
# or more contributor license agreements. See the NOTICE file
|
|
||||||
# distributed with this work for additional information
|
|
||||||
# regarding copyright ownership. The ASF licenses this file
|
|
||||||
# to you under the Apache License, Version 2.0 (the
|
|
||||||
# "License"); you may not use this file except in compliance
|
|
||||||
# with the License. You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing,
|
|
||||||
# software distributed under the License is distributed on an
|
|
||||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
# KIND, either express or implied. See the License for the
|
|
||||||
# specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from sqlalchemy import inspect
|
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
|
||||||
|
|
||||||
from superset.columns.models import Column
|
|
||||||
from superset.connectors.sqla.models import SqlaTable, TableColumn
|
|
||||||
from superset.extensions import db
|
|
||||||
from tests.integration_tests.base_tests import SupersetTestCase
|
|
||||||
from tests.integration_tests.fixtures.datasource import load_dataset_with_columns
|
|
||||||
|
|
||||||
|
|
||||||
class SqlaTableModelTest(SupersetTestCase):
|
|
||||||
@pytest.mark.usefixtures("load_dataset_with_columns")
|
|
||||||
def test_dual_update_column(self) -> None:
|
|
||||||
"""
|
|
||||||
Test that when updating a sqla ``TableColumn``
|
|
||||||
That the shadow ``Column`` is also updated
|
|
||||||
"""
|
|
||||||
dataset = db.session.query(SqlaTable).filter_by(table_name="students").first()
|
|
||||||
column = dataset.columns[0]
|
|
||||||
column_name = column.column_name
|
|
||||||
column.column_name = "new_column_name"
|
|
||||||
SqlaTable.update_column(None, None, target=column)
|
|
||||||
|
|
||||||
# refetch
|
|
||||||
dataset = db.session.query(SqlaTable).filter_by(id=dataset.id).one()
|
|
||||||
assert dataset.columns[0].column_name == "new_column_name"
|
|
||||||
|
|
||||||
# reset
|
|
||||||
column.column_name = column_name
|
|
||||||
SqlaTable.update_column(None, None, target=column)
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("load_dataset_with_columns")
|
|
||||||
@mock.patch("superset.columns.models.Column")
|
|
||||||
def test_dual_update_column_not_found(self, column_mock) -> None:
|
|
||||||
"""
|
|
||||||
Test that when updating a sqla ``TableColumn``
|
|
||||||
That the shadow ``Column`` is also updated
|
|
||||||
"""
|
|
||||||
dataset = db.session.query(SqlaTable).filter_by(table_name="students").first()
|
|
||||||
column = dataset.columns[0]
|
|
||||||
column_uuid = column.uuid
|
|
||||||
with mock.patch("sqlalchemy.orm.query.Query.one", side_effect=NoResultFound):
|
|
||||||
SqlaTable.update_column(None, None, target=column)
|
|
||||||
|
|
||||||
session = inspect(column).session
|
|
||||||
|
|
||||||
session.flush()
|
|
||||||
|
|
||||||
# refetch
|
|
||||||
dataset = db.session.query(SqlaTable).filter_by(id=dataset.id).one()
|
|
||||||
# it should create a new uuid
|
|
||||||
assert dataset.columns[0].uuid != column_uuid
|
|
||||||
|
|
||||||
# reset
|
|
||||||
column.uuid = column_uuid
|
|
||||||
SqlaTable.update_column(None, None, target=column)
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("load_dataset_with_columns")
|
|
||||||
def test_to_sl_column_no_known_columns(self) -> None:
|
|
||||||
"""
|
|
||||||
Test that the function returns a new column
|
|
||||||
"""
|
|
||||||
dataset = db.session.query(SqlaTable).filter_by(table_name="students").first()
|
|
||||||
column = dataset.columns[0]
|
|
||||||
new_column = column.to_sl_column()
|
|
||||||
|
|
||||||
# it should use the same uuid
|
|
||||||
assert column.uuid == new_column.uuid
|
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -144,15 +144,13 @@ def test_get_datasource_sl_table(session_with_data: Session) -> None:
|
|||||||
from superset.datasource.dao import DatasourceDAO
|
from superset.datasource.dao import DatasourceDAO
|
||||||
from superset.tables.models import Table
|
from superset.tables.models import Table
|
||||||
|
|
||||||
# todo(hugh): This will break once we remove the dual write
|
|
||||||
# update the datsource_id=1 and this will pass again
|
|
||||||
result = DatasourceDAO.get_datasource(
|
result = DatasourceDAO.get_datasource(
|
||||||
datasource_type=DatasourceType.SLTABLE,
|
datasource_type=DatasourceType.SLTABLE,
|
||||||
datasource_id=2,
|
datasource_id=1,
|
||||||
session=session_with_data,
|
session=session_with_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
assert result.id == 2
|
assert result.id == 1
|
||||||
assert isinstance(result, Table)
|
assert isinstance(result, Table)
|
||||||
|
|
||||||
|
|
||||||
@@ -160,15 +158,13 @@ def test_get_datasource_sl_dataset(session_with_data: Session) -> None:
|
|||||||
from superset.datasets.models import Dataset
|
from superset.datasets.models import Dataset
|
||||||
from superset.datasource.dao import DatasourceDAO
|
from superset.datasource.dao import DatasourceDAO
|
||||||
|
|
||||||
# todo(hugh): This will break once we remove the dual write
|
|
||||||
# update the datsource_id=1 and this will pass again
|
|
||||||
result = DatasourceDAO.get_datasource(
|
result = DatasourceDAO.get_datasource(
|
||||||
datasource_type=DatasourceType.DATASET,
|
datasource_type=DatasourceType.DATASET,
|
||||||
datasource_id=2,
|
datasource_id=1,
|
||||||
session=session_with_data,
|
session=session_with_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
assert result.id == 2
|
assert result.id == 1
|
||||||
assert isinstance(result, Dataset)
|
assert isinstance(result, Dataset)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user