feat: welcome presto to the suite of tested databases (#10498)

* Add presto to the CI

Sample test data

Datetime conversion

Sample test data

Fix tests

* TODO to switch to timestamps

* Address feedback

* Update requirements

* Add TODOs

Co-authored-by: bogdan kyryliuk <bogdankyryliuk@dropbox.com>
This commit is contained in:
Bogdan
2020-08-06 12:07:22 -07:00
committed by GitHub
parent 749581d534
commit 62b873e3da
24 changed files with 440 additions and 104 deletions

View File

@@ -18,14 +18,15 @@
"""Unit tests for Superset Celery worker"""
import datetime
import json
from parameterized import parameterized
import subprocess
import time
import unittest
import unittest.mock as mock
import flask
from flask import current_app
from sqlalchemy.engine import Engine
from tests.test_app import app
from superset import db, sql_lab
@@ -38,6 +39,10 @@ from superset.sql_parse import ParsedQuery, CtasMethod
from superset.utils.core import get_example_database
from .base_tests import SupersetTestCase
from .sqllab_test_util import (
setup_presto_if_needed,
CTAS_SCHEMA_NAME,
) # noqa autoused fixture
CELERY_SHORT_SLEEP_TIME = 2
CELERY_SLEEP_TIME = 10
@@ -92,9 +97,6 @@ class TestAppContext(SupersetTestCase):
flask._app_ctx_stack.push(popped_app)
CTAS_SCHEMA_NAME = "sqllab_test_db"
class TestCelery(SupersetTestCase):
def get_query_by_name(self, sql):
query = db.session.query(Query).filter_by(sql=sql).first()
@@ -159,9 +161,10 @@ class TestCelery(SupersetTestCase):
@parameterized.expand([CtasMethod.TABLE, CtasMethod.VIEW])
def test_run_sync_query_cta(self, ctas_method):
main_db = get_example_database()
backend = main_db.backend
db_id = main_db.id
tmp_table_name = f"tmp_sync_23_{ctas_method.lower()}"
self.drop_table_if_exists(tmp_table_name, main_db)
self.drop_table_if_exists(tmp_table_name, ctas_method, main_db)
name = "James"
sql_where = f"SELECT name FROM birth_names WHERE name='{name}' LIMIT 1"
result = self.run_sql(
@@ -174,8 +177,24 @@ class TestCelery(SupersetTestCase):
)
# provide better error message
self.assertEqual(QueryStatus.SUCCESS, result["query"]["state"], msg=result)
self.assertEqual([], result["data"])
self.assertEqual([], result["columns"])
expected_result = []
if backend == "presto":
expected_result = (
[{"rows": 1}] if ctas_method == CtasMethod.TABLE else [{"result": True}]
)
self.assertEqual(expected_result, result["data"])
# TODO(bkyryliuk): refactor database specific logic into a separate class
expected_columns = []
if backend == "presto":
expected_columns = [
{
"name": "rows" if ctas_method == CtasMethod.TABLE else "result",
"type": "BIGINT" if ctas_method == CtasMethod.TABLE else "BOOLEAN",
"is_date": False,
}
]
self.assertEqual(expected_columns, result["columns"])
query2 = self.get_query_by_id(result["query"]["serverId"])
# Check the data in the tmp table.
@@ -184,7 +203,7 @@ class TestCelery(SupersetTestCase):
self.assertGreater(len(results["data"]), 0)
# cleanup tmp table
self.drop_table_if_exists(tmp_table_name, get_example_database())
self.drop_table_if_exists(tmp_table_name, ctas_method, get_example_database())
def test_run_sync_query_cta_no_data(self):
main_db = get_example_database()
@@ -198,9 +217,9 @@ class TestCelery(SupersetTestCase):
query3 = self.get_query_by_id(result3["query"]["serverId"])
self.assertEqual(QueryStatus.SUCCESS, query3.status)
def drop_table_if_exists(self, table_name, database=None):
def drop_table_if_exists(self, table_name, table_type: CtasMethod, database=None):
"""Drop table if it exists, works on any DB"""
sql = "DROP TABLE {}".format(table_name)
sql = f"DROP {table_type} {table_name}"
db_id = database.id
if database:
database.allow_dml = True
@@ -215,7 +234,8 @@ class TestCelery(SupersetTestCase):
):
main_db = get_example_database()
db_id = main_db.id
if main_db.backend == "sqlite":
backend = main_db.backend
if backend == "sqlite":
# sqlite doesn't support schemas
return
tmp_table_name = f"tmp_async_22_{ctas_method.lower()}"
@@ -223,7 +243,7 @@ class TestCelery(SupersetTestCase):
main_db.inspector.engine.dialect.identifier_preparer.quote_identifier
)
expected_full_table_name = f"{CTAS_SCHEMA_NAME}.{quote(tmp_table_name)}"
self.drop_table_if_exists(expected_full_table_name, main_db)
self.drop_table_if_exists(expected_full_table_name, ctas_method, main_db)
name = "James"
sql_where = f"SELECT name FROM birth_names WHERE name='{name}'"
result = self.run_sql(
@@ -234,10 +254,32 @@ class TestCelery(SupersetTestCase):
cta=True,
ctas_method=ctas_method,
)
self.assertEqual(QueryStatus.SUCCESS, result["query"]["state"], msg=result)
self.assertEqual([], result["data"])
self.assertEqual([], result["columns"])
expected_result = []
# TODO(bkyryliuk): refactor database specific logic into a separate class
if backend == "presto":
expected_result = (
[{"rows": 1}]
if ctas_method == CtasMethod.TABLE
else [{"result": True}]
)
self.assertEqual(expected_result, result["data"])
expected_columns = []
# TODO(bkyryliuk): refactor database specific logic into a separate class
if backend == "presto":
expected_columns = [
{
"name": "rows" if ctas_method == CtasMethod.TABLE else "result",
"type": "BIGINT"
if ctas_method == CtasMethod.TABLE
else "BOOLEAN",
"is_date": False,
}
]
self.assertEqual(expected_columns, result["columns"])
query = self.get_query_by_id(result["query"]["serverId"])
self.assertEqual(
f"CREATE {ctas_method} {CTAS_SCHEMA_NAME}.{tmp_table_name} AS \n"
@@ -246,13 +288,18 @@ class TestCelery(SupersetTestCase):
query.executed_sql,
)
self.assertEqual(
"SELECT *\n" f"FROM {CTAS_SCHEMA_NAME}.{tmp_table_name}",
"SELECT *\n" f"FROM {CTAS_SCHEMA_NAME}.{tmp_table_name}"
if backend != "presto"
else "SELECT *\n"
f"FROM {quote(CTAS_SCHEMA_NAME)}.{quote(tmp_table_name)}",
query.select_sql,
)
time.sleep(CELERY_SHORT_SLEEP_TIME)
results = self.run_sql(db_id, query.select_sql)
self.assertEqual(QueryStatus.SUCCESS, results["status"], msg=result)
self.drop_table_if_exists(expected_full_table_name, get_example_database())
self.drop_table_if_exists(
expected_full_table_name, ctas_method, get_example_database()
)
@parameterized.expand([CtasMethod.TABLE, CtasMethod.VIEW])
def test_run_async_query_cta_config(self, ctas_method):
@@ -265,12 +312,19 @@ class TestCelery(SupersetTestCase):
if main_db.backend == "sqlite":
# sqlite doesn't support schemas
return
tmp_table_name = f"sqllab_test_table_async_1_{ctas_method}"
quote = (
main_db.inspector.engine.dialect.identifier_preparer.quote_identifier
)
expected_full_table_name = f"{CTAS_SCHEMA_NAME}.{quote(tmp_table_name)}"
self.drop_table_if_exists(expected_full_table_name, main_db)
schema_name = (
quote(CTAS_SCHEMA_NAME)
if main_db.backend == "presto"
else CTAS_SCHEMA_NAME
)
expected_full_table_name = f"{schema_name}.{quote(tmp_table_name)}"
self.drop_table_if_exists(expected_full_table_name, ctas_method, main_db)
sql_where = "SELECT name FROM birth_names WHERE name='James' LIMIT 10"
result = self.run_sql(
db_id,
@@ -294,18 +348,22 @@ class TestCelery(SupersetTestCase):
"LIMIT 10",
query.executed_sql,
)
self.drop_table_if_exists(expected_full_table_name, get_example_database())
self.drop_table_if_exists(
expected_full_table_name, ctas_method, get_example_database()
)
@parameterized.expand([CtasMethod.TABLE, CtasMethod.VIEW])
def test_run_async_cta_query(self, ctas_method):
main_db = get_example_database()
db_backend = main_db.backend
db_id = main_db.id
table_name = f"tmp_async_4_{ctas_method}"
self.drop_table_if_exists(table_name, main_db)
self.drop_table_if_exists(table_name, ctas_method, main_db)
time.sleep(DROP_TABLE_SLEEP_TIME)
sql_where = "SELECT name FROM birth_names WHERE name='James' LIMIT 10"
result = self.run_sql(
db_id,
sql_where,
@@ -316,6 +374,7 @@ class TestCelery(SupersetTestCase):
ctas_method=ctas_method,
)
db.session.close()
assert result["query"]["state"] in (
QueryStatus.PENDING,
QueryStatus.RUNNING,
@@ -337,16 +396,20 @@ class TestCelery(SupersetTestCase):
query.executed_sql,
)
self.assertEqual(sql_where, query.sql)
self.assertEqual(0, query.rows)
if db_backend == "presto":
self.assertEqual(1, query.rows)
else:
self.assertEqual(0, query.rows)
self.assertEqual(True, query.select_as_cta)
self.assertEqual(True, query.select_as_cta_used)
@parameterized.expand([CtasMethod.TABLE, CtasMethod.VIEW])
def test_run_async_cta_query_with_lower_limit(self, ctas_method):
main_db = get_example_database()
db_backend = main_db.backend
db_id = main_db.id
tmp_table = f"tmp_async_2_{ctas_method}"
self.drop_table_if_exists(tmp_table, main_db)
self.drop_table_if_exists(tmp_table, ctas_method, main_db)
sql_where = "SELECT name FROM birth_names LIMIT 1"
result = self.run_sql(
@@ -359,6 +422,7 @@ class TestCelery(SupersetTestCase):
ctas_method=ctas_method,
)
db.session.close()
assert result["query"]["state"] in (
QueryStatus.PENDING,
QueryStatus.RUNNING,
@@ -377,7 +441,10 @@ class TestCelery(SupersetTestCase):
query.executed_sql,
)
self.assertEqual(sql_where, query.sql)
self.assertEqual(0, query.rows)
if db_backend == "presto":
self.assertEqual(1, query.rows)
else:
self.assertEqual(0, query.rows)
self.assertEqual(None, query.limit)
self.assertEqual(True, query.select_as_cta)
self.assertEqual(True, query.select_as_cta_used)