mirror of
https://github.com/apache/superset.git
synced 2026-04-19 08:04:53 +00:00
chore: re add upload tests (#22753)
This commit is contained in:
committed by
GitHub
parent
02d4adfa2e
commit
edcbf597f5
@@ -29,12 +29,13 @@ import pytest
|
||||
|
||||
import superset.utils.database
|
||||
from superset.sql_parse import Table
|
||||
from tests.integration_tests.conftest import ADMIN_SCHEMA_NAME
|
||||
from superset import db
|
||||
from superset import security_manager
|
||||
from superset.models.core import Database
|
||||
from superset.utils import core as utils
|
||||
from tests.integration_tests.test_app import app, login
|
||||
from tests.integration_tests.base_tests import get_resp
|
||||
|
||||
from tests.integration_tests.base_tests import get_resp, SupersetTestCase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -57,8 +58,7 @@ CSV_UPLOAD_TABLE_W_SCHEMA = "csv_upload_w_schema"
|
||||
CSV_UPLOAD_TABLE_W_EXPLORE = "csv_upload_w_explore"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def setup_csv_upload(login_as_admin):
|
||||
def _setup_csv_upload():
|
||||
upload_db = superset.utils.database.get_or_create_db(
|
||||
CSV_UPLOAD_DATABASE, app.config["SQLALCHEMY_EXAMPLES_URI"]
|
||||
)
|
||||
@@ -77,8 +77,20 @@ def setup_csv_upload(login_as_admin):
|
||||
engine.execute(f"DROP TABLE IF EXISTS {PARQUET_UPLOAD_TABLE}")
|
||||
engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE_W_SCHEMA}")
|
||||
engine.execute(f"DROP TABLE IF EXISTS {CSV_UPLOAD_TABLE_W_EXPLORE}")
|
||||
db.session.delete(upload_db)
|
||||
db.session.commit()
|
||||
db.session.delete(upload_db)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def setup_csv_upload(login_as_admin):
|
||||
yield from _setup_csv_upload()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def setup_csv_upload_with_context():
|
||||
with app.app_context():
|
||||
login(test_client, username="admin")
|
||||
yield from _setup_csv_upload()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@@ -199,3 +211,306 @@ def mock_upload_to_s3(filename: str, upload_prefix: str, table: Table) -> str:
|
||||
container.exec_run(f"hdfs dfs -put {src} {dest}")
|
||||
# hive external table expectes a directory for the location
|
||||
return dest_dir
|
||||
|
||||
|
||||
def escaped_double_quotes(text):
|
||||
return f"\"{text}\""
|
||||
|
||||
|
||||
def escaped_parquet(text):
|
||||
return escaped_double_quotes(f"['{text}']")
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_csv_upload_with_context")
|
||||
@pytest.mark.usefixtures("create_csv_files")
|
||||
@mock.patch(
|
||||
"superset.models.core.config",
|
||||
{**app.config, "ALLOWED_USER_CSV_SCHEMA_FUNC": lambda d, u: ["admin_database"]},
|
||||
)
|
||||
@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
|
||||
@mock.patch("superset.views.database.views.event_logger.log_with_context")
|
||||
def test_import_csv_enforced_schema(mock_event_logger):
|
||||
if utils.backend() == "sqlite":
|
||||
pytest.skip("Sqlite doesn't support schema / database creation")
|
||||
|
||||
full_table_name = f"admin_database.{CSV_UPLOAD_TABLE_W_SCHEMA}"
|
||||
|
||||
# Invalid table name
|
||||
resp = upload_csv(CSV_FILENAME1, full_table_name)
|
||||
assert "Table name cannot contain a schema" in resp
|
||||
|
||||
# no schema specified, fail upload
|
||||
resp = upload_csv(CSV_FILENAME1, CSV_UPLOAD_TABLE_W_SCHEMA, extra={"schema": None})
|
||||
assert (
|
||||
f"Database {escaped_double_quotes(CSV_UPLOAD_DATABASE)} schema"
|
||||
f" {escaped_double_quotes('None')} is not allowed for csv uploads" in resp
|
||||
)
|
||||
|
||||
success_msg = f"CSV file {escaped_double_quotes(CSV_FILENAME1)} uploaded to table {escaped_double_quotes(full_table_name)}"
|
||||
|
||||
resp = upload_csv(
|
||||
CSV_FILENAME1,
|
||||
CSV_UPLOAD_TABLE_W_SCHEMA,
|
||||
extra={"schema": "admin_database", "if_exists": "replace"},
|
||||
)
|
||||
|
||||
assert success_msg in resp
|
||||
mock_event_logger.assert_called_with(
|
||||
action="successful_csv_upload",
|
||||
database=get_upload_db().name,
|
||||
schema="admin_database",
|
||||
table=CSV_UPLOAD_TABLE_W_SCHEMA,
|
||||
)
|
||||
|
||||
with get_upload_db().get_sqla_engine_with_context() as engine:
|
||||
data = engine.execute(
|
||||
f"SELECT * from {ADMIN_SCHEMA_NAME}.{CSV_UPLOAD_TABLE_W_SCHEMA}"
|
||||
).fetchall()
|
||||
assert data == [("john", 1), ("paul", 2)]
|
||||
|
||||
# user specified schema doesn't match, fail
|
||||
resp = upload_csv(
|
||||
CSV_FILENAME1, CSV_UPLOAD_TABLE_W_SCHEMA, extra={"schema": "gold"}
|
||||
)
|
||||
assert (
|
||||
f'Database {escaped_double_quotes(CSV_UPLOAD_DATABASE)} schema {escaped_double_quotes("gold")} is not allowed for csv uploads'
|
||||
in resp
|
||||
)
|
||||
|
||||
# user specified schema matches the expected schema, append
|
||||
if utils.backend() == "hive":
|
||||
pytest.skip("Hive database doesn't support append csv uploads.")
|
||||
resp = upload_csv(
|
||||
CSV_FILENAME1,
|
||||
CSV_UPLOAD_TABLE_W_SCHEMA,
|
||||
extra={"schema": "admin_database", "if_exists": "append"},
|
||||
)
|
||||
assert success_msg in resp
|
||||
|
||||
# Clean up
|
||||
with get_upload_db().get_sqla_engine_with_context() as engine:
|
||||
engine.execute(f"DROP TABLE {full_table_name}")
|
||||
|
||||
|
||||
@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
|
||||
def test_import_csv_explore_database(setup_csv_upload_with_context, create_csv_files):
|
||||
schema = utils.get_example_default_schema()
|
||||
full_table_name = (
|
||||
f"{schema}.{CSV_UPLOAD_TABLE_W_EXPLORE}"
|
||||
if schema
|
||||
else CSV_UPLOAD_TABLE_W_EXPLORE
|
||||
)
|
||||
|
||||
if utils.backend() == "sqlite":
|
||||
pytest.skip("Sqlite doesn't support schema / database creation")
|
||||
|
||||
resp = upload_csv(CSV_FILENAME1, CSV_UPLOAD_TABLE_W_EXPLORE)
|
||||
assert (
|
||||
f"CSV file {escaped_double_quotes(CSV_FILENAME1)} uploaded to table {escaped_double_quotes(full_table_name)}"
|
||||
in resp
|
||||
)
|
||||
table = SupersetTestCase.get_table(name=CSV_UPLOAD_TABLE_W_EXPLORE)
|
||||
assert table.database_id == superset.utils.database.get_example_database().id
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_csv_upload_with_context")
|
||||
@pytest.mark.usefixtures("create_csv_files")
|
||||
@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
|
||||
@mock.patch("superset.views.database.views.event_logger.log_with_context")
|
||||
def test_import_csv(mock_event_logger):
|
||||
schema = utils.get_example_default_schema()
|
||||
full_table_name = f"{schema}.{CSV_UPLOAD_TABLE}" if schema else CSV_UPLOAD_TABLE
|
||||
success_msg_f1 = f"CSV file {escaped_double_quotes(CSV_FILENAME1)} uploaded to table {escaped_double_quotes(full_table_name)}"
|
||||
|
||||
test_db = get_upload_db()
|
||||
|
||||
# initial upload with fail mode
|
||||
resp = upload_csv(CSV_FILENAME1, CSV_UPLOAD_TABLE)
|
||||
assert success_msg_f1 in resp
|
||||
|
||||
# upload again with fail mode; should fail
|
||||
fail_msg = f"Unable to upload CSV file {escaped_double_quotes(CSV_FILENAME1)} to table {escaped_double_quotes(CSV_UPLOAD_TABLE)}"
|
||||
resp = upload_csv(CSV_FILENAME1, CSV_UPLOAD_TABLE)
|
||||
assert fail_msg in resp
|
||||
|
||||
if utils.backend() != "hive":
|
||||
# upload again with append mode
|
||||
resp = upload_csv(
|
||||
CSV_FILENAME1, CSV_UPLOAD_TABLE, extra={"if_exists": "append"}
|
||||
)
|
||||
assert success_msg_f1 in resp
|
||||
mock_event_logger.assert_called_with(
|
||||
action="successful_csv_upload",
|
||||
database=test_db.name,
|
||||
schema=schema,
|
||||
table=CSV_UPLOAD_TABLE,
|
||||
)
|
||||
|
||||
# upload again with replace mode
|
||||
resp = upload_csv(CSV_FILENAME1, CSV_UPLOAD_TABLE, extra={"if_exists": "replace"})
|
||||
assert success_msg_f1 in resp
|
||||
|
||||
# try to append to table from file with different schema
|
||||
resp = upload_csv(CSV_FILENAME2, CSV_UPLOAD_TABLE, extra={"if_exists": "append"})
|
||||
fail_msg_f2 = f"Unable to upload CSV file {escaped_double_quotes(CSV_FILENAME2)} to table {escaped_double_quotes(CSV_UPLOAD_TABLE)}"
|
||||
assert fail_msg_f2 in resp
|
||||
|
||||
# replace table from file with different schema
|
||||
resp = upload_csv(CSV_FILENAME2, CSV_UPLOAD_TABLE, extra={"if_exists": "replace"})
|
||||
success_msg_f2 = f"CSV file {escaped_double_quotes(CSV_FILENAME2)} uploaded to table {escaped_double_quotes(full_table_name)}"
|
||||
assert success_msg_f2 in resp
|
||||
|
||||
table = SupersetTestCase.get_table(name=CSV_UPLOAD_TABLE)
|
||||
# make sure the new column name is reflected in the table metadata
|
||||
assert "d" in table.column_names
|
||||
|
||||
# ensure user is assigned as an owner
|
||||
assert security_manager.find_user("admin") in table.owners
|
||||
|
||||
# null values are set
|
||||
upload_csv(
|
||||
CSV_FILENAME2,
|
||||
CSV_UPLOAD_TABLE,
|
||||
extra={"null_values": '["", "john"]', "if_exists": "replace"},
|
||||
)
|
||||
# make sure that john and empty string are replaced with None
|
||||
with test_db.get_sqla_engine_with_context() as engine:
|
||||
data = engine.execute(f"SELECT * from {CSV_UPLOAD_TABLE}").fetchall()
|
||||
assert data == [(None, 1, "x"), ("paul", 2, None)]
|
||||
# default null values
|
||||
upload_csv(CSV_FILENAME2, CSV_UPLOAD_TABLE, extra={"if_exists": "replace"})
|
||||
# make sure that john and empty string are replaced with None
|
||||
data = engine.execute(f"SELECT * from {CSV_UPLOAD_TABLE}").fetchall()
|
||||
assert data == [("john", 1, "x"), ("paul", 2, None)]
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_csv_upload_with_context")
|
||||
@pytest.mark.usefixtures("create_excel_files")
|
||||
@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
|
||||
@mock.patch("superset.views.database.views.event_logger.log_with_context")
|
||||
def test_import_excel(mock_event_logger):
|
||||
if utils.backend() == "hive":
|
||||
pytest.skip("Hive doesn't excel upload.")
|
||||
|
||||
schema = utils.get_example_default_schema()
|
||||
full_table_name = f"{schema}.{EXCEL_UPLOAD_TABLE}" if schema else EXCEL_UPLOAD_TABLE
|
||||
test_db = get_upload_db()
|
||||
|
||||
success_msg = f"Excel file {escaped_double_quotes(EXCEL_FILENAME)} uploaded to table {escaped_double_quotes(full_table_name)}"
|
||||
|
||||
# initial upload with fail mode
|
||||
resp = upload_excel(EXCEL_FILENAME, EXCEL_UPLOAD_TABLE)
|
||||
assert success_msg in resp
|
||||
mock_event_logger.assert_called_with(
|
||||
action="successful_excel_upload",
|
||||
database=test_db.name,
|
||||
schema=schema,
|
||||
table=EXCEL_UPLOAD_TABLE,
|
||||
)
|
||||
|
||||
# ensure user is assigned as an owner
|
||||
table = SupersetTestCase.get_table(name=EXCEL_UPLOAD_TABLE)
|
||||
assert security_manager.find_user("admin") in table.owners
|
||||
|
||||
# upload again with fail mode; should fail
|
||||
fail_msg = f"Unable to upload Excel file {escaped_double_quotes(EXCEL_FILENAME)} to table {escaped_double_quotes(EXCEL_UPLOAD_TABLE)}"
|
||||
resp = upload_excel(EXCEL_FILENAME, EXCEL_UPLOAD_TABLE)
|
||||
assert fail_msg in resp
|
||||
|
||||
if utils.backend() != "hive":
|
||||
# upload again with append mode
|
||||
resp = upload_excel(
|
||||
EXCEL_FILENAME, EXCEL_UPLOAD_TABLE, extra={"if_exists": "append"}
|
||||
)
|
||||
assert success_msg in resp
|
||||
|
||||
# upload again with replace mode
|
||||
resp = upload_excel(
|
||||
EXCEL_FILENAME, EXCEL_UPLOAD_TABLE, extra={"if_exists": "replace"}
|
||||
)
|
||||
assert success_msg in resp
|
||||
mock_event_logger.assert_called_with(
|
||||
action="successful_excel_upload",
|
||||
database=test_db.name,
|
||||
schema=schema,
|
||||
table=EXCEL_UPLOAD_TABLE,
|
||||
)
|
||||
|
||||
with test_db.get_sqla_engine_with_context() as engine:
|
||||
data = engine.execute(f"SELECT * from {EXCEL_UPLOAD_TABLE}").fetchall()
|
||||
assert data == [(0, "john", 1), (1, "paul", 2)]
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_csv_upload_with_context")
|
||||
@pytest.mark.usefixtures("create_columnar_files")
|
||||
@mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
|
||||
@mock.patch("superset.views.database.views.event_logger.log_with_context")
|
||||
def test_import_parquet(mock_event_logger):
|
||||
if utils.backend() == "hive":
|
||||
pytest.skip("Hive doesn't allow parquet upload.")
|
||||
|
||||
schema = utils.get_example_default_schema()
|
||||
full_table_name = (
|
||||
f"{schema}.{PARQUET_UPLOAD_TABLE}" if schema else PARQUET_UPLOAD_TABLE
|
||||
)
|
||||
test_db = get_upload_db()
|
||||
|
||||
success_msg_f1 = f"Columnar file {escaped_parquet(PARQUET_FILENAME1)} uploaded to table {escaped_double_quotes(full_table_name)}"
|
||||
|
||||
# initial upload with fail mode
|
||||
resp = upload_columnar(PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE)
|
||||
assert success_msg_f1 in resp
|
||||
|
||||
# upload again with fail mode; should fail
|
||||
fail_msg = f"Unable to upload Columnar file {escaped_parquet(PARQUET_FILENAME1)} to table {escaped_double_quotes(PARQUET_UPLOAD_TABLE)}"
|
||||
resp = upload_columnar(PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE)
|
||||
assert fail_msg in resp
|
||||
|
||||
if utils.backend() != "hive":
|
||||
# upload again with append mode
|
||||
resp = upload_columnar(
|
||||
PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE, extra={"if_exists": "append"}
|
||||
)
|
||||
assert success_msg_f1 in resp
|
||||
mock_event_logger.assert_called_with(
|
||||
action="successful_columnar_upload",
|
||||
database=test_db.name,
|
||||
schema=schema,
|
||||
table=PARQUET_UPLOAD_TABLE,
|
||||
)
|
||||
|
||||
# upload again with replace mode and specific columns
|
||||
resp = upload_columnar(
|
||||
PARQUET_FILENAME1,
|
||||
PARQUET_UPLOAD_TABLE,
|
||||
extra={"if_exists": "replace", "usecols": '["a"]'},
|
||||
)
|
||||
assert success_msg_f1 in resp
|
||||
|
||||
table = SupersetTestCase.get_table(name=PARQUET_UPLOAD_TABLE, schema=None)
|
||||
# make sure only specified column name was read
|
||||
assert "b" not in table.column_names
|
||||
|
||||
# ensure user is assigned as an owner
|
||||
assert security_manager.find_user("admin") in table.owners
|
||||
|
||||
# upload again with replace mode
|
||||
resp = upload_columnar(
|
||||
PARQUET_FILENAME1, PARQUET_UPLOAD_TABLE, extra={"if_exists": "replace"}
|
||||
)
|
||||
assert success_msg_f1 in resp
|
||||
|
||||
with test_db.get_sqla_engine_with_context() as engine:
|
||||
data = engine.execute(f"SELECT * from {PARQUET_UPLOAD_TABLE}").fetchall()
|
||||
assert data == [("john", 1), ("paul", 2)]
|
||||
|
||||
# replace table with zip file
|
||||
resp = upload_columnar(
|
||||
ZIP_FILENAME, PARQUET_UPLOAD_TABLE, extra={"if_exists": "replace"}
|
||||
)
|
||||
success_msg_f2 = f"Columnar file {escaped_parquet(ZIP_FILENAME)} uploaded to table {escaped_double_quotes(full_table_name)}"
|
||||
assert success_msg_f2 in resp
|
||||
|
||||
with test_db.get_sqla_engine_with_context() as engine:
|
||||
data = engine.execute(f"SELECT * from {PARQUET_UPLOAD_TABLE}").fetchall()
|
||||
assert data == [("john", 1), ("paul", 2), ("max", 3), ("bob", 4)]
|
||||
|
||||
@@ -366,12 +366,18 @@ class TestDatasetApi(SupersetTestCase):
|
||||
schema="information_schema",
|
||||
)
|
||||
)
|
||||
schema_values = [
|
||||
"information_schema",
|
||||
"public",
|
||||
]
|
||||
all_datasets = db.session.query(SqlaTable).all()
|
||||
schema_values = sorted(
|
||||
set(
|
||||
[
|
||||
dataset.schema
|
||||
for dataset in all_datasets
|
||||
if dataset.schema is not None
|
||||
]
|
||||
)
|
||||
)
|
||||
expected_response = {
|
||||
"count": 2,
|
||||
"count": len(schema_values),
|
||||
"result": [{"text": val, "value": val} for val in schema_values],
|
||||
}
|
||||
self.login(username="admin")
|
||||
@@ -397,10 +403,8 @@ class TestDatasetApi(SupersetTestCase):
|
||||
pg_test_query_parameter(
|
||||
query_parameter,
|
||||
{
|
||||
"count": 2,
|
||||
"result": [
|
||||
{"text": "information_schema", "value": "information_schema"}
|
||||
],
|
||||
"count": len(schema_values),
|
||||
"result": [expected_response["result"][0]],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user