fix: Persist catalog change during dataset update + validation fixes (#33384)

This commit is contained in:
Vitor Avila
2025-05-08 15:22:25 -03:00
committed by GitHub
parent 4ed05f4ff1
commit 72cd9dffa3
6 changed files with 542 additions and 245 deletions

View File

@@ -33,6 +33,18 @@ def get_dataset_exist_error_msg(table: Table) -> str:
return _("Dataset %(table)s already exists", table=table)
class MultiCatalogDisabledValidationError(ValidationError):
"""
Validation error for using a non-default catalog when multi-catalog is disabled
"""
def __init__(self) -> None:
super().__init__(
[_("Only the default catalog is supported for this connection")],
field_name="catalog",
)
class DatabaseNotFoundValidationError(ValidationError):
"""
Marshmallow validation error for database does not exist
@@ -42,15 +54,6 @@ class DatabaseNotFoundValidationError(ValidationError):
super().__init__([_("Database does not exist")], field_name="database")
class DatabaseChangeValidationError(ValidationError):
"""
Marshmallow validation error database changes are not allowed on update
"""
def __init__(self) -> None:
super().__init__([_("Database not allowed to change")], field_name="database")
class DatasetExistsValidationError(ValidationError):
"""
Marshmallow validation error for dataset already exists

View File

@@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
from collections import Counter
from functools import partial
@@ -26,7 +28,7 @@ from sqlalchemy.exc import SQLAlchemyError
from superset import is_feature_enabled, security_manager
from superset.commands.base import BaseCommand, UpdateMixin
from superset.commands.dataset.exceptions import (
DatabaseChangeValidationError,
DatabaseNotFoundValidationError,
DatasetColumnNotFoundValidationError,
DatasetColumnsDuplicateValidationError,
DatasetColumnsExistsValidationError,
@@ -38,11 +40,13 @@ from superset.commands.dataset.exceptions import (
DatasetMetricsNotFoundValidationError,
DatasetNotFoundError,
DatasetUpdateFailedError,
MultiCatalogDisabledValidationError,
)
from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
from superset.daos.dataset import DatasetDAO
from superset.datasets.schemas import FolderSchema
from superset.exceptions import SupersetSecurityException
from superset.models.core import Database
from superset.sql_parse import Table
from superset.utils.decorators import on_error, transaction
@@ -86,38 +90,12 @@ class UpdateDatasetCommand(UpdateMixin, BaseCommand):
if not self._model:
raise DatasetNotFoundError()
# Check ownership
# Check permission to update the dataset
try:
security_manager.raise_for_ownership(self._model)
except SupersetSecurityException as ex:
raise DatasetForbiddenError() from ex
database_id = self._properties.get("database")
catalog = self._properties.get("catalog")
if not catalog:
catalog = self._properties["catalog"] = (
self._model.database.get_default_catalog()
)
table = Table(
self._properties.get("table_name"), # type: ignore
self._properties.get("schema"),
catalog,
)
# Validate uniqueness
if not DatasetDAO.validate_update_uniqueness(
self._model.database,
table,
self._model_id,
):
exceptions.append(DatasetExistsValidationError(table))
# Validate/Populate database not allowed to change
if database_id and database_id != self._model:
exceptions.append(DatabaseChangeValidationError())
# Validate/Populate owner
try:
owners = self.compute_owners(
@@ -128,15 +106,68 @@ class UpdateDatasetCommand(UpdateMixin, BaseCommand):
except ValidationError as ex:
exceptions.append(ex)
self._validate_dataset_source(exceptions)
self._validate_semantics(exceptions)
if exceptions:
raise DatasetInvalidError(exceptions=exceptions)
def _validate_dataset_source(self, exceptions: list[ValidationError]) -> None:
# we know we have a valid model
self._model = cast(SqlaTable, self._model)
database_id = self._properties.pop("database_id", None)
catalog = self._properties.get("catalog")
new_db_connection: Database | None = None
if database_id and database_id != self._model.database.id:
if new_db_connection := DatasetDAO.get_database_by_id(database_id):
self._properties["database"] = new_db_connection
else:
exceptions.append(DatabaseNotFoundValidationError())
db = new_db_connection or self._model.database
default_catalog = db.get_default_catalog()
# If multi-catalog is disabled, and catalog provided is not
# the default one, fail
if (
"catalog" in self._properties
and catalog != default_catalog
and not db.allow_multi_catalog
):
exceptions.append(MultiCatalogDisabledValidationError())
# If the DB connection does not support multi-catalog,
# use the default catalog
elif not db.allow_multi_catalog:
catalog = self._properties["catalog"] = default_catalog
# Fallback to using the previous value if not provided
elif "catalog" not in self._properties:
catalog = self._model.catalog
schema = (
self._properties["schema"]
if "schema" in self._properties
else self._model.schema
)
table = Table(
self._properties.get("table_name", self._model.table_name),
schema,
catalog,
)
# Validate uniqueness
if not DatasetDAO.validate_update_uniqueness(
db,
table,
self._model_id,
):
exceptions.append(DatasetExistsValidationError(table))
def _validate_semantics(self, exceptions: list[ValidationError]) -> None:
# we know we have a valid model
self._model = cast(SqlaTable, self._model)
if columns := self._properties.get("columns"):
self._validate_columns(columns, exceptions)