feat: refractored SQL-based alerting framework (#10605)

* added new tables for alerting refractor

* reformatted inheritance structure

* added workflow for updated framework

* added suggested changes

* cleaned up changes

* added obervations to alert table to enable view

* added comments

* added requested changes

* fix tests

* added styling changes

* mypy

* added requested changes

* updated operator logic

* requested changes, 1 validator, styling changes

* refactored tests

* fix test alert workflow

* fixed create_alert in test

Co-authored-by: Jason Davis <@dropbox.com>
This commit is contained in:
Jason Davis
2020-09-01 13:36:02 -07:00
committed by GitHub
parent 4572ebb600
commit b59f6b1833
12 changed files with 953 additions and 185 deletions

View File

@@ -16,18 +16,32 @@
# under the License.
"""Unit tests for alerting in Superset"""
import logging
from typing import Optional
from unittest.mock import patch
import pytest
from superset import db
from superset.models.alerts import Alert, AlertLog
from superset.models.schedules import ScheduleType
from superset.exceptions import SupersetException
from superset.models.alerts import (
Alert,
AlertLog,
SQLObservation,
SQLObserver,
Validator,
)
from superset.models.slice import Slice
from superset.tasks.alerts.observer import observe
from superset.tasks.alerts.validator import (
check_validator,
not_null_validator,
operator_validator,
)
from superset.tasks.schedules import (
AlertState,
deliver_alert,
run_alert_query,
schedule_alert_query,
evaluate_alert,
validate_observations,
)
from superset.utils import core as utils
from tests.test_app import app
@@ -40,115 +54,252 @@ logger = logging.getLogger(__name__)
@pytest.yield_fixture(scope="module")
def setup_database():
with app.app_context():
slice_id = db.session.query(Slice).all()[0].id
database_id = utils.get_example_database().id
example_database = utils.get_example_database()
example_database.get_sqla_engine().execute(
"CREATE TABLE test_table AS SELECT 1 as first, 2 as second"
)
example_database.get_sqla_engine().execute(
"INSERT INTO test_table (first, second) VALUES (3, 4)"
)
alerts = [
Alert(
id=1,
label="alert_1",
active=True,
crontab="*/1 * * * *",
sql="SELECT 0",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(
id=2,
label="alert_2",
active=True,
crontab="*/1 * * * *",
sql="SELECT 55",
alert_type="email",
slice_id=slice_id,
recipients="recipient1@superset.com",
slack_channel="#test_channel",
database_id=database_id,
),
Alert(
id=3,
label="alert_3",
active=False,
crontab="*/1 * * * *",
sql="UPDATE 55",
alert_type="email",
slice_id=slice_id,
database_id=database_id,
),
Alert(id=4, active=False, label="alert_4", database_id=-1),
Alert(id=5, active=False, label="alert_5", database_id=database_id),
]
db.session.bulk_save_objects(alerts)
no_observer_alert = Alert(crontab="* * * * *", label="No Observer")
db.session.add(no_observer_alert)
db.session.commit()
yield db.session
db.session.query(SQLObservation).delete()
db.session.query(SQLObserver).delete()
db.session.query(Validator).delete()
db.session.query(AlertLog).delete()
db.session.query(Alert).delete()
@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.logging.Logger.error")
def test_run_alert_query(mock_error, mock_deliver_alert, setup_database):
dbsession = setup_database
# Test passing alert with null SQL result
alert1 = dbsession.query(Alert).filter_by(id=1).one()
run_alert_query(alert1.id, alert1.database_id, alert1.sql, alert1.label)
assert mock_deliver_alert.call_count == 0
assert mock_error.call_count == 0
# Test passing alert with True SQL result
alert2 = dbsession.query(Alert).filter_by(id=2).one()
run_alert_query(alert2.id, alert2.database_id, alert2.sql, alert2.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 0
# Test passing alert with error in SQL query
alert3 = dbsession.query(Alert).filter_by(id=3).one()
run_alert_query(alert3.id, alert3.database_id, alert3.sql, alert3.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 2
# Test passing alert with invalid database
alert4 = dbsession.query(Alert).filter_by(id=4).one()
run_alert_query(alert4.id, alert4.database_id, alert4.sql, alert4.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 3
# Test passing alert with no SQL statement
alert5 = dbsession.query(Alert).filter_by(id=5).one()
run_alert_query(alert5.id, alert5.database_id, alert5.sql, alert5.label)
assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 4
@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.run_alert_query")
def test_schedule_alert_query(mock_run_alert, mock_deliver_alert, setup_database):
dbsession = setup_database
active_alert = dbsession.query(Alert).filter_by(id=1).one()
inactive_alert = dbsession.query(Alert).filter_by(id=3).one()
# Test that inactive alerts are no processed
schedule_alert_query(report_type=ScheduleType.alert, schedule_id=inactive_alert.id)
assert mock_run_alert.call_count == 0
assert mock_deliver_alert.call_count == 0
# Test that active alerts with no recipients passed in are processed regularly
schedule_alert_query(report_type=ScheduleType.alert, schedule_id=active_alert.id)
assert mock_run_alert.call_count == 1
assert mock_deliver_alert.call_count == 0
# Test that active alerts sent as a test are delivered immediately
schedule_alert_query(
report_type=ScheduleType.alert,
schedule_id=active_alert.id,
recipients="testing@email.com",
def create_alert(
dbsession,
sql: str,
validator_type: Optional[str] = None,
validator_config: Optional[str] = None,
) -> Alert:
alert = Alert(
label="test_alert",
active=True,
crontab="* * * * *",
slice_id=dbsession.query(Slice).all()[0].id,
recipients="recipient1@superset.com",
slack_channel="#test_channel",
)
assert mock_run_alert.call_count == 1
dbsession.add(alert)
dbsession.commit()
sql_observer = SQLObserver(
sql=sql, alert_id=alert.id, database_id=utils.get_example_database().id,
)
if validator_type and validator_config:
validator = Validator(
validator_type=validator_type, config=validator_config, alert_id=alert.id,
)
dbsession.add(validator)
dbsession.add(sql_observer)
dbsession.commit()
return alert
def test_alert_observer(setup_database):
dbsession = setup_database
# Test SQLObserver with int SQL return
alert1 = create_alert(dbsession, "SELECT 55")
observe(alert1.id)
assert alert1.sql_observer[0].observations[-1].value == 55.0
assert alert1.sql_observer[0].observations[-1].error_msg is None
# Test SQLObserver with double SQL return
alert2 = create_alert(dbsession, "SELECT 30.0 as wage")
observe(alert2.id)
assert alert2.sql_observer[0].observations[-1].value == 30.0
assert alert2.sql_observer[0].observations[-1].error_msg is None
# Test SQLObserver with NULL result
alert3 = create_alert(dbsession, "SELECT null as null_result")
observe(alert3.id)
assert alert3.sql_observer[0].observations[-1].value is None
assert alert3.sql_observer[0].observations[-1].error_msg is None
# Test SQLObserver with empty SQL return
alert4 = create_alert(dbsession, "SELECT first FROM test_table WHERE first = -1")
observe(alert4.id)
assert alert4.sql_observer[0].observations[-1].value is None
assert alert4.sql_observer[0].observations[-1].error_msg is not None
# Test SQLObserver with str result
alert5 = create_alert(dbsession, "SELECT 'test_string' as string_value")
observe(alert5.id)
assert alert5.sql_observer[0].observations[-1].value is None
assert alert5.sql_observer[0].observations[-1].error_msg is not None
# Test SQLObserver with two row result
alert6 = create_alert(dbsession, "SELECT first FROM test_table")
observe(alert6.id)
assert alert6.sql_observer[0].observations[-1].value is None
assert alert6.sql_observer[0].observations[-1].error_msg is not None
# Test SQLObserver with two column result
alert7 = create_alert(
dbsession, "SELECT first, second FROM test_table WHERE first = 1"
)
observe(alert7.id)
assert alert7.sql_observer[0].observations[-1].value is None
assert alert7.sql_observer[0].observations[-1].error_msg is not None
@patch("superset.tasks.schedules.deliver_alert")
def test_evaluate_alert(mock_deliver_alert, setup_database):
dbsession = setup_database
# Test error with Observer SQL statement
alert1 = create_alert(dbsession, "$%^&")
evaluate_alert(alert1.id, alert1.label)
assert alert1.logs[-1].state == AlertState.ERROR
# Test error with alert lacking observer
alert2 = dbsession.query(Alert).filter_by(label="No Observer").one()
evaluate_alert(alert2.id, alert2.label)
assert alert2.logs[-1].state == AlertState.ERROR
# Test pass on alert lacking validator
alert3 = create_alert(dbsession, "SELECT 55")
evaluate_alert(alert3.id, alert3.label)
assert alert3.logs[-1].state == AlertState.PASS
# Test triggering successful alert
alert4 = create_alert(dbsession, "SELECT 55", "not null", "{}")
evaluate_alert(alert4.id, alert4.label)
assert mock_deliver_alert.call_count == 1
assert alert4.logs[-1].state == AlertState.TRIGGER
def test_check_validator():
# Test with invalid operator type
with pytest.raises(SupersetException):
check_validator("greater than", "{}")
# Test with empty config
with pytest.raises(SupersetException):
check_validator("operator", "{}")
# Test with invalid operator
with pytest.raises(SupersetException):
check_validator("operator", '{"op": "is", "threshold":50.0}')
# Test with invalid operator
with pytest.raises(SupersetException):
check_validator("operator", '{"op": "is", "threshold":50.0}')
# Test with invalid threshold
with pytest.raises(SupersetException):
check_validator("operator", '{"op": "is", "threshold":"hello"}')
# Test with float threshold and no errors
assert check_validator("operator", '{"op": ">=", "threshold": 50.0}') is None
# Test with int threshold and no errors
assert check_validator("operator", '{"op": "==", "threshold": 50}') is None
def test_not_null_validator(setup_database):
dbsession = setup_database
# Test passing SQLObserver with 'null' SQL result
alert1 = create_alert(dbsession, "SELECT 0")
observe(alert1.id)
assert not_null_validator(alert1.sql_observer[0], "{}") is False
# Test passing SQLObserver with empty SQL result
alert2 = create_alert(dbsession, "SELECT first FROM test_table WHERE first = -1")
observe(alert2.id)
assert not_null_validator(alert2.sql_observer[0], "{}") is False
# Test triggering alert with non-null SQL result
alert3 = create_alert(dbsession, "SELECT 55")
observe(alert3.id)
assert not_null_validator(alert3.sql_observer[0], "{}") is True
def test_operator_validator(setup_database):
dbsession = setup_database
# Test passing SQLObserver with empty SQL result
alert1 = create_alert(dbsession, "SELECT first FROM test_table WHERE first = -1")
observe(alert1.id)
assert (
operator_validator(alert1.sql_observer[0], '{"op": ">=", "threshold": 60}')
is False
)
# Test passing SQLObserver with result that doesn't pass a greater than threshold
alert2 = create_alert(dbsession, "SELECT 55")
observe(alert2.id)
assert (
operator_validator(alert2.sql_observer[0], '{"op": ">=", "threshold": 60}')
is False
)
# Test passing SQLObserver with result that passes a greater than threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": ">=", "threshold": 40}')
is True
)
# Test passing SQLObserver with result that doesn't pass a less than threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "<=", "threshold": 40}')
is False
)
# Test passing SQLObserver with result that passes threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "<=", "threshold": 60}')
is True
)
# Test passing SQLObserver with result that doesn't equal threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "==", "threshold": 60}')
is False
)
# Test passing SQLObserver with result that equals threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "==", "threshold": 55}')
is True
)
def test_validate_observations(setup_database):
dbsession = setup_database
# Test False on alert with no validator
alert1 = create_alert(dbsession, "SELECT 55")
assert validate_observations(alert1.id, alert1.label) is False
# Test False on alert with no observations
alert2 = create_alert(dbsession, "SELECT 55", "not null", "{}")
assert validate_observations(alert2.id, alert2.label) is False
# Test False on alert that shouldnt be triggered
alert3 = create_alert(dbsession, "SELECT 0", "not null", "{}")
observe(alert3.id)
assert validate_observations(alert3.id, alert3.label) is False
# Test True on alert that should be triggered
alert4 = create_alert(
dbsession, "SELECT 55", "operator", '{"op": "<=", "threshold": 60}'
)
observe(alert4.id)
assert validate_observations(alert4.id, alert4.label) is True
@patch("superset.tasks.slack_util.WebClient.files_upload")
@@ -159,7 +310,8 @@ def test_deliver_alert_screenshot(
screenshot_mock, url_mock, email_mock, file_upload_mock, setup_database
):
dbsession = setup_database
alert = dbsession.query(Alert).filter_by(id=2).one()
alert = create_alert(dbsession, "SELECT 55")
observe(alert.id)
screenshot = read_fixture("sample.png")
screenshot_mock.return_value = screenshot
@@ -176,7 +328,9 @@ def test_deliver_alert_screenshot(
"channels": alert.slack_channel,
"file": screenshot,
"initial_comment": f"\n*Triggered Alert: {alert.label} :redalert:*\n"
f"SQL Statement:```{alert.sql}```\n<http://0.0.0.0:8080/alert/show/{alert.id}"
f"*SQL* *Statement*:```{alert.sql_observer[0].sql}```\n"
f"*SQL* *Result*: {alert.observations[-1].value}"
f"\n<http://0.0.0.0:8080/alert/show/{alert.id}"
f"|View Alert Details>\n<http://0.0.0.0:8080/superset/slice/{alert.slice_id}/"
"|*Explore in Superset*>",
"title": f"[Alert] {alert.label}",