Compare commits

...

4 Commits

Author SHA1 Message Date
Daniel Vaz Gaspar
311422f2ec fix: engines that don't support comments (#13153)
* fix: engines that don't support comments

* fix: engines that don't support comments

* add quick inexpensive test

* add test

(cherry picked from commit 9568985b7b)
2021-02-17 12:21:04 -08:00
Daniel Vaz Gaspar
ffa1d0c08f feat(db engines): add support for Opendistro Elasticsearch (AWS ES) (#12602)
* feat(db engines): add support for Opendistro Elasticsearch (AWS ES)

* add time grains

* lint

* bump elasticsearch-dbapi version

* add tests

* fix test

(cherry picked from commit b3a814fa27)
2021-02-17 12:14:40 -08:00
Daniel Vaz Gaspar
5874b92977 fix(alerts): void query with numeric comparison (#13090)
* fix(alerts): void query with numeric comparison

* remove config changes

* fix tests

* better logic

* fix logic

* fix logic

* Improve test readability

(cherry picked from commit 2e6ea76631)
2021-02-15 14:14:31 -08:00
Ville Brofeldt
92e172ec8d fix: sorting by saved metric (#13059)
(cherry picked from commit c1e10c4627)
2021-02-15 14:14:07 -08:00
10 changed files with 148 additions and 13 deletions

View File

@@ -126,7 +126,7 @@ setup(
"drill": ["sqlalchemy-drill==0.1.dev"], "drill": ["sqlalchemy-drill==0.1.dev"],
"druid": ["pydruid>=0.6.1,<0.7"], "druid": ["pydruid>=0.6.1,<0.7"],
"solr": ["sqlalchemy-solr >= 0.2.0"], "solr": ["sqlalchemy-solr >= 0.2.0"],
"elasticsearch": ["elasticsearch-dbapi>=0.1.0, <0.2.0"], "elasticsearch": ["elasticsearch-dbapi>=0.2.0, <0.3.0"],
"exasol": ["sqlalchemy-exasol>=2.1.0, <2.2"], "exasol": ["sqlalchemy-exasol>=2.1.0, <2.2"],
"excel": ["xlrd>=1.2.0, <1.3"], "excel": ["xlrd>=1.2.0, <1.3"],
"gsheets": ["gsheetsdb>=0.1.9"], "gsheets": ["gsheetsdb>=0.1.9"],

View File

@@ -1150,6 +1150,8 @@ class SqlaTable( # pylint: disable=too-many-public-methods,too-many-instance-at
col = self.adhoc_metric_to_sqla(col, columns_by_name) col = self.adhoc_metric_to_sqla(col, columns_by_name)
elif col in columns_by_name: elif col in columns_by_name:
col = columns_by_name[col].get_sqla_col() col = columns_by_name[col].get_sqla_col()
elif col in metrics_by_name:
col = metrics_by_name[col].get_sqla_col()
if isinstance(col, Label): if isinstance(col, Label):
label = col._label # pylint: disable=protected-access label = col._label # pylint: disable=protected-access

View File

@@ -153,6 +153,7 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
allows_joins = True allows_joins = True
allows_subqueries = True allows_subqueries = True
allows_column_aliases = True allows_column_aliases = True
allows_sql_comments = True
force_column_alias_quotes = False force_column_alias_quotes = False
arraysize = 0 arraysize = 0
max_column_name_length = 0 max_column_name_length = 0
@@ -857,7 +858,6 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
""" """
parsed_query = ParsedQuery(statement) parsed_query = ParsedQuery(statement)
sql = parsed_query.stripped() sql = parsed_query.stripped()
sql_query_mutator = config["SQL_QUERY_MUTATOR"] sql_query_mutator = config["SQL_QUERY_MUTATOR"]
if sql_query_mutator: if sql_query_mutator:
sql = sql_query_mutator(sql, user_name, security_manager, database) sql = sql_query_mutator(sql, user_name, security_manager, database)
@@ -933,6 +933,9 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
:param kwargs: kwargs to be passed to cursor.execute() :param kwargs: kwargs to be passed to cursor.execute()
:return: :return:
""" """
if not cls.allows_sql_comments:
query = sql_parse.strip_comments_from_sql(query)
if cls.arraysize: if cls.arraysize:
cursor.arraysize = cls.arraysize cursor.arraysize = cls.arraysize
try: try:

View File

@@ -33,6 +33,7 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho
time_secondary_columns = True time_secondary_columns = True
allows_joins = False allows_joins = False
allows_subqueries = True allows_subqueries = True
allows_sql_comments = False
_time_grain_expressions = { _time_grain_expressions = {
None: "{col}", None: "{col}",
@@ -61,3 +62,35 @@ class ElasticSearchEngineSpec(BaseEngineSpec): # pylint: disable=abstract-metho
if target_type.upper() == utils.TemporalType.DATETIME: if target_type.upper() == utils.TemporalType.DATETIME:
return f"""CAST('{dttm.isoformat(timespec="seconds")}' AS DATETIME)""" return f"""CAST('{dttm.isoformat(timespec="seconds")}' AS DATETIME)"""
return None return None
class OpenDistroEngineSpec(BaseEngineSpec): # pylint: disable=abstract-method
time_groupby_inline = True
time_secondary_columns = True
allows_joins = False
allows_subqueries = True
allows_sql_comments = False
_time_grain_expressions = {
None: "{col}",
"PT1S": "date_format({col}, 'yyyy-MM-dd HH:mm:ss.000')",
"PT1M": "date_format({col}, 'yyyy-MM-dd HH:mm:00.000')",
"PT1H": "date_format({col}, 'yyyy-MM-dd HH:00:00.000')",
"P1D": "date_format({col}, 'yyyy-MM-dd 00:00:00.000')",
"P1M": "date_format({col}, 'yyyy-MM-01 00:00:00.000')",
"P1Y": "date_format({col}, 'yyyy-01-01 00:00:00.000')",
}
engine = "odelasticsearch"
engine_name = "ElasticSearch"
@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
if target_type.upper() == utils.TemporalType.DATETIME:
return f"""'{dttm.isoformat(timespec="seconds")}'"""
return None
@staticmethod
def _mutate_label(label: str) -> str:
return label.replace(".", "_")

View File

@@ -271,8 +271,8 @@ def create_slices(
groupby=["name"], groupby=["name"],
adhoc_filters=[gen_filter("gender", "girl")], adhoc_filters=[gen_filter("gender", "girl")],
row_limit=50, row_limit=50,
timeseries_limit_metric="sum__num", timeseries_limit_metric=metric,
metrics=metrics, metrics=[metric],
), ),
), ),
Slice( Slice(
@@ -300,7 +300,8 @@ def create_slices(
groupby=["name"], groupby=["name"],
adhoc_filters=[gen_filter("gender", "boy")], adhoc_filters=[gen_filter("gender", "boy")],
row_limit=50, row_limit=50,
-    metrics=metrics,
+    timeseries_limit_metric=metric,
+    metrics=[metric],
), ),
), ),
Slice( Slice(

View File

@@ -47,15 +47,16 @@ class AlertCommand(BaseCommand):
def run(self) -> bool: def run(self) -> bool:
self.validate() self.validate()
-        if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL:
+        if self._is_validator_not_null:
             self._report_schedule.last_value_row_json = str(self._result)
-            return self._result not in (0, None, np.nan)
+            return self._result is not None
self._report_schedule.last_value = self._result self._report_schedule.last_value = self._result
try: try:
operator = json.loads(self._report_schedule.validator_config_json)["op"] operator = json.loads(self._report_schedule.validator_config_json)["op"]
threshold = json.loads(self._report_schedule.validator_config_json)[ threshold = json.loads(self._report_schedule.validator_config_json)[
"threshold" "threshold"
] ]
return OPERATOR_FUNCTIONS[operator](self._result, threshold) return OPERATOR_FUNCTIONS[operator](self._result, threshold)
except (KeyError, json.JSONDecodeError): except (KeyError, json.JSONDecodeError):
raise AlertValidatorConfigError() raise AlertValidatorConfigError()
@@ -95,6 +96,18 @@ class AlertCommand(BaseCommand):
except (AssertionError, TypeError, ValueError): except (AssertionError, TypeError, ValueError):
raise AlertQueryInvalidTypeError() raise AlertQueryInvalidTypeError()
@property
def _is_validator_not_null(self) -> bool:
return (
self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL
)
@property
def _is_validator_operator(self) -> bool:
return (
self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR
)
def validate(self) -> None: def validate(self) -> None:
""" """
Validate the query result as a Pandas DataFrame Validate the query result as a Pandas DataFrame
@@ -108,10 +121,14 @@ class AlertCommand(BaseCommand):
except Exception as ex: except Exception as ex:
raise AlertQueryError(message=str(ex)) raise AlertQueryError(message=str(ex))
-        if df.empty:
+        if df.empty and self._is_validator_not_null:
+            self._result = None
+            return
+        if df.empty and self._is_validator_operator:
+            self._result = 0.0
             return
rows = df.to_records() rows = df.to_records()
-        if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL:
+        if self._is_validator_not_null:
self._validate_not_null(rows) self._validate_not_null(rows)
return return
self._validate_operator(rows) self._validate_operator(rows)

View File

@@ -58,6 +58,19 @@ def _extract_limit_from_query(statement: TokenList) -> Optional[int]:
return None return None
def strip_comments_from_sql(statement: str) -> str:
"""
Strips comments from a SQL statement, does a simple test first
to avoid always instantiating the expensive ParsedQuery constructor
This is useful for engines that don't support comments
:param statement: A string with the SQL statement
:return: SQL statement without comments
"""
return ParsedQuery(statement).strip_comments() if "--" in statement else statement
@dataclass(eq=True, frozen=True) @dataclass(eq=True, frozen=True)
class Table: # pylint: disable=too-few-public-methods class Table: # pylint: disable=too-few-public-methods
""" """
@@ -150,6 +163,9 @@ class ParsedQuery:
def stripped(self) -> str: def stripped(self) -> str:
return self.sql.strip(" \t\n;") return self.sql.strip(" \t\n;")
def strip_comments(self) -> str:
return sqlparse.format(self.stripped(), strip_comments=True)
def get_statements(self) -> List[str]: def get_statements(self) -> List[str]:
"""Returns a list of SQL statements as strings, stripped""" """Returns a list of SQL statements as strings, stripped"""
statements = [] statements = []

View File

@@ -14,7 +14,14 @@
# KIND, either express or implied. See the License for the # KIND, either express or implied. See the License for the
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
-from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+from unittest.mock import MagicMock
+
+from sqlalchemy import column
+
+from superset.db_engine_specs.elasticsearch import (
+    ElasticSearchEngineSpec,
+    OpenDistroEngineSpec,
+)
from tests.db_engine_specs.base_tests import TestDbEngineSpec from tests.db_engine_specs.base_tests import TestDbEngineSpec
@@ -26,3 +33,38 @@ class TestElasticSearchDbEngineSpec(TestDbEngineSpec):
ElasticSearchEngineSpec.convert_dttm("DATETIME", dttm), ElasticSearchEngineSpec.convert_dttm("DATETIME", dttm),
"CAST('2019-01-02T03:04:05' AS DATETIME)", "CAST('2019-01-02T03:04:05' AS DATETIME)",
) )
def test_opendistro_convert_dttm(self):
"""
DB Eng Specs (opendistro): Test convert_dttm
"""
dttm = self.get_dttm()
self.assertEqual(
OpenDistroEngineSpec.convert_dttm("DATETIME", dttm),
"'2019-01-02T03:04:05'",
)
def test_opendistro_sqla_column_label(self):
"""
DB Eng Specs (opendistro): Test column label
"""
test_cases = {
"Col": "Col",
"Col.keyword": "Col_keyword",
}
for original, expected in test_cases.items():
actual = OpenDistroEngineSpec.make_label_compatible(column(original).name)
self.assertEqual(actual, expected)
def test_opendistro_strip_comments(self):
"""
DB Eng Specs (opendistro): Test execute sql strip comments
"""
mock_cursor = MagicMock()
mock_cursor.execute.return_value = []
OpenDistroEngineSpec.execute(
mock_cursor, "-- some comment \nSELECT 1\n --other comment"
)
mock_cursor.execute.assert_called_once_with("SELECT 1\n")

View File

@@ -308,7 +308,7 @@ def create_test_table_context(database: Database):
@pytest.yield_fixture( @pytest.yield_fixture(
params=["alert1", "alert2", "alert3", "alert4", "alert5", "alert6"] params=["alert1", "alert2", "alert3", "alert4", "alert5", "alert6", "alert7"]
) )
def create_no_alert_email_chart(request): def create_no_alert_email_chart(request):
param_config = { param_config = {
@@ -338,10 +338,15 @@ def create_no_alert_email_chart(request):
"validator_config_json": '{"op": "!=", "threshold": 10}', "validator_config_json": '{"op": "!=", "threshold": 10}',
}, },
"alert6": { "alert6": {
"sql": "SELECT first from test_table where first=0", "sql": "SELECT first from test_table where 1=0",
"validator_type": ReportScheduleValidatorType.NOT_NULL, "validator_type": ReportScheduleValidatorType.NOT_NULL,
"validator_config_json": "{}", "validator_config_json": "{}",
}, },
"alert7": {
"sql": "SELECT first from test_table where 1=0",
"validator_type": ReportScheduleValidatorType.OPERATOR,
"validator_config_json": '{"op": ">", "threshold": 0}',
},
} }
with app.app_context(): with app.app_context():
chart = db.session.query(Slice).first() chart = db.session.query(Slice).first()

View File

@@ -18,7 +18,7 @@ import unittest
import sqlparse import sqlparse
-from superset.sql_parse import ParsedQuery, Table
+from superset.sql_parse import ParsedQuery, strip_comments_from_sql, Table
class TestSupersetSqlParse(unittest.TestCase): class TestSupersetSqlParse(unittest.TestCase):
@@ -732,3 +732,19 @@ class TestSupersetSqlParse(unittest.TestCase):
""" """
parsed = ParsedQuery(query, strip_comments=True) parsed = ParsedQuery(query, strip_comments=True)
assert not parsed.is_valid_ctas() assert not parsed.is_valid_ctas()
def test_strip_comments_from_sql(self):
"""Test that we are able to strip comments out of SQL stmts"""
assert (
strip_comments_from_sql("SELECT col1, col2 FROM table1")
== "SELECT col1, col2 FROM table1"
)
assert (
strip_comments_from_sql("SELECT col1, col2 FROM table1\n-- comment")
== "SELECT col1, col2 FROM table1\n"
)
assert (
strip_comments_from_sql("SELECT '--abc' as abc, col2 FROM table1\n")
== "SELECT '--abc' as abc, col2 FROM table1"
)