chore: consolidate common code in importers (#11936)

* chore: consolidate common code in importers

* Small fixes

* Small fixes
This commit is contained in:
Beto Dealmeida
2020-12-10 10:11:58 -08:00
committed by GitHub
parent 38bb6f3f20
commit 9256b6fb3d
25 changed files with 418 additions and 1348 deletions

View File

@@ -38,7 +38,7 @@ class ExportDatabasesCommand(ExportModelsCommand):
not_found = DatabaseNotFoundError
@staticmethod
def export(model: Database) -> Iterator[Tuple[str, str]]:
def _export(model: Database) -> Iterator[Tuple[str, str]]:
database_slug = secure_filename(model.database_name)
file_name = f"databases/{database_slug}.yaml"

View File

@@ -15,53 +15,43 @@
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, List, Optional
from typing import Any, Dict
from marshmallow import Schema, validate
from marshmallow.exceptions import ValidationError
from marshmallow import Schema
from sqlalchemy.orm import Session
from superset import db
from superset.commands.base import BaseCommand
from superset.commands.exceptions import CommandInvalidError
from superset.commands.importers.v1.utils import (
load_metadata,
load_yaml,
METADATA_FILE_NAME,
)
from superset.commands.importers.v1 import ImportModelsCommand
from superset.databases.commands.exceptions import DatabaseImportError
from superset.databases.commands.importers.v1.utils import import_database
from superset.databases.dao import DatabaseDAO
from superset.databases.schemas import ImportV1DatabaseSchema
from superset.datasets.commands.importers.v1.utils import import_dataset
from superset.datasets.schemas import ImportV1DatasetSchema
from superset.models.core import Database
schemas: Dict[str, Schema] = {
"databases/": ImportV1DatabaseSchema(),
"datasets/": ImportV1DatasetSchema(),
}
class ImportDatabasesCommand(BaseCommand):
class ImportDatabasesCommand(ImportModelsCommand):
"""Import databases"""
# pylint: disable=unused-argument
def __init__(self, contents: Dict[str, str], *args: Any, **kwargs: Any):
self.contents = contents
self.passwords: Dict[str, str] = kwargs.get("passwords") or {}
self._configs: Dict[str, Any] = {}
dao = DatabaseDAO
model_name = "database"
schemas: Dict[str, Schema] = {
"databases/": ImportV1DatabaseSchema(),
"datasets/": ImportV1DatasetSchema(),
}
import_error = DatabaseImportError
def _import_bundle(self, session: Session) -> None:
@staticmethod
def _import(session: Session, configs: Dict[str, Any]) -> None:
# first import databases
database_ids: Dict[str, int] = {}
for file_name, config in self._configs.items():
for file_name, config in configs.items():
if file_name.startswith("databases/"):
database = import_database(session, config, overwrite=True)
database_ids[str(database.uuid)] = database.id
# import related datasets
for file_name, config in self._configs.items():
for file_name, config in configs.items():
if (
file_name.startswith("datasets/")
and config["database_uuid"] in database_ids
@@ -69,66 +59,3 @@ class ImportDatabasesCommand(BaseCommand):
config["database_id"] = database_ids[config["database_uuid"]]
# overwrite=False prevents deleting any non-imported columns/metrics
import_dataset(session, config, overwrite=False)
def run(self) -> None:
self.validate()
# rollback to prevent partial imports
try:
self._import_bundle(db.session)
db.session.commit()
except Exception:
db.session.rollback()
raise DatabaseImportError()
def validate(self) -> None:
exceptions: List[ValidationError] = []
# load existing databases so we can apply the password validation
db_passwords = {
str(uuid): password
for uuid, password in db.session.query(
Database.uuid, Database.password
).all()
}
# verify that the metadata file is present and valid
try:
metadata: Optional[Dict[str, str]] = load_metadata(self.contents)
except ValidationError as exc:
exceptions.append(exc)
metadata = None
# validate databases and dataset
for file_name, content in self.contents.items():
prefix = file_name.split("/")[0]
schema = schemas.get(f"{prefix}/")
if schema:
try:
config = load_yaml(file_name, content)
# populate passwords from the request or from existing DBs
if file_name in self.passwords:
config["password"] = self.passwords[file_name]
elif prefix == "databases" and config["uuid"] in db_passwords:
config["password"] = db_passwords[config["uuid"]]
schema.load(config)
self._configs[file_name] = config
except ValidationError as exc:
exc.messages = {file_name: exc.messages}
exceptions.append(exc)
# validate that the type declared in METADATA_FILE_NAME is correct
if metadata:
type_validator = validate.Equal(Database.__name__)
try:
type_validator(metadata["type"])
except ValidationError as exc:
exc.messages = {METADATA_FILE_NAME: {"type": exc.messages}}
exceptions.append(exc)
if exceptions:
exception = CommandInvalidError("Error importing database")
exception.add_list(exceptions)
raise exception