feat: export datasets as ZIP files (#11332)

* Export datasets as ZIP files

* Add logging when failing to parse extra

* Fix logging
This commit is contained in:
Beto Dealmeida
2020-10-22 10:32:08 -07:00
committed by GitHub
parent 64b5aae6bc
commit 00e394451f
6 changed files with 391 additions and 16 deletions

View File

@@ -17,6 +17,7 @@
# isort:skip_file
import json
import logging
from typing import Iterator, List, Tuple
import yaml
@@ -27,6 +28,8 @@ from superset.databases.dao import DatabaseDAO
from superset.utils.dict_import_export import IMPORT_EXPORT_VERSION, sanitize
from superset.models.core import Database
logger = logging.getLogger(__name__)
class ExportDatabasesCommand(BaseCommand):
def __init__(self, database_ids: List[int]):
@@ -37,8 +40,8 @@ class ExportDatabasesCommand(BaseCommand):
@staticmethod
def export_database(database: Database) -> Iterator[Tuple[str, str]]:
name = sanitize(database.database_name)
file_name = f"databases/{name}.yaml"
database_slug = sanitize(database.database_name)
file_name = f"databases/{database_slug}.yaml"
payload = database.export_to_dict(
recursive=False,
@@ -52,18 +55,16 @@ class ExportDatabasesCommand(BaseCommand):
try:
payload["extra"] = json.loads(payload["extra"])
except json.decoder.JSONDecodeError:
pass
logger.info("Unable to decode `extra` field: %s", payload["extra"])
payload["version"] = IMPORT_EXPORT_VERSION
file_content = yaml.safe_dump(payload, sort_keys=False)
yield file_name, file_content
# TODO (betodealmeida): reuse logic from ExportDatasetCommand once
# it's implemented
for dataset in database.tables:
name = sanitize(dataset.table_name)
file_name = f"datasets/{name}.yaml"
dataset_slug = sanitize(dataset.table_name)
file_name = f"datasets/{database_slug}/{dataset_slug}.yaml"
payload = dataset.export_to_dict(
recursive=True,

View File

@@ -15,15 +15,19 @@
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime
from io import BytesIO
from typing import Any
from zipfile import ZipFile
import yaml
from flask import g, request, Response
from flask import g, request, Response, send_file
from flask_appbuilder.api import expose, protect, rison, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_babel import ngettext
from marshmallow import ValidationError
from superset import is_feature_enabled
from superset.connectors.sqla.models import SqlaTable
from superset.constants import RouteMethod
from superset.databases.filters import DatabaseFilter
@@ -40,6 +44,7 @@ from superset.datasets.commands.exceptions import (
DatasetRefreshFailedError,
DatasetUpdateFailedError,
)
from superset.datasets.commands.export import ExportDatasetsCommand
from superset.datasets.commands.refresh import RefreshDatasetCommand
from superset.datasets.commands.update import UpdateDatasetCommand
from superset.datasets.dao import DatasetDAO
@@ -373,6 +378,31 @@ class DatasetRestApi(BaseSupersetModelRestApi):
$ref: '#/components/responses/500'
"""
requested_ids = kwargs["rison"]
if is_feature_enabled("VERSIONED_EXPORT"):
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
root = f"dataset_export_{timestamp}"
filename = f"{root}.zip"
buf = BytesIO()
with ZipFile(buf, "w") as bundle:
try:
for file_name, file_content in ExportDatasetsCommand(
requested_ids
).run():
with bundle.open(f"{root}/{file_name}", "w") as fp:
fp.write(file_content.encode())
except DatasetNotFoundError:
return self.response_404()
buf.seek(0)
return send_file(
buf,
mimetype="application/zip",
as_attachment=True,
attachment_filename=filename,
)
query = self.datamodel.session.query(SqlaTable).filter(
SqlaTable.id.in_(requested_ids)
)

View File

@@ -0,0 +1,96 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
import json
import logging
from typing import Iterator, List, Tuple
import yaml
from superset.commands.base import BaseCommand
from superset.connectors.sqla.models import SqlaTable
from superset.datasets.commands.exceptions import DatasetNotFoundError
from superset.datasets.dao import DatasetDAO
from superset.utils.dict_import_export import IMPORT_EXPORT_VERSION, sanitize
logger = logging.getLogger(__name__)
class ExportDatasetsCommand(BaseCommand):
    """Export one or more datasets (plus their databases) as YAML files.

    ``run()`` yields ``(file_name, file_content)`` tuples that the caller
    can stream into a ZIP bundle.
    """

    def __init__(self, dataset_ids: List[int]):
        self.dataset_ids = dataset_ids

        # populated by validate()
        self._models: List[SqlaTable] = []

    @staticmethod
    def export_dataset(dataset: SqlaTable) -> Iterator[Tuple[str, str]]:
        """Yield the YAML file for ``dataset`` followed by its database's."""
        database_slug = sanitize(dataset.database.database_name)
        dataset_slug = sanitize(dataset.table_name)

        dataset_payload = dataset.export_to_dict(
            recursive=True,
            include_parent_ref=False,
            include_defaults=True,
            export_uuids=True,
        )
        dataset_payload["version"] = IMPORT_EXPORT_VERSION
        # reference the database by UUID so imports can re-link them
        dataset_payload["database_uuid"] = str(dataset.database.uuid)

        yield (
            f"datasets/{database_slug}/{dataset_slug}.yaml",
            yaml.safe_dump(dataset_payload, sort_keys=False),
        )

        # include database as well
        database_payload = dataset.database.export_to_dict(
            recursive=False,
            include_parent_ref=False,
            include_defaults=True,
            export_uuids=True,
        )
        # TODO (betodealmeida): move this logic to export_to_dict once this
        # becomes the default export endpoint
        if "extra" in database_payload:
            try:
                database_payload["extra"] = json.loads(database_payload["extra"])
            except json.decoder.JSONDecodeError:
                logger.info(
                    "Unable to decode `extra` field: %s", database_payload["extra"]
                )
        database_payload["version"] = IMPORT_EXPORT_VERSION

        yield (
            f"databases/{database_slug}.yaml",
            yaml.safe_dump(database_payload, sort_keys=False),
        )

    def run(self) -> Iterator[Tuple[str, str]]:
        self.validate()

        emitted = set()
        for model in self._models:
            for file_name, file_content in self.export_dataset(model):
                # a database shared by several datasets is exported only once
                if file_name in emitted:
                    continue
                emitted.add(file_name)
                yield file_name, file_content

    def validate(self) -> None:
        self._models = DatasetDAO.find_by_ids(self.dataset_ids)

        # every requested ID must resolve to an accessible dataset
        if len(self._models) != len(self.dataset_ids):
            raise DatasetNotFoundError()

View File

@@ -19,10 +19,9 @@ from unittest.mock import patch
import yaml
from superset import db, security_manager
from superset import security_manager
from superset.databases.commands.exceptions import DatabaseNotFoundError
from superset.databases.commands.export import ExportDatabasesCommand
from superset.models.core import Database
from superset.utils.core import backend, get_example_database
from tests.base_tests import SupersetTestCase
@@ -38,11 +37,11 @@ class TestExportDatabasesCommand(SupersetTestCase):
# TODO: this list shouldn't depend on the order in which unit tests are run
# or on the backend; for now use a stable subset
core_datasets = {
core_files = {
"databases/examples.yaml",
"datasets/energy_usage.yaml",
"datasets/wb_health_population.yaml",
"datasets/birth_names.yaml",
"datasets/examples/energy_usage.yaml",
"datasets/examples/wb_health_population.yaml",
"datasets/examples/birth_names.yaml",
}
expected_extra = {
"engine_params": {},
@@ -53,7 +52,7 @@ class TestExportDatabasesCommand(SupersetTestCase):
if backend() == "presto":
expected_extra = {"engine_params": {"connect_args": {"poll_interval": 0.1}}}
assert core_datasets.issubset(set(contents.keys()))
assert core_files.issubset(set(contents.keys()))
metadata = yaml.safe_load(contents["databases/examples.yaml"])
assert metadata == (
@@ -72,7 +71,7 @@ class TestExportDatabasesCommand(SupersetTestCase):
}
)
metadata = yaml.safe_load(contents["datasets/birth_names.yaml"])
metadata = yaml.safe_load(contents["datasets/examples/birth_names.yaml"])
metadata.pop("uuid")
assert metadata == {
"table_name": "birth_names",

View File

@@ -16,8 +16,10 @@
# under the License.
"""Unit tests for Superset"""
import json
from io import BytesIO
from typing import List
from unittest.mock import patch
from zipfile import is_zipfile
import prison
import pytest
@@ -1006,6 +1008,68 @@ class TestDatasetApi(SupersetTestCase):
rv = self.client.get(uri)
assert rv.status_code == 401
@patch.dict(
"superset.extensions.feature_flag_manager._feature_flags",
{"VERSIONED_EXPORT": True},
clear=True,
)
def test_export_dataset_bundle(self):
"""
Dataset API: Test export dataset
"""
birth_names_dataset = self.get_birth_names_dataset()
# TODO: fix test for presto
# debug with dump: https://github.com/apache/incubator-superset/runs/1092546855
if birth_names_dataset.database.backend in {"presto", "hive"}:
return
argument = [birth_names_dataset.id]
uri = f"api/v1/dataset/export/?q={prison.dumps(argument)}"
self.login(username="admin")
rv = self.get_assert_metric(uri, "export")
assert rv.status_code == 200
buf = BytesIO(rv.data)
assert is_zipfile(buf)
@patch.dict(
"superset.extensions.feature_flag_manager._feature_flags",
{"VERSIONED_EXPORT": True},
clear=True,
)
def test_export_dataset_bundle_not_found(self):
"""
Dataset API: Test export dataset not found
"""
# Just one does not exist and we get 404
argument = [-1, 1]
uri = f"api/v1/dataset/export/?q={prison.dumps(argument)}"
self.login(username="admin")
rv = self.get_assert_metric(uri, "export")
assert rv.status_code == 404
@patch.dict(
"superset.extensions.feature_flag_manager._feature_flags",
{"VERSIONED_EXPORT": True},
clear=True,
)
def test_export_dataset_bundle_gamma(self):
"""
Dataset API: Test export dataset has gamma
"""
birth_names_dataset = self.get_birth_names_dataset()
argument = [birth_names_dataset.id]
uri = f"api/v1/dataset/export/?q={prison.dumps(argument)}"
self.login(username="gamma")
rv = self.client.get(uri)
assert rv.status_code == 401
def test_get_dataset_related_objects(self):
"""
Dataset API: Test get chart and dashboard count related to a dataset

View File

@@ -0,0 +1,185 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from operator import itemgetter
from unittest.mock import patch
import yaml
from superset import security_manager
from superset.connectors.sqla.models import SqlaTable
from superset.datasets.commands.exceptions import DatasetNotFoundError
from superset.datasets.commands.export import ExportDatasetsCommand
from superset.utils.core import backend, get_example_database
from tests.base_tests import SupersetTestCase
class TestExportDatasetsCommand(SupersetTestCase):
    """Unit tests for ``ExportDatasetsCommand``."""

    @patch("superset.security.manager.g")
    def test_export_dataset_command(self, mock_g):
        """An admin export yields the dataset YAML plus its database YAML."""
        mock_g.user = security_manager.find_user("admin")

        example_db = get_example_database()
        example_dataset = example_db.tables[0]
        command = ExportDatasetsCommand(dataset_ids=[example_dataset.id])
        contents = dict(command.run())

        # the dataset is exported first, then the database it belongs to
        assert list(contents.keys()) == [
            "datasets/examples/energy_usage.yaml",
            "databases/examples.yaml",
        ]

        metadata = yaml.safe_load(contents["datasets/examples/energy_usage.yaml"])

        # sort columns for deterministic comparison
        metadata["columns"] = sorted(metadata["columns"], key=itemgetter("column_name"))
        metadata["metrics"] = sorted(metadata["metrics"], key=itemgetter("metric_name"))

        # types are different depending on the backend
        type_map = {
            column.column_name: str(column.type) for column in example_dataset.columns
        }

        assert metadata == {
            "cache_timeout": None,
            "columns": [
                {
                    "column_name": "source",
                    "description": None,
                    "expression": None,
                    "filterable": True,
                    "groupby": True,
                    "is_active": True,
                    "is_dttm": False,
                    "python_date_format": None,
                    "type": type_map["source"],
                    "verbose_name": None,
                },
                {
                    "column_name": "target",
                    "description": None,
                    "expression": None,
                    "filterable": True,
                    "groupby": True,
                    "is_active": True,
                    "is_dttm": False,
                    "python_date_format": None,
                    "type": type_map["target"],
                    "verbose_name": None,
                },
                {
                    "column_name": "value",
                    "description": None,
                    "expression": None,
                    "filterable": True,
                    "groupby": True,
                    "is_active": True,
                    "is_dttm": False,
                    "python_date_format": None,
                    "type": type_map["value"],
                    "verbose_name": None,
                },
            ],
            "database_uuid": str(example_db.uuid),
            "default_endpoint": None,
            "description": "Energy consumption",
            "fetch_values_predicate": None,
            "filter_select_enabled": False,
            "main_dttm_col": None,
            "metrics": [
                {
                    "d3format": None,
                    "description": None,
                    "expression": "COUNT(*)",
                    "extra": None,
                    "metric_name": "count",
                    "metric_type": "count",
                    "verbose_name": "COUNT(*)",
                    "warning_text": None,
                },
                {
                    "d3format": None,
                    "description": None,
                    "expression": "SUM(value)",
                    "extra": None,
                    "metric_name": "sum__value",
                    "metric_type": None,
                    "verbose_name": None,
                    "warning_text": None,
                },
            ],
            "offset": 0,
            "params": None,
            "schema": None,
            "sql": None,
            "table_name": "energy_usage",
            "template_params": None,
            "uuid": str(example_dataset.uuid),
            "version": "1.0.0",
        }

    @patch("superset.security.manager.g")
    def test_export_dataset_command_no_access(self, mock_g):
        """Test that users can't export datasets they don't have access to"""
        mock_g.user = security_manager.find_user("gamma")

        example_db = get_example_database()
        example_dataset = example_db.tables[0]
        command = ExportDatasetsCommand(dataset_ids=[example_dataset.id])
        contents = command.run()
        # validation happens lazily, on the first iteration of the generator
        with self.assertRaises(DatasetNotFoundError):
            next(contents)

    @patch("superset.security.manager.g")
    def test_export_dataset_command_invalid_dataset(self, mock_g):
        """Test that an error is raised when exporting an invalid dataset"""
        mock_g.user = security_manager.find_user("admin")
        command = ExportDatasetsCommand(dataset_ids=[-1])
        contents = command.run()
        with self.assertRaises(DatasetNotFoundError):
            next(contents)

    @patch("superset.security.manager.g")
    def test_export_dataset_command_key_order(self, mock_g):
        """Test that the keys in the YAML have the same order as export_fields"""
        mock_g.user = security_manager.find_user("admin")

        example_db = get_example_database()
        example_dataset = example_db.tables[0]
        command = ExportDatasetsCommand(dataset_ids=[example_dataset.id])
        contents = dict(command.run())

        metadata = yaml.safe_load(contents["datasets/examples/energy_usage.yaml"])
        # relies on yaml.safe_dump(..., sort_keys=False) preserving insertion order
        assert list(metadata.keys()) == [
            "table_name",
            "main_dttm_col",
            "description",
            "default_endpoint",
            "offset",
            "cache_timeout",
            "schema",
            "sql",
            "params",
            "template_params",
            "filter_select_enabled",
            "fetch_values_predicate",
            "uuid",
            "metrics",
            "columns",
            "version",
            "database_uuid",
        ]