Compare commits

..

2 Commits

Author SHA1 Message Date
Joe Li
96477b390a fix(reports): handle missing executor user and revert Slack v2 atomically
Two robustness fixes in the alert/report execution command:

1. Missing executor user: when the configured executor cannot be resolved
   (security_manager.find_user returns None), the content-generation sites
   (_get_screenshots / _get_csv_data / _get_embedded_data) now raise a
   dedicated ReportScheduleExecutorNotFoundError instead of failing later
   with an opaque NoneType error. The guard lives at the content sites so
   it raises inside the state machine's error envelope, preserving the
   ERROR execution-log row and the owner error notification. The run()
   boundary continues to delegate to the state machine (a missing user is
   tolerated there, matching prior behavior) so visibility is unchanged.

2. Slack v2 migration revert: update_report_schedule_slack_v2 now snapshots
   every recipient it mutates and reverts all of them on failure, instead
   of only the loop variable, and no longer raises UnboundLocalError when
   the failure occurs before the loop binds a recipient.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 15:54:23 -07:00
Joe Li
eb79c96e98 fix(reports): bound webhook notification retry wall-clock time
Webhook notification retries used backoff without a max_time bound, so a
hanging or persistently-failing target could stall a worker for minutes
per bad URL (up to ~5 socket waits at timeout=60 plus retry sleeps),
starving sequential report dispatch.

Add max_time=120 to the backoff decorator on WebhookNotification.send.
factor/base/max_tries are unchanged, so legitimately-transient 5xx
targets are still retried; max_time only caps total wall-clock so a bad
target cannot monopolize a worker. max_time is checked between attempts,
so the final in-flight request still runs its full timeout.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 15:53:44 -07:00
5 changed files with 447 additions and 24 deletions

View File

@@ -16,6 +16,7 @@
# under the License.
import math
from typing import Optional
from flask_babel import lazy_gettext as _
@@ -200,6 +201,19 @@ class ReportScheduleDataFrameFailedError(CommandException):
message = _("Report Schedule execution failed when generating a dataframe.")
class ReportScheduleExecutorNotFoundError(CommandException):
status = 422
def __init__(self, username: str = "", exception: Optional[Exception] = None):
super().__init__(
_(
"Report Schedule executor user %(username)s was not found.",
username=f'"{username}"' if username else "",
),
exception,
)
class ReportScheduleExecuteUnexpectedError(CommandException):
message = _("Report Schedule execution got an unexpected error.")

View File

@@ -17,7 +17,7 @@
import logging
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from typing import Any, Optional, TYPE_CHECKING, Union
from uuid import UUID
import pandas as pd
@@ -37,6 +37,7 @@ from superset.commands.report.exceptions import (
ReportScheduleDataFrameFailedError,
ReportScheduleDataFrameTimeout,
ReportScheduleExecuteUnexpectedError,
ReportScheduleExecutorNotFoundError,
ReportScheduleNotFoundError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
@@ -82,9 +83,36 @@ from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot
from superset.utils.slack import get_channels_with_search, SlackChannelTypes
from superset.utils.urls import get_url_path
if TYPE_CHECKING:
from flask_appbuilder.security.sqla.models import User
logger = logging.getLogger(__name__)
def resolve_executor_user(model: ReportSchedule) -> tuple["User", str]:
"""
Resolve the executor user for a report schedule.
Determines the configured executor username via ``get_executor`` and looks up
the corresponding user. A deleted/disabled user or a misconfigured
``ALERT_REPORTS_EXECUTORS`` makes ``security_manager.find_user`` return
``None``; rather than passing ``None`` into the webdriver/auth flow (which
fails with an opaque NoneType error), raise a dedicated, actionable error.
:returns: the ``(user, username)`` pair — the username is returned alongside
the user because several call sites log it after resolution.
:raises ReportScheduleExecutorNotFoundError: if the executor user is missing.
"""
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=model,
)
user = security_manager.find_user(username)
if user is None:
raise ReportScheduleExecutorNotFoundError(username)
return user, username
class BaseReportState:
current_states: list[ReportState] = []
initial: bool = False
@@ -135,9 +163,17 @@ class BaseReportState:
Update the report schedule type and channels for all slack recipients to v2.
V2 uses ids instead of names for channels.
"""
# Track each recipient mutated in this pass with its original (type,
# config) so a partial failure can revert ALL of them — not just the
# loop variable. Restoring the values to their loaded state keeps the
# persisted rows unchanged regardless of any intervening commit.
mutated: list[tuple[ReportRecipients, ReportRecipientType, str]] = []
try:
for recipient in self._report_schedule.recipients:
if recipient.type == ReportRecipientType.SLACK:
mutated.append(
(recipient, recipient.type, recipient.recipient_config_json)
)
recipient.type = ReportRecipientType.SLACKV2
slack_recipients = json.loads(recipient.recipient_config_json)
# V1 method allowed to use leading `#` in the channel name
@@ -169,8 +205,15 @@ class BaseReportState:
}
)
except Exception as ex:
# Revert to v1 to preserve configuration (requires manual fix)
recipient.type = ReportRecipientType.SLACK
# Revert every mutated recipient to v1 (both type AND config) to
# preserve configuration (requires manual fix). Reverting the full
# set — not just the loop variable — keeps earlier recipients
# consistent; iterating the snapshot also avoids the UnboundLocalError
# that a bare loop-variable reference raises on a pre-iteration
# failure (which would mask the real error).
for reverted_recipient, original_type, original_config in mutated:
reverted_recipient.type = original_type
reverted_recipient.recipient_config_json = original_config
msg = f"Failed to update slack recipients to v2: {str(ex)}"
logger.exception(msg)
raise UpdateFailedError(msg) from ex
@@ -358,11 +401,6 @@ class BaseReportState:
dashboard_id=str(self._report_schedule.dashboard.uuid),
state=dashboard_state,
).run()
# Commit the permalink immediately so Playwright's separate DB connection
# can resolve the URL. CreateDashboardPermalinkCommand only flushes when
# called inside an outer @transaction(), leaving the row invisible to
# other connections until we explicitly commit here.
db.session.commit() # pylint: disable=consider-using-transaction
return get_url_path(
"Superset.dashboard_permalink",
@@ -428,11 +466,7 @@ class BaseReportState:
"""
start_time = datetime.utcnow()
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
user, username = resolve_executor_user(self._report_schedule)
max_width = app.config["ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH"]
@@ -514,11 +548,7 @@ class BaseReportState:
def _get_csv_data(self) -> bytes:
start_time = datetime.utcnow()
url = self._get_url(result_format=ChartDataResultFormat.CSV)
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
user, username = resolve_executor_user(self._report_schedule)
auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user)
if self._report_schedule.chart.query_context is None:
@@ -564,11 +594,7 @@ class BaseReportState:
start_time = datetime.utcnow()
url = self._get_url(result_format=ChartDataResultFormat.JSON)
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
user, username = resolve_executor_user(self._report_schedule)
auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user)
if self._report_schedule.chart.query_context is None:
@@ -1169,6 +1195,15 @@ class AsyncExecuteReportScheduleCommand(BaseCommand):
if not self._model:
raise ReportScheduleExecuteUnexpectedError()
# Resolve the executor at the run() boundary the same way master
# does: tolerate a missing user (find_user -> None) so the state
# machine still runs and its error envelope writes the ERROR
# execution-log row and sends the owner notification. The dedicated
# ReportScheduleExecutorNotFoundError guard lives at the content
# sites (_get_screenshots / _get_csv_data / _get_embedded_data),
# which raise inside that envelope. Guarding here instead would
# surface a 422 above the state machine, suppressing both the log
# row and the owner notification.
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._model,

View File

@@ -128,7 +128,19 @@ class WebhookNotification(BaseNotification):
raise NotificationParamException("Webhook URL target host is not allowed.")
@backoff.on_exception(
backoff.expo, NotificationUnprocessableException, factor=10, base=2, max_tries=5
backoff.expo,
NotificationUnprocessableException,
factor=10,
base=2,
max_tries=5,
# Bound total wall-clock retry time. Without this, a hanging or
# persistently failing target can stall a worker for minutes per bad URL
# (up to ~5 socket waits at timeout=60 plus ~150s of retry sleeps),
# starving sequential report dispatch. max_time is checked between
# attempts, so the final in-flight request can still run its full
# timeout; factor is intentionally kept at 10 so legitimately-transient
# 5xx targets are not abandoned early.
max_time=120,
)
@statsd_gauge("reports.webhook.send")
def send(self) -> None:

View File

@@ -28,6 +28,7 @@ from superset.commands.exceptions import UpdateFailedError
from superset.commands.report.exceptions import (
ReportScheduleAlertGracePeriodError,
ReportScheduleCsvFailedError,
ReportScheduleExecutorNotFoundError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
ReportScheduleScreenshotTimeout,
@@ -995,6 +996,81 @@ def test_screenshot_width_calculation(
)
def _executor_report_state(mocker: MockerFixture) -> BaseReportState:
report_schedule = create_report_schedule(mocker)
# _get_csv_data/_get_embedded_data build a chart-data URL from chart_id
# before resolving the executor; give it a concrete value so URL building
# succeeds and the executor resolution is actually reached.
report_schedule.chart_id = 1
report_schedule.force_screenshot = False
return BaseReportState(
report_schedule=report_schedule,
scheduled_dttm=datetime.now(),
execution_id=UUID("084e7ee6-5557-4ecd-9632-b7f39c9ec524"),
)
@pytest.mark.parametrize(
"method_name",
["_get_screenshots", "_get_csv_data", "_get_embedded_data"],
)
def test_get_content_raises_when_executor_user_missing(
app: SupersetApp, mocker: MockerFixture, method_name: str
) -> None:
"""
When the configured executor user cannot be resolved
(``security_manager.find_user`` returns ``None``), each content path raises a
dedicated ``ReportScheduleExecutorNotFoundError`` naming the username, rather
than passing ``None`` downstream and failing with an opaque NoneType error.
"""
app.config.update(
{
"ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH": 1600,
"WEBDRIVER_WINDOW": {"slice": (800, 600), "dashboard": (800, 600)},
"ALERT_REPORTS_EXECUTORS": {},
}
)
report_state = _executor_report_state(mocker)
with (
patch("superset.commands.report.execute.security_manager") as mock_sm,
patch("superset.commands.report.execute.get_executor") as mock_get_executor,
patch("superset.commands.report.execute.machine_auth_provider_factory"),
):
mock_get_executor.return_value = ("executor", "ghost_user")
mock_sm.find_user = mocker.MagicMock(return_value=None)
with pytest.raises(ReportScheduleExecutorNotFoundError, match="ghost_user"):
getattr(report_state, method_name)()
def test_resolve_executor_user_returns_user_and_username(
app: SupersetApp, mocker: MockerFixture
) -> None:
"""
Happy path: when the executor user exists, the helper returns the
``(user, username)`` tuple unchanged — locking the no-behavior-change exit
criterion for the four call sites.
"""
from superset.commands.report.execute import resolve_executor_user
app.config.update({"ALERT_REPORTS_EXECUTORS": {}})
report_schedule = create_report_schedule(mocker)
mock_user = mocker.MagicMock()
with (
patch("superset.commands.report.execute.security_manager") as mock_sm,
patch("superset.commands.report.execute.get_executor") as mock_get_executor,
):
mock_get_executor.return_value = ("executor", "real_user")
mock_sm.find_user = mocker.MagicMock(return_value=mock_user)
user, username = resolve_executor_user(report_schedule)
assert user is mock_user
assert username == "real_user"
def test_update_recipient_to_slack_v2(mocker: MockerFixture):
"""
Test converting a Slack recipient to Slack v2 format.
@@ -1070,6 +1146,122 @@ def test_update_recipient_to_slack_v2_missing_channels(mocker: MockerFixture):
mock_cmmd.update_report_schedule_slack_v2()
def test_update_recipient_to_slack_v2_reverts_all_on_partial_failure(
mocker: MockerFixture,
) -> None:
"""
When the second of two Slack recipients fails channel resolution, BOTH
recipients are fully reverted — type AND exact original
``recipient_config_json`` string — not just the loop variable's type. This
prevents the intervening ``create_log`` commit from flushing a half-migrated,
inconsistent state.
"""
def channels_side_effect(search_string, types, exact_match):
if search_string == "Channel-1":
return [
{
"id": "id_channel_1",
"name": "Channel-1",
"is_member": True,
"is_private": False,
}
]
# Second recipient: no channel found → length mismatch → UpdateFailedError
return []
mocker.patch(
"superset.commands.report.execute.get_channels_with_search",
side_effect=channels_side_effect,
)
original_config_1 = json.dumps({"target": "Channel-1"})
original_config_2 = json.dumps({"target": "Channel-2"})
mock_report_schedule = ReportSchedule(
name="Test Report",
recipients=[
ReportRecipients(
type=ReportRecipientType.SLACK,
recipient_config_json=original_config_1,
),
ReportRecipients(
type=ReportRecipientType.SLACK,
recipient_config_json=original_config_2,
),
],
)
mock_cmmd = BaseReportState(
mock_report_schedule, "January 1, 2021", "execution_id_example"
)
with pytest.raises(UpdateFailedError):
mock_cmmd.update_report_schedule_slack_v2()
first, second = mock_report_schedule.recipients
# The first recipient was mutated to v2 before the second failed; it must be
# reverted to its exact original type AND config string.
assert first.type == ReportRecipientType.SLACK
assert first.recipient_config_json == original_config_1
assert second.type == ReportRecipientType.SLACK
assert second.recipient_config_json == original_config_2
def test_update_recipient_to_slack_v2_pre_iteration_failure(
mocker: MockerFixture,
) -> None:
"""
A failure raised while accessing/iterating the recipients (before the loop
variable is bound) surfaces as ``UpdateFailedError``, not a ``NameError``
that would mask the real error.
"""
class _ExplodingRecipients:
def __iter__(self):
raise RuntimeError("recipients exploded")
mock_report_schedule = mocker.MagicMock()
mock_report_schedule.recipients = _ExplodingRecipients()
mock_cmmd = BaseReportState(
mock_report_schedule, "January 1, 2021", "execution_id_example"
)
with pytest.raises(UpdateFailedError):
mock_cmmd.update_report_schedule_slack_v2()
def test_update_recipient_to_slack_v2_no_slack_recipients_is_noop(
mocker: MockerFixture,
) -> None:
"""
With no SLACK recipients there is nothing to migrate: the method returns
without raising and leaves the non-Slack recipients untouched.
"""
mock_search = mocker.patch(
"superset.commands.report.execute.get_channels_with_search",
)
mock_report_schedule = ReportSchedule(
recipients=[
ReportRecipients(
type=ReportRecipientType.EMAIL,
recipient_config_json=json.dumps({"target": "user@example.com"}),
),
],
)
mock_cmmd: BaseReportState = BaseReportState(
mock_report_schedule, "January 1, 2021", "execution_id_example"
)
mock_cmmd.update_report_schedule_slack_v2()
assert mock_cmmd._report_schedule.recipients[0].type == ReportRecipientType.EMAIL
assert (
mock_cmmd._report_schedule.recipients[0].recipient_config_json
== '{"target": "user@example.com"}'
)
mock_search.assert_not_called()
# ---------------------------------------------------------------------------
# Tier 1: _update_query_context + create_log
# ---------------------------------------------------------------------------

View File

@@ -16,11 +16,14 @@
# under the License.
import datetime as datetime_module
import pandas as pd
import pytest
from superset.reports.notifications.exceptions import (
NotificationParamException,
NotificationUnprocessableException,
)
from superset.reports.notifications.webhook import WebhookNotification
from superset.utils.core import HeaderDataType
@@ -269,3 +272,170 @@ def test_send_treats_redirect_as_failure(monkeypatch, mock_header_data) -> None:
with pytest.raises(NotificationParamException, match="redirect"):
webhook_notification.send()
class _FakeBackoffDatetime:
"""
Drop-in for the ``datetime`` *module* referenced as ``backoff._sync.datetime``.
backoff 2.2.1 computes the ``max_time`` elapsed via
``datetime.datetime.now()`` inside ``backoff._sync`` (NOT via ``time``), so
patching only ``backoff._sync.time.sleep`` leaves ``elapsed`` pinned at 0 and
``max_time`` never fires. This fake advances the clock a fixed ``step`` per
``now()`` call so the wall-time bound is observable in a sub-second test.
A ``step`` of 0 holds the clock flat (elapsed stays 0).
"""
def __init__(self, step_seconds: float) -> None:
base = datetime_module.datetime(2020, 1, 1)
state = {"calls": 0}
class _FakeDateTime:
@staticmethod
def now() -> datetime_module.datetime:
offset = datetime_module.timedelta(
seconds=step_seconds * state["calls"]
)
state["calls"] += 1
return base + offset
self.datetime = _FakeDateTime
def _make_webhook(mock_header_data) -> WebhookNotification:
from superset.reports.models import ReportRecipients, ReportRecipientType
from superset.reports.notifications.base import NotificationContent
content = NotificationContent(
name="test alert", header_data=mock_header_data, description="Test description"
)
return WebhookNotification(
recipient=ReportRecipients(
type=ReportRecipientType.WEBHOOK,
recipient_config_json='{"target": "https://example.com/webhook"}',
),
content=content,
)
class _MockServerErrorResponse:
status_code = 500
text = ""
def _allow_internal_app() -> type:
class MockCurrentApp:
config = {
"ALERT_REPORTS_WEBHOOK_HTTPS_ONLY": True,
"ALERT_REPORTS_WEBHOOK_ALLOW_INTERNAL_HOSTS": True,
}
return MockCurrentApp
def test_send_backoff_bounded_by_max_time(monkeypatch, mock_header_data) -> None:
"""
A persistently failing (500) target gives up on wall-time (``max_time``),
not just ``max_tries``. With the fake clock stepping +50s per backoff sample,
elapsed crosses ``max_time=120`` between the 2nd and 3rd POST, so exactly 3
POSTs happen (distinct from ``max_tries=5``). The terminal exception type is
unchanged on giveup.
"""
webhook_notification = _make_webhook(mock_header_data)
post_calls: list[int] = []
def fake_post(*args, **kwargs) -> _MockServerErrorResponse:
post_calls.append(1)
return _MockServerErrorResponse()
monkeypatch.setattr(
"superset.reports.notifications.webhook.current_app", _allow_internal_app()
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled",
lambda flag: True,
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.requests.post", fake_post
)
monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None)
monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(50))
with pytest.raises(NotificationUnprocessableException):
webhook_notification.send()
assert len(post_calls) == 3
def test_send_flat_clock_falls_back_to_max_tries(monkeypatch, mock_header_data) -> None:
"""
Characterization (NOT a RED discriminator): with the clock held flat,
``max_time`` can never fire, so ``max_tries=5`` governs and exactly 5 POSTs
happen. Passes on both buggy and fixed code; its job is to prove the 3-vs-5
delta in ``test_send_backoff_bounded_by_max_time`` is attributable to
wall-time, not to ``max_tries``.
"""
webhook_notification = _make_webhook(mock_header_data)
post_calls: list[int] = []
def fake_post(*args, **kwargs) -> _MockServerErrorResponse:
post_calls.append(1)
return _MockServerErrorResponse()
monkeypatch.setattr(
"superset.reports.notifications.webhook.current_app", _allow_internal_app()
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled",
lambda flag: True,
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.requests.post", fake_post
)
monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None)
monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(0))
with pytest.raises(NotificationUnprocessableException):
webhook_notification.send()
assert len(post_calls) == 5
def test_send_max_time_does_not_abandon_recovering_target(
monkeypatch, mock_header_data
) -> None:
"""
No-regression guard: a target that fails twice (500) then succeeds on the
3rd attempt — cumulative elapsed well under ``max_time`` — still succeeds.
Confirms ``max_time=120`` is not set so low that it abandons a target that
recovers within the normal retry window.
"""
webhook_notification = _make_webhook(mock_header_data)
post_calls: list[int] = []
class _OkResponse:
status_code = 200
text = ""
def fake_post(*args, **kwargs):
post_calls.append(1)
if len(post_calls) < 3:
return _MockServerErrorResponse()
return _OkResponse()
monkeypatch.setattr(
"superset.reports.notifications.webhook.current_app", _allow_internal_app()
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled",
lambda flag: True,
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.requests.post", fake_post
)
monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None)
monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(10))
webhook_notification.send()
assert len(post_calls) == 3