chore: clean up csv tests (#10556)

* Clean up csv tests

* Update tests/base_tests.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update tests/base_tests.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* import optional

* Fix mypy error

Co-authored-by: bogdan kyryliuk <bogdankyryliuk@dropbox.com>
Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>
This commit is contained in:
Bogdan
2020-08-10 11:22:38 -07:00
committed by GitHub
parent 8b9292ed05
commit 101e5b670b
5 changed files with 319 additions and 230 deletions

View File

@@ -845,222 +845,6 @@ class TestCore(SupersetTestCase):
form_get = self.get_resp("/csvtodatabaseview/form")
self.assertIn("CSV to Database configuration", form_get)
def upload_csv(
self, filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
form_data = {
"csv_file": open(filename, "rb"),
"sep": ",",
"name": table_name,
"con": utils.get_example_database().id,
"if_exists": "fail",
"index_label": "test_label",
"mangle_dupe_cols": False,
}
if extra:
form_data.update(extra)
return self.get_resp("/csvtodatabaseview/form", data=form_data)
def upload_excel(
self, filename: str, table_name: str, extra: Optional[Dict[str, str]] = None
):
form_data = {
"excel_file": open(filename, "rb"),
"name": table_name,
"con": utils.get_example_database().id,
"sheet_name": "Sheet1",
"if_exists": "fail",
"index_label": "test_label",
"mangle_dupe_cols": False,
}
if extra:
form_data.update(extra)
return self.get_resp("/exceltodatabaseview/form", data=form_data)
@mock.patch(
"superset.models.core.config",
{**app.config, "ALLOWED_USER_CSV_SCHEMA_FUNC": lambda d, u: ["admin_database"]},
)
def test_import_csv_enforced_schema(self):
if utils.get_example_database().backend == "sqlite":
# sqlite doesn't support schema / database creation
return
self.login(username="admin")
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
full_table_name = f"admin_database.{table_name}"
filename = "testCSV.csv"
self.create_sample_csvfile(filename, ["a,b", "john,1", "paul,2"])
try:
self.enable_csv_upload(utils.get_example_database())
# no schema specified, fail upload
resp = self.upload_csv(filename, table_name)
self.assertIn(
'Database "examples" schema "None" is not allowed for csv uploads', resp
)
# user specified schema matches the expected schema, append
success_msg = f'CSV file "{filename}" uploaded to table "{full_table_name}"'
resp = self.upload_csv(
filename,
table_name,
extra={"schema": "admin_database", "if_exists": "append"},
)
self.assertIn(success_msg, resp)
resp = self.upload_csv(
filename,
table_name,
extra={"schema": "admin_database", "if_exists": "replace"},
)
self.assertIn(success_msg, resp)
# user specified schema doesn't match, fail
resp = self.upload_csv(filename, table_name, extra={"schema": "gold"})
self.assertIn(
'Database "examples" schema "gold" is not allowed for csv uploads',
resp,
)
finally:
os.remove(filename)
def test_import_csv_explore_database(self):
if utils.get_example_database().backend == "sqlite":
# sqlite doesn't support schema / database creation
return
explore_db_id = utils.get_example_database().id
upload_db = utils.get_or_create_db(
"csv_explore_db", app.config["SQLALCHEMY_EXAMPLES_URI"]
)
upload_db_id = upload_db.id
extra = upload_db.get_extra()
extra["explore_database_id"] = explore_db_id
upload_db.extra = json.dumps(extra)
db.session.commit()
self.login(username="admin")
self.enable_csv_upload(DatasetDAO.get_database_by_id(upload_db_id))
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
f = "testCSV.csv"
self.create_sample_csvfile(f, ["a,b", "john,1", "paul,2"])
# initial upload with fail mode
resp = self.upload_csv(f, table_name)
self.assertIn(f'CSV file "{f}" uploaded to table "{table_name}"', resp)
table = self.get_table_by_name(table_name)
self.assertEqual(table.database_id, explore_db_id)
# cleanup
db.session.delete(table)
db.session.delete(DatasetDAO.get_database_by_id(upload_db_id))
db.session.commit()
os.remove(f)
def test_import_csv(self):
self.login(username="admin")
examples_db = utils.get_example_database()
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
f1 = "testCSV.csv"
self.create_sample_csvfile(f1, ["a,b", "john,1", "paul,2"])
f2 = "testCSV2.csv"
self.create_sample_csvfile(f2, ["b,c,d", "john,1,x", "paul,2,"])
self.enable_csv_upload(examples_db)
try:
success_msg_f1 = f'CSV file "{f1}" uploaded to table "{table_name}"'
# initial upload with fail mode
resp = self.upload_csv(f1, table_name)
self.assertIn(success_msg_f1, resp)
# upload again with fail mode; should fail
fail_msg = f'Unable to upload CSV file "{f1}" to table "{table_name}"'
resp = self.upload_csv(f1, table_name)
self.assertIn(fail_msg, resp)
# upload again with append mode
resp = self.upload_csv(f1, table_name, extra={"if_exists": "append"})
self.assertIn(success_msg_f1, resp)
# upload again with replace mode
resp = self.upload_csv(f1, table_name, extra={"if_exists": "replace"})
self.assertIn(success_msg_f1, resp)
# try to append to table from file with different schema
resp = self.upload_csv(f2, table_name, extra={"if_exists": "append"})
fail_msg_f2 = f'Unable to upload CSV file "{f2}" to table "{table_name}"'
self.assertIn(fail_msg_f2, resp)
# replace table from file with different schema
resp = self.upload_csv(f2, table_name, extra={"if_exists": "replace"})
success_msg_f2 = f'CSV file "{f2}" uploaded to table "{table_name}"'
self.assertIn(success_msg_f2, resp)
table = self.get_table_by_name(table_name)
# make sure the new column name is reflected in the table metadata
self.assertIn("d", table.column_names)
# null values are set
self.upload_csv(
f2,
table_name,
extra={"null_values": '["", "john"]', "if_exists": "replace"},
)
# make sure that john and empty string are replaced with None
engine = examples_db.get_sqla_engine()
data = engine.execute(f"SELECT * from {table_name}").fetchall()
assert data == [(None, 1, "x"), ("paul", 2, None)]
# default null values
self.upload_csv(f2, table_name, extra={"if_exists": "replace"})
# make sure that john and empty string are replaced with None
data = engine.execute(f"SELECT * from {table_name}").fetchall()
assert data == [("john", 1, "x"), ("paul", 2, None)]
finally:
os.remove(f1)
os.remove(f2)
def test_import_excel(self):
self.login(username="admin")
table_name = "".join(random.choice(string.ascii_lowercase) for _ in range(5))
f1 = "testExcel.xlsx"
self.create_sample_excelfile(f1, {"a": ["john", "paul"], "b": [1, 2]})
self.enable_csv_upload(utils.get_example_database())
try:
success_msg_f1 = f'Excel file "{f1}" uploaded to table "{table_name}"'
# initial upload with fail mode
resp = self.upload_excel(f1, table_name)
self.assertIn(success_msg_f1, resp)
# upload again with fail mode; should fail
fail_msg = f'Unable to upload Excel file "{f1}" to table "{table_name}"'
resp = self.upload_excel(f1, table_name)
self.assertIn(fail_msg, resp)
# upload again with append mode
resp = self.upload_excel(f1, table_name, extra={"if_exists": "append"})
self.assertIn(success_msg_f1, resp)
# upload again with replace mode
resp = self.upload_excel(f1, table_name, extra={"if_exists": "replace"})
self.assertIn(success_msg_f1, resp)
# make sure that john and empty string are replaced with None
data = (
utils.get_example_database()
.get_sqla_engine()
.execute(f"SELECT * from {table_name}")
.fetchall()
)
assert data == [(0, "john", 1), (1, "paul", 2)]
finally:
os.remove(f1)
def test_dataframe_timezone(self):
tz = pytz.FixedOffset(60)
data = [