chore: simplify alerting data model to leverage a single class (#11179)

* Collapse alerting models into a single one

Fixing upgrade migration & tests

* Address feedback

Co-authored-by: bogdan kyryliuk <bogdankyryliuk@dropbox.com>
This commit is contained in:
Bogdan
2020-10-09 19:20:12 -07:00
committed by GitHub
parent a6fc3d2384
commit cb3f649a7f
8 changed files with 525 additions and 403 deletions

View File

@@ -151,8 +151,6 @@ class SupersetAppInitializer:
AlertLogModelView,
AlertModelView,
AlertObservationModelView,
SQLObserverInlineView,
ValidatorInlineView,
)
from superset.views.annotations import (
AnnotationLayerModelView,
@@ -411,8 +409,6 @@ class SupersetAppInitializer:
category_label=__("Manage"),
icon="fa-exclamation-triangle",
)
appbuilder.add_view_no_menu(SQLObserverInlineView)
appbuilder.add_view_no_menu(ValidatorInlineView)
appbuilder.add_view_no_menu(AlertObservationModelView)
appbuilder.add_view_no_menu(AlertLogModelView)

View File

@@ -0,0 +1,294 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Collapse alerting models into a single one
Revision ID: af30ca79208f
Revises: b56500de1855
Create Date: 2020-10-05 18:10:52.272047
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.sqlite.base import SQLiteDialect
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import backref, relationship, RelationshipProperty
from superset import db
from superset.utils.core import generic_find_fk_constraint_name
revision = "af30ca79208f"
down_revision = "b56500de1855"
Base = declarative_base()
class Alert(Base):
__tablename__ = "alerts"
id = sa.Column(sa.Integer, primary_key=True)
sql = sa.Column(sa.Text, nullable=False)
validator_type = sa.Column(sa.String(100), nullable=False)
validator_config = sa.Column(sa.Text)
database_id = sa.Column(sa.Integer)
class SQLObserver(Base):
__tablename__ = "sql_observers"
id = sa.Column(sa.Integer, primary_key=True)
sql = sa.Column(sa.Text, nullable=False)
database_id = sa.Column(sa.Integer)
@declared_attr
def alert_id(self) -> int:
return sa.Column(sa.Integer, sa.ForeignKey("alerts.id"), nullable=False)
@declared_attr
def alert(self) -> RelationshipProperty:
return relationship(
"Alert",
foreign_keys=[self.alert_id],
backref=backref("sql_observer", cascade="all, delete-orphan"),
)
class Validator(Base):
__tablename__ = "alert_validators"
id = sa.Column(sa.Integer, primary_key=True)
validator_type = sa.Column(sa.String(100), nullable=False)
config = sa.Column(sa.Text)
@declared_attr
def alert_id(self) -> int:
return sa.Column(sa.Integer, sa.ForeignKey("alerts.id"), nullable=False)
@declared_attr
def alert(self) -> RelationshipProperty:
return relationship(
"Alert",
foreign_keys=[self.alert_id],
backref=backref("validators", cascade="all, delete-orphan"),
)
def upgrade():
bind = op.get_bind()
insp = sa.engine.reflection.Inspector.from_engine(bind)
if isinstance(bind.dialect, SQLiteDialect):
op.add_column(
"alerts",
sa.Column("validator_config", sa.Text(), server_default="", nullable=True),
)
op.add_column(
"alerts",
sa.Column("database_id", sa.Integer(), server_default="0", nullable=False),
)
op.add_column(
"alerts", sa.Column("sql", sa.Text(), server_default="", nullable=False)
)
op.add_column(
"alerts",
sa.Column(
"validator_type",
sa.String(length=100),
server_default="",
nullable=False,
),
)
else: # mysql does not support server_default for text fields
op.add_column(
"alerts",
sa.Column("validator_config", sa.Text(), default="", nullable=True),
)
op.add_column(
"alerts", sa.Column("database_id", sa.Integer(), default=0, nullable=False),
)
op.add_column("alerts", sa.Column("sql", sa.Text(), default="", nullable=False))
op.add_column(
"alerts",
sa.Column(
"validator_type", sa.String(length=100), default="", nullable=False
),
)
# Migrate data
session = db.Session(bind=bind)
alerts = session.query(Alert).all()
for a in alerts:
if a.sql_observer:
a.sql = a.sql_observer[0].sql
a.database_id = a.sql_observer[0].database_id
if a.validators:
a.validator_type = a.validators[0].validator_type
a.validator_config = a.validators[0].config
session.commit()
if not isinstance(bind.dialect, SQLiteDialect):
constraint = generic_find_fk_constraint_name(
"sql_observations", {"id"}, "sql_observers", insp
)
op.drop_constraint(constraint, "sql_observations", type_="foreignkey")
op.drop_column("sql_observations", "observer_id")
op.drop_table("alert_validators")
op.drop_table("sql_observers")
# sqlite does not support column and fk deletion
if isinstance(bind.dialect, SQLiteDialect):
op.drop_table("sql_observations")
op.create_table(
"sql_observations",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("dttm", sa.DateTime(), nullable=True),
sa.Column("alert_id", sa.Integer(), nullable=True),
sa.Column("value", sa.Float(), nullable=True),
sa.Column("error_msg", sa.String(length=500), nullable=True),
sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],),
sa.PrimaryKeyConstraint("id"),
)
else:
op.create_foreign_key(None, "alerts", "dbs", ["database_id"], ["id"])
def downgrade():
bind = op.get_bind()
insp = sa.engine.reflection.Inspector.from_engine(bind)
op.create_table(
"sql_observers",
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("sql", sa.Text(), nullable=False),
sa.Column("created_by_fk", sa.Integer(), autoincrement=False, nullable=True),
sa.Column("changed_by_fk", sa.Integer(), autoincrement=False, nullable=True),
sa.Column("alert_id", sa.Integer(), autoincrement=False, nullable=False),
sa.Column("database_id", sa.Integer(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"]),
sa.ForeignKeyConstraint(["changed_by_fk"], ["ab_user.id"]),
sa.ForeignKeyConstraint(["created_by_fk"], ["ab_user.id"]),
sa.ForeignKeyConstraint(["database_id"], ["dbs.id"]),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"alert_validators",
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("validator_type", sa.String(length=100), nullable=False,),
sa.Column("config", sa.Text(), nullable=True),
sa.Column("created_by_fk", sa.Integer(), autoincrement=False, nullable=True),
sa.Column("changed_by_fk", sa.Integer(), autoincrement=False, nullable=True),
sa.Column("alert_id", sa.Integer(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["alert_id"], ["alerts.id"], name="alert_validators_ibfk_1"
),
sa.ForeignKeyConstraint(
["changed_by_fk"], ["ab_user.id"], name="alert_validators_ibfk_2"
),
sa.ForeignKeyConstraint(
["created_by_fk"], ["ab_user.id"], name="alert_validators_ibfk_3"
),
sa.PrimaryKeyConstraint("id"),
)
# Migrate data
session = db.Session(bind=bind)
alerts = session.query(Alert).all()
for a in alerts:
if a.sql:
ob = SQLObserver(sql=a.sql, database_id=a.database_id)
a.sql_observer.append(ob)
session.add(ob)
if a.validator_type:
val = Validator(
validator_type=a.validator_type,
config=a.validator_config,
alert_id=a.id,
)
a.validators.append(val)
session.add(val)
session.commit()
# sqlite does not support dropping columns
if isinstance(bind.dialect, SQLiteDialect):
op.add_column(
"sql_observations",
sa.Column(
"observer_id",
sa.Integer(),
autoincrement=False,
nullable=False,
server_default="0",
),
)
op.drop_table("alerts")
op.create_table(
"alerts",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("label", sa.String(length=150), nullable=False),
sa.Column("active", sa.Boolean(), nullable=True),
sa.Column("crontab", sa.String(length=50), nullable=False),
sa.Column("alert_type", sa.String(length=50), nullable=True),
sa.Column("log_retention", sa.Integer(), nullable=False, default=90),
sa.Column(
"grace_period", sa.Integer(), nullable=False, default=60 * 60 * 24
),
sa.Column("recipients", sa.Text(), nullable=True),
sa.Column("slice_id", sa.Integer(), nullable=True),
sa.Column("dashboard_id", sa.Integer(), nullable=True),
sa.Column("last_eval_dttm", sa.DateTime(), nullable=True),
sa.Column("last_state", sa.String(length=10), nullable=True),
sa.Column("changed_by_fk", sa.Integer(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("created_by_fk", sa.Integer(), nullable=True),
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("slack_channel", sa.Text(), nullable=True),
sa.ForeignKeyConstraint(["dashboard_id"], ["dashboards.id"],),
sa.ForeignKeyConstraint(["slice_id"], ["slices.id"],),
sa.ForeignKeyConstraint(["created_by_fk"], ["ab_user.id"],),
sa.ForeignKeyConstraint(["changed_by_fk"], ["ab_user.id"],),
sa.PrimaryKeyConstraint("id"),
)
else:
op.add_column(
"sql_observations",
sa.Column(
"observer_id",
sa.Integer(),
autoincrement=False,
nullable=False,
default=0,
),
)
constraint = generic_find_fk_constraint_name("alerts", {"id"}, "dbs", insp)
op.drop_constraint(constraint, "alerts", type_="foreignkey")
op.drop_column("alerts", "validator_type")
op.drop_column("alerts", "sql")
op.drop_column("alerts", "database_id")
op.drop_column("alerts", "validator_config")
op.create_foreign_key(
"sql_observations_ibfk_2",
"sql_observations",
"sql_observers",
["observer_id"],
["id"],
)

View File

@@ -80,9 +80,63 @@ class Alert(Model, AuditMixinNullable):
last_eval_dttm = Column(DateTime, default=datetime.utcnow)
last_state = Column(String(10))
# Observation related columns
sql = Column(Text, nullable=False)
# Validation related columns
validator_type = Column(String(100), nullable=False)
validator_config = Column(
Text,
default=textwrap.dedent(
"""
{
}
"""
),
)
@declared_attr
def database_id(self) -> int:
return Column(Integer, ForeignKey("dbs.id"), nullable=False)
@declared_attr
def database(self) -> RelationshipProperty:
return relationship(
"Database",
foreign_keys=[self.database_id],
backref=backref("sql_observers", cascade="all, delete-orphan"),
)
def get_last_observation(self) -> Optional[Any]:
observations = list(
db.session.query(SQLObservation)
.filter_by(alert_id=self.id)
.order_by(SQLObservation.dttm.desc())
.limit(1)
)
if observations:
return observations[0]
return None
def __str__(self) -> str:
return f"<{self.id}:{self.label}>"
@property
def pretty_config(self) -> str:
""" String representing the comparison that will trigger a validator """
config = json.loads(self.validator_config)
if self.validator_type.lower() == "operator":
return f"{config['op']} {config['threshold']}"
if self.validator_type.lower() == "not null":
return "!= Null or 0"
return ""
class AlertLog(Model):
"""Keeps track of alert-related operations"""
@@ -105,65 +159,13 @@ class AlertLog(Model):
# TODO: Currently SQLObservation table will constantly grow with no limit,
# add some retention restriction or more to a more scalable db e.g.
# https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32
class SQLObserver(Model, AuditMixinNullable):
"""Runs SQL-based queries for alerts"""
__tablename__ = "sql_observers"
id = Column(Integer, primary_key=True)
sql = Column(Text, nullable=False)
@declared_attr
def alert_id(self) -> int:
return Column(Integer, ForeignKey("alerts.id"), nullable=False)
@declared_attr
def alert(self) -> RelationshipProperty:
return relationship(
"Alert",
foreign_keys=[self.alert_id],
backref=backref("sql_observer", cascade="all, delete-orphan"),
)
@declared_attr
def database_id(self) -> int:
return Column(Integer, ForeignKey("dbs.id"), nullable=False)
@declared_attr
def database(self) -> RelationshipProperty:
return relationship(
"Database",
foreign_keys=[self.database_id],
backref=backref("sql_observers", cascade="all, delete-orphan"),
)
def get_last_observation(self) -> Optional[Any]:
observations = list(
db.session.query(SQLObservation)
.filter_by(observer_id=self.id)
.order_by(SQLObservation.dttm.desc())
.limit(1)
)
if observations:
return observations[0]
return None
class SQLObservation(Model): # pylint: disable=too-few-public-methods
"""Keeps track of values retrieved from SQLObservers"""
"""Keeps track of the collected observations for alerts."""
__tablename__ = "sql_observations"
id = Column(Integer, primary_key=True)
dttm = Column(DateTime, default=datetime.utcnow, index=True)
observer_id = Column(Integer, ForeignKey("sql_observers.id"), nullable=False)
observer = relationship(
"SQLObserver",
foreign_keys=[observer_id],
backref=backref("observations", cascade="all, delete-orphan"),
)
alert_id = Column(Integer, ForeignKey("alerts.id"))
alert = relationship(
"Alert",
@@ -172,47 +174,3 @@ class SQLObservation(Model): # pylint: disable=too-few-public-methods
)
value = Column(Float)
error_msg = Column(String(500))
class Validator(Model, AuditMixinNullable):
"""Used to determine how an alert and its observations should be validated"""
__tablename__ = "alert_validators"
id = Column(Integer, primary_key=True)
validator_type = Column(String(100), nullable=False)
config = Column(
Text,
default=textwrap.dedent(
"""
{
}
"""
),
)
@declared_attr
def alert_id(self) -> int:
return Column(Integer, ForeignKey("alerts.id"), nullable=False)
@declared_attr
def alert(self) -> RelationshipProperty:
return relationship(
"Alert",
foreign_keys=[self.alert_id],
backref=backref("validators", cascade="all, delete-orphan"),
)
@property
def pretty_config(self) -> str:
""" String representing the comparison that will trigger a validator """
config = json.loads(self.config)
if self.validator_type.lower() == "operator":
return f"{config['op']} {config['threshold']}"
if self.validator_type.lower() == "not null":
return "!= Null or 0"
return ""

View File

@@ -31,20 +31,17 @@ logger = logging.getLogger("tasks.email_reports")
# Session needs to be passed along in the celery workers and db.session cannot be used.
# For more info see: https://github.com/apache/incubator-superset/issues/10530
def observe(alert_id: int, session: Session) -> Optional[str]:
"""
Runs the SQL query in an alert's SQLObserver and then
stores the result in a SQLObservation.
"""Collect observations for the alert.
Returns an error message if the observer value was not valid
"""
alert = session.query(Alert).filter_by(id=alert_id).one()
sql_observer = alert.sql_observer[0]
value = None
tp = jinja_context.get_template_processor(database=sql_observer.database)
rendered_sql = tp.process_template(sql_observer.sql)
df = sql_observer.database.get_df(rendered_sql)
tp = jinja_context.get_template_processor(database=alert.database)
rendered_sql = tp.process_template(alert.sql)
df = alert.database.get_df(rendered_sql)
error_msg = validate_observer_result(df, alert.id, alert.label)
@@ -52,11 +49,7 @@ def observe(alert_id: int, session: Session) -> Optional[str]:
value = float(df.to_records()[0][1])
observation = SQLObservation(
observer_id=sql_observer.id,
alert_id=alert_id,
dttm=datetime.utcnow(),
value=value,
error_msg=error_msg,
alert_id=alert_id, dttm=datetime.utcnow(), value=value, error_msg=error_msg,
)
session.add(observation)

View File

@@ -22,12 +22,12 @@ from typing import Callable, Optional
import numpy as np
from superset.exceptions import SupersetException
from superset.models.alerts import SQLObserver
from superset.models.alerts import Alert
OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne}
class AlertValidatorType(enum.Enum):
class AlertValidatorType(str, enum.Enum):
not_null = "not null"
operator = "operator"
@@ -67,11 +67,11 @@ def check_validator(validator_type: str, config: str) -> None:
def not_null_validator(
observer: SQLObserver, validator_config: str # pylint: disable=unused-argument
alert: Alert, validator_config: str # pylint: disable=unused-argument
) -> bool:
"""Returns True if a SQLObserver's recent observation is not NULL"""
"""Returns True if a recent observation is not NULL"""
observation = observer.get_last_observation()
observation = alert.get_last_observation()
# TODO: Validate malformed observations/observations with errors separately
if (
not observation
@@ -82,12 +82,12 @@ def not_null_validator(
return True
def operator_validator(observer: SQLObserver, validator_config: str) -> bool:
def operator_validator(alert: Alert, validator_config: str) -> bool:
"""
Returns True if a SQLObserver's recent observation is greater than or equal to
Returns True if a recent observation is greater than or equal to
the value given in the validator config
"""
observation = observer.get_last_observation()
observation = alert.get_last_observation()
if not observation or observation.value in (None, np.nan):
return False
@@ -98,7 +98,7 @@ def operator_validator(observer: SQLObserver, validator_config: str) -> bool:
def get_validator_function(
validator_type: str,
) -> Optional[Callable[[SQLObserver, str], bool]]:
) -> Optional[Callable[[Alert, str], bool]]:
"""Returns a validation function based on validator_type"""
alert_validators = {

View File

@@ -591,15 +591,13 @@ def deliver_alert(
recipients = recipients or alert.recipients
slack_channel = slack_channel or alert.slack_channel
validation_error_message = (
str(alert.observations[-1].value) + " " + alert.validators[0].pretty_config
if alert.validators
else ""
str(alert.observations[-1].value) + " " + alert.pretty_config
)
if alert.slice:
alert_content = AlertContent(
alert.label,
alert.sql_observer[0].sql,
alert.sql,
str(alert.observations[-1].value),
validation_error_message,
_get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id),
@@ -609,7 +607,7 @@ def deliver_alert(
# TODO: dashboard delivery!
alert_content = AlertContent(
alert.label,
alert.sql_observer[0].sql,
alert.sql,
str(alert.observations[-1].value),
validation_error_message,
_get_url_path("AlertModelView.show", user_friendly=True, pk=alert_id),
@@ -743,12 +741,8 @@ def validate_observations(alert_id: int, label: str, session: Session) -> bool:
logger.info("Validating observations for alert <%s:%s>", alert_id, label)
alert = session.query(Alert).get(alert_id)
if not alert.validators:
return False
validator = alert.validators[0]
validate = get_validator_function(validator.validator_type)
return bool(validate and validate(alert.sql_observer[0], validator.config))
validate = get_validator_function(alert.validator_type)
return bool(validate and validate(alert, alert.validator_config))
def next_schedules(

View File

@@ -20,13 +20,7 @@ from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_babel import lazy_gettext as _
from superset.constants import RouteMethod
from superset.models.alerts import (
Alert,
AlertLog,
SQLObservation,
SQLObserver,
Validator,
)
from superset.models.alerts import Alert, AlertLog, SQLObservation
from superset.tasks.alerts.validator import check_validator
from superset.utils import core as utils
from superset.utils.core import get_email_address_str, markdown
@@ -69,111 +63,6 @@ class AlertObservationModelView(
}
# TODO: add a button to the form to test if the SQL statment can run with no errors
class SQLObserverInlineView( # pylint: disable=too-many-ancestors
CompactCRUDMixin, SupersetModelView
):
datamodel = SQLAInterface(SQLObserver)
include_route_methods = RouteMethod.RELATED_VIEW_SET | RouteMethod.API_SET
list_title = _("SQL Observers")
show_title = _("Show SQL Observer")
add_title = _("Add SQL Observer")
edit_title = _("Edit SQL Observer")
edit_columns = [
"alert",
"database",
"sql",
]
add_columns = edit_columns
list_columns = ["alert.label", "database", "sql"]
label_columns = {
"alert": _("Alert"),
"database": _("Database"),
"sql": _("SQL"),
}
description_columns = {
"sql": _(
"A SQL statement that defines whether the alert should get triggered or "
"not. The query is expected to return either NULL or a number value."
)
}
def pre_add(self, item: "SQLObserverInlineView") -> None:
if item.alert.sql_observer and item.alert.sql_observer[0].id != item.id:
raise SupersetException("Error: An alert should only have one observer.")
class ValidatorInlineView( # pylint: disable=too-many-ancestors
CompactCRUDMixin, SupersetModelView
):
datamodel = SQLAInterface(Validator)
include_route_methods = RouteMethod.RELATED_VIEW_SET | RouteMethod.API_SET
list_title = _("Validators")
show_title = _("Show Validator")
add_title = _("Add Validator")
edit_title = _("Edit Validator")
edit_columns = [
"alert",
"validator_type",
"config",
]
add_columns = edit_columns
list_columns = [
"alert.label",
"validator_type",
"pretty_config",
]
label_columns = {
"validator_type": _("Validator Type"),
"alert": _("Alert"),
}
description_columns = {
"validator_type": utils.markdown(
"Determines when to trigger alert based off value from SQLObserver query. "
"Alerts will be triggered with these validator types:"
"<ul><li>Not Null - When the return value is Not NULL, Empty, or 0</li>"
"<li>Operator - When `sql_return_value comparison_operator threshold`"
" is True e.g. `50 <= 75`<br>Supports the comparison operators <, <=, "
">, >=, ==, and !=</li></ul>",
True,
),
"config": utils.markdown(
"JSON string containing values the validator will compare against. "
"Each validator needs the following values:"
"<ul><li>Not Null - Nothing. You can leave the config as it is.</li>"
'<li>Operator<ul><li>`"op": "operator"` with an operator from ["<", '
'"<=", ">", ">=", "==", "!="] e.g. `"op": ">="`</li>'
'<li>`"threshold": threshold_value` e.g. `"threshold": 50`'
'</li></ul>Example config:<br>{<br> "op":">=",<br>"threshold": 60<br>}'
"</li></ul>",
True,
),
}
def pre_add(self, item: "ValidatorInlineView") -> None:
if item.alert.validators and item.alert.validators[0].id != item.id:
raise SupersetException(
"Error: Alerts currently only support 1 validator per alert."
)
item.validator_type = item.validator_type.lower()
check_validator(item.validator_type, item.config)
def pre_update(self, item: "ValidatorInlineView") -> None:
item.validator_type = item.validator_type.lower()
check_validator(item.validator_type, item.config)
class AlertModelView(SupersetModelView): # pylint: disable=too-many-ancestors
datamodel = SQLAInterface(Alert)
route_base = "/alert"
@@ -181,6 +70,9 @@ class AlertModelView(SupersetModelView): # pylint: disable=too-many-ancestors
list_columns = (
"label",
"database",
"sql",
"pretty_config",
"crontab",
"last_eval_dttm",
"last_state",
@@ -189,6 +81,10 @@ class AlertModelView(SupersetModelView): # pylint: disable=too-many-ancestors
)
show_columns = (
"label",
"database",
"sql",
"validator_type",
"validator_config",
"active",
"crontab",
"owners",
@@ -203,6 +99,10 @@ class AlertModelView(SupersetModelView): # pylint: disable=too-many-ancestors
order_columns = ["label", "last_eval_dttm", "last_state", "active"]
add_columns = (
"label",
"database",
"sql",
"validator_type",
"validator_config",
"active",
"crontab",
# TODO: implement different types of alerts
@@ -232,14 +132,36 @@ class AlertModelView(SupersetModelView): # pylint: disable=too-many-ancestors
"Once an alert is triggered, how long, in seconds, before "
"Superset nags you again."
),
"sql": _(
"A SQL statement that defines whether the alert should get triggered or "
"not. The query is expected to return either NULL or a number value."
),
"validator_type": utils.markdown(
"Determines when to trigger alert based off value from alert query. "
"Alerts will be triggered with these validator types:"
"<ul><li>Not Null - When the return value is Not NULL, Empty, or 0</li>"
"<li>Operator - When `sql_return_value comparison_operator threshold`"
" is True e.g. `50 <= 75`<br>Supports the comparison operators <, <=, "
">, >=, ==, and !=</li></ul>",
True,
),
"validator_config": utils.markdown(
"JSON string containing values the validator will compare against. "
"Each validator needs the following values:"
"<ul><li>Not Null - Nothing. You can leave the config as it is.</li>"
'<li>Operator<ul><li>`"op": "operator"` with an operator from ["<", '
'"<=", ">", ">=", "==", "!="] e.g. `"op": ">="`</li>'
'<li>`"threshold": threshold_value` e.g. `"threshold": 50`'
'</li></ul>Example config:<br>{<br> "op":">=",<br>"threshold": 60<br>}'
"</li></ul>",
True,
),
}
edit_columns = add_columns
related_views = [
AlertObservationModelView,
AlertLogModelView,
ValidatorInlineView,
SQLObserverInlineView,
]
def pre_add(self, item: "AlertModelView") -> None:
@@ -248,5 +170,12 @@ class AlertModelView(SupersetModelView): # pylint: disable=too-many-ancestors
if not croniter.is_valid(item.crontab):
raise SupersetException("Invalid crontab format")
item.validator_type = item.validator_type.lower()
check_validator(item.validator_type, item.validator_config)
def pre_update(self, item: "AlertModelView") -> None:
item.validator_type = item.validator_type.lower()
check_validator(item.validator_type, item.validator_config)
def post_update(self, item: "AlertModelView") -> None:
self.post_add(item)

View File

@@ -15,24 +15,21 @@
# specific language governing permissions and limitations
# under the License.
"""Unit tests for alerting in Superset"""
import json
import logging
from typing import Optional
from unittest.mock import patch
import pytest
from sqlalchemy.orm import Session
from superset import db
from superset.exceptions import SupersetException
from superset.models.alerts import (
Alert,
AlertLog,
SQLObservation,
SQLObserver,
Validator,
)
from superset.models.alerts import Alert, AlertLog, SQLObservation
from superset.models.slice import Slice
from superset.tasks.alerts.observer import observe
from superset.tasks.alerts.validator import (
AlertValidatorType,
check_validator,
not_null_validator,
operator_validator,
@@ -62,101 +59,89 @@ def setup_database():
"INSERT INTO test_table (first, second) VALUES (3, 4)"
)
no_observer_alert = Alert(crontab="* * * * *", label="No Observer")
db.session.add(no_observer_alert)
db.session.commit()
yield db.session
db.session.query(SQLObservation).delete()
db.session.query(SQLObserver).delete()
db.session.query(Validator).delete()
db.session.query(AlertLog).delete()
db.session.query(Alert).delete()
db.session.commit()
example_database.get_sqla_engine().execute("DROP TABLE test_table")
def create_alert(
dbsession,
db_session: Session,
sql: str,
validator_type: Optional[str] = None,
validator_config: Optional[str] = None,
validator_type: AlertValidatorType = AlertValidatorType.operator,
validator_config: str = "",
) -> Alert:
db_session.commit()
alert = Alert(
label="test_alert",
active=True,
crontab="* * * * *",
slice_id=dbsession.query(Slice).all()[0].id,
slice_id=db_session.query(Slice).all()[0].id,
recipients="recipient1@superset.com",
slack_channel="#test_channel",
sql=sql,
database_id=utils.get_example_database().id,
validator_type=validator_type,
validator_config=validator_config,
)
dbsession.add(alert)
dbsession.commit()
sql_observer = SQLObserver(
sql=sql, alert_id=alert.id, database_id=utils.get_example_database().id,
)
if validator_type and validator_config:
validator = Validator(
validator_type=validator_type, config=validator_config, alert_id=alert.id,
)
dbsession.add(validator)
dbsession.add(sql_observer)
dbsession.commit()
db_session.add(alert)
db_session.commit()
return alert
def test_alert_observer(setup_database):
dbsession = setup_database
db_session = setup_database
# Test SQLObserver with int SQL return
alert1 = create_alert(dbsession, "SELECT 55")
observe(alert1.id, dbsession)
assert alert1.sql_observer[0].observations[-1].value == 55.0
assert alert1.sql_observer[0].observations[-1].error_msg is None
# Test int SQL return
alert1 = create_alert(db_session, "SELECT 55")
observe(alert1.id, db_session)
assert alert1.observations[-1].value == 55.0
assert alert1.observations[-1].error_msg is None
# Test SQLObserver with double SQL return
alert2 = create_alert(dbsession, "SELECT 30.0 as wage")
observe(alert2.id, dbsession)
assert alert2.sql_observer[0].observations[-1].value == 30.0
assert alert2.sql_observer[0].observations[-1].error_msg is None
# Test double SQL return
alert2 = create_alert(db_session, "SELECT 30.0 as wage")
observe(alert2.id, db_session)
assert alert2.observations[-1].value == 30.0
assert alert2.observations[-1].error_msg is None
# Test SQLObserver with NULL result
alert3 = create_alert(dbsession, "SELECT null as null_result")
observe(alert3.id, dbsession)
assert alert3.sql_observer[0].observations[-1].value is None
assert alert3.sql_observer[0].observations[-1].error_msg is None
# Test NULL result
alert3 = create_alert(db_session, "SELECT null as null_result")
observe(alert3.id, db_session)
assert alert3.observations[-1].value is None
assert alert3.observations[-1].error_msg is None
# Test SQLObserver with empty SQL return, expected
alert4 = create_alert(dbsession, "SELECT first FROM test_table WHERE first = -1")
observe(alert4.id, dbsession)
assert alert4.sql_observer[0].observations[-1].value is None
assert alert4.sql_observer[0].observations[-1].error_msg is None
# Test empty SQL return, expected
alert4 = create_alert(db_session, "SELECT first FROM test_table WHERE first = -1")
observe(alert4.id, db_session)
assert alert4.observations[-1].value is None
assert alert4.observations[-1].error_msg is None
# Test SQLObserver with str result
alert5 = create_alert(dbsession, "SELECT 'test_string' as string_value")
observe(alert5.id, dbsession)
assert alert5.sql_observer[0].observations[-1].value is None
assert alert5.sql_observer[0].observations[-1].error_msg is not None
# Test str result
alert5 = create_alert(db_session, "SELECT 'test_string' as string_value")
observe(alert5.id, db_session)
assert alert5.observations[-1].value is None
assert alert5.observations[-1].error_msg is not None
# Test SQLObserver with two row result
alert6 = create_alert(dbsession, "SELECT first FROM test_table")
observe(alert6.id, dbsession)
assert alert6.sql_observer[0].observations[-1].value is None
assert alert6.sql_observer[0].observations[-1].error_msg is not None
# Test two row result
alert6 = create_alert(db_session, "SELECT first FROM test_table")
observe(alert6.id, db_session)
assert alert6.observations[-1].value is None
assert alert6.observations[-1].error_msg is not None
# Test SQLObserver with two column result
# Test two column result
alert7 = create_alert(
dbsession, "SELECT first, second FROM test_table WHERE first = 1"
db_session, "SELECT first, second FROM test_table WHERE first = 1"
)
observe(alert7.id, dbsession)
assert alert7.sql_observer[0].observations[-1].value is None
assert alert7.sql_observer[0].observations[-1].error_msg is not None
observe(alert7.id, db_session)
assert alert7.observations[-1].value is None
assert alert7.observations[-1].error_msg is not None
# Test multiline SQLObserver
# Test multiline sql
alert8 = create_alert(
dbsession,
db_session,
"""
-- comment
SELECT
@@ -165,41 +150,38 @@ def test_alert_observer(setup_database):
WHERE first = 1
""",
)
observe(alert8.id, dbsession)
assert alert8.sql_observer[0].observations[-1].value == 1.0
assert alert8.sql_observer[0].observations[-1].error_msg is None
observe(alert8.id, db_session)
assert alert8.observations[-1].value == 1.0
assert alert8.observations[-1].error_msg is None
# Test jinja
alert9 = create_alert(dbsession, "SELECT {{ 2 }}")
observe(alert9.id, dbsession)
assert alert9.sql_observer[0].observations[-1].value == 2.0
assert alert9.sql_observer[0].observations[-1].error_msg is None
alert9 = create_alert(db_session, "SELECT {{ 2 }}")
observe(alert9.id, db_session)
assert alert9.observations[-1].value == 2.0
assert alert9.observations[-1].error_msg is None
@patch("superset.tasks.schedules.deliver_alert")
def test_evaluate_alert(mock_deliver_alert, setup_database):
dbsession = setup_database
db_session = setup_database
# Test error with Observer SQL statement
alert1 = create_alert(dbsession, "$%^&")
evaluate_alert(alert1.id, alert1.label, dbsession)
alert1 = create_alert(db_session, "$%^&")
evaluate_alert(alert1.id, alert1.label, db_session)
assert alert1.logs[-1].state == AlertState.ERROR
# Test error with alert lacking observer
alert2 = dbsession.query(Alert).filter_by(label="No Observer").one()
evaluate_alert(alert2.id, alert2.label, dbsession)
assert alert2.logs[-1].state == AlertState.ERROR
# Test pass on alert lacking validator
alert3 = create_alert(dbsession, "SELECT 55")
evaluate_alert(alert3.id, alert3.label, dbsession)
assert alert3.logs[-1].state == AlertState.PASS
# Test pass on alert lacking validator config
alert2 = create_alert(db_session, "SELECT 55")
# evaluation fails if config is malformed
with pytest.raises(json.decoder.JSONDecodeError):
evaluate_alert(alert2.id, alert2.label, db_session)
assert not alert2.logs
# Test triggering successful alert
alert4 = create_alert(dbsession, "SELECT 55", "not null", "{}")
evaluate_alert(alert4.id, alert4.label, dbsession)
alert3 = create_alert(db_session, "SELECT 55", "not null", "{}")
evaluate_alert(alert3.id, alert3.label, db_session)
assert mock_deliver_alert.call_count == 1
assert alert4.logs[-1].state == AlertState.TRIGGER
assert alert3.logs[-1].state == AlertState.TRIGGER
def test_check_validator():
@@ -231,101 +213,77 @@ def test_check_validator():
def test_not_null_validator(setup_database):
dbsession = setup_database
db_session = setup_database
# Test passing SQLObserver with 'null' SQL result
alert1 = create_alert(dbsession, "SELECT 0")
observe(alert1.id, dbsession)
assert not_null_validator(alert1.sql_observer[0], "{}") is False
# Test passing with 'null' SQL result
alert1 = create_alert(db_session, "SELECT 0")
observe(alert1.id, db_session)
assert not_null_validator(alert1, "{}") is False
# Test passing SQLObserver with empty SQL result
alert2 = create_alert(dbsession, "SELECT first FROM test_table WHERE first = -1")
observe(alert2.id, dbsession)
assert not_null_validator(alert2.sql_observer[0], "{}") is False
# Test passing with empty SQL result
alert2 = create_alert(db_session, "SELECT first FROM test_table WHERE first = -1")
observe(alert2.id, db_session)
assert not_null_validator(alert2, "{}") is False
# Test triggering alert with non-null SQL result
alert3 = create_alert(dbsession, "SELECT 55")
observe(alert3.id, dbsession)
assert not_null_validator(alert3.sql_observer[0], "{}") is True
alert3 = create_alert(db_session, "SELECT 55")
observe(alert3.id, db_session)
assert not_null_validator(alert3, "{}") is True
def test_operator_validator(setup_database):
dbsession = setup_database
# Test passing SQLObserver with empty SQL result
# Test passing with empty SQL result
alert1 = create_alert(dbsession, "SELECT first FROM test_table WHERE first = -1")
observe(alert1.id, dbsession)
assert (
operator_validator(alert1.sql_observer[0], '{"op": ">=", "threshold": 60}')
is False
)
assert operator_validator(alert1, '{"op": ">=", "threshold": 60}') is False
# ensure that 0 threshold works
assert (
operator_validator(alert1.sql_observer[0], '{"op": ">=", "threshold": 0}')
is False
)
assert operator_validator(alert1, '{"op": ">=", "threshold": 0}') is False
# Test passing SQLObserver with result that doesn't pass a greater than threshold
# Test passing with result that doesn't pass a greater than threshold
alert2 = create_alert(dbsession, "SELECT 55")
observe(alert2.id, dbsession)
assert (
operator_validator(alert2.sql_observer[0], '{"op": ">=", "threshold": 60}')
is False
)
assert operator_validator(alert2, '{"op": ">=", "threshold": 60}') is False
# Test passing SQLObserver with result that passes a greater than threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": ">=", "threshold": 40}')
is True
)
# Test passing with result that passes a greater than threshold
assert operator_validator(alert2, '{"op": ">=", "threshold": 40}') is True
# Test passing SQLObserver with result that doesn't pass a less than threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "<=", "threshold": 40}')
is False
)
# Test passing with result that doesn't pass a less than threshold
assert operator_validator(alert2, '{"op": "<=", "threshold": 40}') is False
# Test passing SQLObserver with result that passes threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "<=", "threshold": 60}')
is True
)
# Test passing with result that passes threshold
assert operator_validator(alert2, '{"op": "<=", "threshold": 60}') is True
# Test passing SQLObserver with result that doesn't equal threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "==", "threshold": 60}')
is False
)
# Test passing with result that doesn't equal threshold
assert operator_validator(alert2, '{"op": "==", "threshold": 60}') is False
# Test passing SQLObserver with result that equals threshold
assert (
operator_validator(alert2.sql_observer[0], '{"op": "==", "threshold": 55}')
is True
)
# Test passing with result that equals threshold
assert operator_validator(alert2, '{"op": "==", "threshold": 55}') is True
def test_validate_observations(setup_database):
dbsession = setup_database
db_session = setup_database
# Test False on alert with no validator
alert1 = create_alert(dbsession, "SELECT 55")
assert validate_observations(alert1.id, alert1.label, dbsession) is False
alert1 = create_alert(db_session, "SELECT 55")
assert validate_observations(alert1.id, alert1.label, db_session) is False
# Test False on alert with no observations
alert2 = create_alert(dbsession, "SELECT 55", "not null", "{}")
assert validate_observations(alert2.id, alert2.label, dbsession) is False
alert2 = create_alert(db_session, "SELECT 55", "not null", "{}")
assert validate_observations(alert2.id, alert2.label, db_session) is False
# Test False on alert that shouldnt be triggered
alert3 = create_alert(dbsession, "SELECT 0", "not null", "{}")
observe(alert3.id, dbsession)
assert validate_observations(alert3.id, alert3.label, dbsession) is False
alert3 = create_alert(db_session, "SELECT 0", "not null", "{}")
observe(alert3.id, db_session)
assert validate_observations(alert3.id, alert3.label, db_session) is False
# Test True on alert that should be triggered
alert4 = create_alert(
dbsession, "SELECT 55", "operator", '{"op": "<=", "threshold": 60}'
db_session, "SELECT 55", "operator", '{"op": "<=", "threshold": 60}'
)
observe(alert4.id, dbsession)
assert validate_observations(alert4.id, alert4.label, dbsession) is True
observe(alert4.id, db_session)
assert validate_observations(alert4.id, alert4.label, db_session) is True
@patch("superset.tasks.slack_util.WebClient.files_upload")
@@ -354,9 +312,9 @@ def test_deliver_alert_screenshot(
"channels": alert.slack_channel,
"file": screenshot,
"initial_comment": f"\n*Triggered Alert: {alert.label} :redalert:*\n"
f"*Query*:```{alert.sql_observer[0].sql}```\n"
f"*Query*:```{alert.sql}```\n"
f"*Result*: {alert.observations[-1].value}\n"
f"*Reason*: {alert.observations[-1].value} {alert.validators[0].pretty_config}\n"
f"*Reason*: {alert.observations[-1].value} {alert.pretty_config}\n"
f"<http://0.0.0.0:8080/alert/show/{alert.id}"
f"|View Alert Details>\n<http://0.0.0.0:8080/superset/slice/{alert.slice_id}/"
"|*Explore in Superset*>",