fix(alerts): wrong alert trigger with custom query (#35871)

This commit is contained in:
Gabriel Torres Ruiz
2026-01-07 13:06:18 -04:00
committed by GitHub
parent 861e5cd013
commit 5edaed2e5b
4 changed files with 570 additions and 18 deletions

View File

@@ -59,14 +59,17 @@ class AlertCommand(BaseCommand):
self._report_schedule = report_schedule
self._execution_id = execution_id
self._result: float | None = None
self._info_message: str | None = None
def run(self) -> bool:
def run(self) -> tuple[bool, str | None]:
"""
Executes an alert SQL query and validates it.
Will set the report_schedule.last_value or last_value_row_json
with the query result
:return: bool, if the alert triggered or not
:return: tuple[bool, str | None] - (triggered, message)
- triggered: True if alert condition was met
- message: Optional info message explaining why alert didn't trigger
:raises AlertQueryError: SQL query is not valid
:raises AlertQueryInvalidTypeError: The output from the SQL query
is not an allowed type
@@ -79,20 +82,47 @@ class AlertCommand(BaseCommand):
if self._is_validator_not_null:
self._report_schedule.last_value_row_json = str(self._result)
return self._result not in (0, None, np.nan)
is_null_or_nan = self._result is None or pd.isna(self._result)
triggered = bool(self._result != 0 and not is_null_or_nan)
if not triggered:
logger.debug(
"Alert not triggered for report_schedule_id=%s, "
"execution_id=%s, result=%s, message=%s",
self._report_schedule.id,
self._execution_id,
self._result,
self._info_message,
)
return (triggered, self._info_message)
self._report_schedule.last_value = self._result
try:
operator = json.loads(self._report_schedule.validator_config_json)["op"]
threshold = json.loads(self._report_schedule.validator_config_json)[
"threshold"
]
return OPERATOR_FUNCTIONS[operator](self._result, threshold) # type: ignore
config = json.loads(self._report_schedule.validator_config_json)
operator = config["op"]
threshold = config["threshold"]
# Return False for None results to prevent false alert triggers
if self._result is None:
logger.debug(
"Alert not triggered (NULL result) for report_schedule_id=%s, "
"execution_id=%s, message=%s",
self._report_schedule.id,
self._execution_id,
self._info_message,
)
return (False, self._info_message)
triggered = OPERATOR_FUNCTIONS[operator](self._result, threshold)
return (triggered, None)
except (KeyError, json.JSONDecodeError) as ex:
raise AlertValidatorConfigError() from ex
def _validate_not_null(self, rows: np.recarray[Any, Any]) -> None:
self._validate_result(rows)
self._result = rows[0][1]
value = rows[0][1]
# Normalize NULL/NaN to None for consistency with _validate_operator
if value is None or pd.isna(value):
self._result = None
self._info_message = "Query returned NULL value"
return
self._result = value
@staticmethod
def _validate_result(rows: np.recarray[Any, Any]) -> None:
@@ -117,11 +147,16 @@ class AlertCommand(BaseCommand):
def _validate_operator(self, rows: np.recarray[Any, Any]) -> None:
self._validate_result(rows)
if rows[0][1] in (0, None, np.nan):
if rows[0][1] is None or pd.isna(rows[0][1]):
self._result = None
self._info_message = "Query returned NULL value"
return
if rows[0][1] == 0:
self._result = 0.0
return
try:
# Check if it's float or if we can convert it
self._result = float(rows[0][1])
return
except (AssertionError, TypeError, ValueError) as ex:
@@ -212,7 +247,14 @@ class AlertCommand(BaseCommand):
self._result = None
return
if df.empty and self._is_validator_operator:
self._result = 0.0
logger.debug(
"Alert query returned empty result for report_schedule_id=%s, "
"execution_id=%s.",
self._report_schedule.id,
self._execution_id,
)
self._result = None
self._info_message = "Query returned no rows (empty result set)"
return
rows = df.to_records()
if self._is_validator_not_null:

View File

@@ -822,8 +822,13 @@ class ReportNotTriggeredErrorState(BaseReportState):
try:
# If it's an alert check if the alert is triggered
if self._report_schedule.type == ReportScheduleType.ALERT:
if not AlertCommand(self._report_schedule, self._execution_id).run():
self.update_report_schedule_and_log(ReportState.NOOP)
triggered, message = AlertCommand(
self._report_schedule, self._execution_id
).run()
if not triggered:
self.update_report_schedule_and_log(
ReportState.NOOP, error_message=message
)
return
self.send()
self.update_report_schedule_and_log(ReportState.SUCCESS)
@@ -949,8 +954,13 @@ class ReportSuccessState(BaseReportState):
return
self.update_report_schedule_and_log(ReportState.WORKING)
try:
if not AlertCommand(self._report_schedule, self._execution_id).run():
self.update_report_schedule_and_log(ReportState.NOOP)
triggered, message = AlertCommand(
self._report_schedule, self._execution_id
).run()
if not triggered:
self.update_report_schedule_and_log(
ReportState.NOOP, error_message=message
)
return
except Exception as ex:
self.send_error(

View File

@@ -131,7 +131,9 @@ def test_execute_query_mutate_query_enabled(
database=mock_database,
validator_config_json='{"op": "==", "threshold": 1}',
)
AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()
triggered, message = AlertCommand(
report_schedule=report_schedule, execution_id=uuid.uuid4()
).run()
mock_mutate_call.assert_called_once_with(mock_limited_sql.return_value)
mock_get_df.assert_called_once_with(sql=mock_mutate_call.return_value)
@@ -166,7 +168,9 @@ def test_execute_query_mutate_query_disabled(
database=mock_database,
validator_config_json='{"op": "==", "threshold": 1}',
)
AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()
triggered, message = AlertCommand(
report_schedule=report_schedule, execution_id=uuid.uuid4()
).run()
mock_database.mutate_sql_based_on_config.assert_not_called()
mock_database.get_df.assert_called_once_with(

View File

@@ -0,0 +1,496 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from uuid import uuid4
import numpy as np
import pandas as pd
import pytest
from pytest_mock import MockerFixture
from superset.commands.report.alert import AlertCommand
from superset.commands.report.exceptions import AlertValidatorConfigError
from superset.reports.models import ReportScheduleValidatorType, ReportState
def test_empty_query_result_with_operator_validator_returns_false_with_message(
mocker: MockerFixture,
) -> None:
"""Test that empty results with operator validator returns (False, message)"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame(),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = '{"op": "<", "threshold": 0.75}'
report_schedule_mock.id = 1
report_schedule_mock.sql = "SELECT value FROM metrics WHERE value < 0.75"
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False
assert command._result is None
assert message == "Query returned no rows (empty result set)"
@pytest.mark.parametrize(
"operator,threshold",
[
("<", 0.75),
("<=", 100),
(">", 50),
(">=", 0),
("==", 42),
("!=", 0),
],
)
def test_empty_result_prevents_false_alerts_for_all_operators(
mocker: MockerFixture,
operator: str,
threshold: float,
) -> None:
"""Test that empty results return (False, message) for any operator type"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame(),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = (
f'{{"op": "{operator}", "threshold": {threshold}}}'
)
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False, (
f"Alert with operator '{operator}' should not trigger on empty results"
)
assert message == "Query returned no rows (empty result set)"
def test_empty_result_flow_sets_noop_state(mocker: MockerFixture) -> None:
"""Test that empty results lead to NOOP state with info message"""
from superset.commands.report.execute import ReportNotTriggeredErrorState
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame(),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.type = "Alert"
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = '{"op": "<", "threshold": 0.75}'
report_schedule_mock.id = 1
report_schedule_mock.last_state = ReportState.NOOP
state = ReportNotTriggeredErrorState(
report_schedule=report_schedule_mock,
scheduled_dttm=mocker.Mock(),
execution_id=uuid4(),
)
send_mock = mocker.patch.object(state, "send")
update_mock = mocker.patch.object(state, "update_report_schedule_and_log")
state.next()
update_mock.assert_any_call(
ReportState.NOOP, error_message="Query returned no rows (empty result set)"
)
send_mock.assert_not_called()
def test_malformed_config_raises_error_with_valid_result(
mocker: MockerFixture,
) -> None:
"""Test that malformed config is detected when result is valid"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [0.5]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = "invalid json"
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
with pytest.raises(AlertValidatorConfigError):
command.run()
def test_query_returning_null_value_returns_false_with_message(
mocker: MockerFixture,
) -> None:
"""Test that query returning NULL value returns (False, message)"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [None]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = '{"op": "<", "threshold": 0.75}'
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False
assert command._result is None, "Query returning NULL should set result to None"
assert message == "Query returned NULL value"
def test_query_returning_zero_value_sets_result_to_zero(
mocker: MockerFixture,
) -> None:
"""Test that query returning 0 value sets result to 0.0 (not None)"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [0]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = '{"op": "<", "threshold": 0.75}'
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert command._result == 0.0, "Query returning 0 should set result to 0.0"
assert triggered is True, "0 < 0.75 should trigger alert"
assert message is None
assert report_schedule_mock.last_value == 0.0
def test_query_returning_nan_value_returns_false_with_message(
mocker: MockerFixture,
) -> None:
"""Test that query returning NaN value returns (False, message)"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [np.nan]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = '{"op": ">", "threshold": 5}'
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False
assert command._result is None, "Query returning NaN should set result to None"
assert message == "Query returned NULL value"
@pytest.mark.parametrize(
"value,expected_result,operator,threshold,should_trigger",
[
(0, 0.0, "<", 0.75, True),
(0.5, 0.5, "<", 0.75, True),
(1.0, 1.0, "<", 0.75, False),
(0, 0.0, ">=", 0, True),
],
)
def test_value_handling_with_valid_numbers(
mocker: MockerFixture,
value: float,
expected_result: float,
operator: str,
threshold: float,
should_trigger: bool,
) -> None:
"""Test proper handling of valid numeric values including 0"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [value]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = (
f'{{"op": "{operator}", "threshold": {threshold}}}'
)
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert command._result == expected_result, (
f"Value {value} should result in {expected_result}, got {command._result}"
)
assert triggered is should_trigger, (
f"Value {value} with {operator} {threshold} should "
f"{'trigger' if should_trigger else 'not trigger'} alert"
)
assert report_schedule_mock.last_value == expected_result
if should_trigger:
assert message is None
else:
assert message is None # Non-trigger due to threshold, not NULL
@pytest.mark.parametrize(
"value,expected_message",
[
(None, "Query returned NULL value"),
(np.nan, "Query returned NULL value"),
],
)
def test_value_handling_with_null_and_nan(
mocker: MockerFixture,
value: float | None,
expected_message: str,
) -> None:
"""Test that NULL and NaN values return (False, message)"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [value]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.OPERATOR
report_schedule_mock.validator_config_json = '{"op": "<", "threshold": 0.75}'
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False
assert message == expected_message
assert command._result is None
def test_not_null_validator_with_valid_value_triggers_alert(
mocker: MockerFixture,
) -> None:
"""Test NOT_NULL validator triggers with valid non-null value"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [42]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.NOT_NULL
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is True, "NOT_NULL with valid value should trigger alert"
assert command._result == 42
assert message is None
assert report_schedule_mock.last_value_row_json == "42"
def test_not_null_validator_with_null_value_does_not_trigger(
mocker: MockerFixture,
) -> None:
"""Test NOT_NULL validator normalizes NULL to None and doesn't trigger"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [None]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.NOT_NULL
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False, "NOT_NULL with NULL value should not trigger"
assert command._result is None
assert message == "Query returned NULL value"
def test_not_null_validator_with_nan_value_does_not_trigger(
mocker: MockerFixture,
) -> None:
"""Test NOT_NULL validator normalizes NaN to None and doesn't trigger"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [np.nan]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.NOT_NULL
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False, "NOT_NULL with NaN value should not trigger"
assert command._result is None
assert message == "Query returned NULL value"
def test_not_null_validator_with_zero_value_does_not_trigger(
mocker: MockerFixture,
) -> None:
"""Test NOT_NULL validator with 0 value does not trigger (0 is falsy)"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame({"value": [0]}),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.NOT_NULL
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False, "NOT_NULL with 0 value should not trigger"
assert command._result == 0
def test_not_null_validator_with_empty_result_does_not_trigger(
mocker: MockerFixture,
) -> None:
"""Test NOT_NULL validator with empty result does not trigger"""
mocker.patch(
"superset.commands.report.alert.retry_call",
side_effect=lambda func, *args, **kwargs: func(*args, **kwargs),
)
mocker.patch(
"superset.commands.report.alert.AlertCommand._execute_query",
return_value=pd.DataFrame(),
)
report_schedule_mock = mocker.Mock()
report_schedule_mock.validator_type = ReportScheduleValidatorType.NOT_NULL
report_schedule_mock.id = 1
command = AlertCommand(
report_schedule=report_schedule_mock,
execution_id=uuid4(),
)
triggered, message = command.run()
assert triggered is False, "NOT_NULL with empty result should not trigger"
assert command._result is None