Compare commits

..

1 Commits

Author SHA1 Message Date
Elizabeth Thompson
9058b49971 fix(reports): commit permalink before Playwright navigates to tab URL
CreateDashboardPermalinkCommand only flushes the INSERT when invoked
inside an outer @transaction() (the nesting guard skips the commit).
The row is therefore not visible to Playwright's separate DB connection,
causing dashboard_permalink to return a 404, the <body class="standalone">
element to never appear, and the screenshot to time out after 600s.

Commit immediately after CreateDashboardPermalinkCommand.run() returns,
matching the pattern already used by create_log() in the same class.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 23:50:30 +00:00
5 changed files with 24 additions and 447 deletions

View File

@@ -16,7 +16,6 @@
# under the License.
import math
from typing import Optional
from flask_babel import lazy_gettext as _
@@ -201,19 +200,6 @@ class ReportScheduleDataFrameFailedError(CommandException):
message = _("Report Schedule execution failed when generating a dataframe.")
class ReportScheduleExecutorNotFoundError(CommandException):
status = 422
def __init__(self, username: str = "", exception: Optional[Exception] = None):
super().__init__(
_(
"Report Schedule executor user %(username)s was not found.",
username=f'"{username}"' if username else "",
),
exception,
)
class ReportScheduleExecuteUnexpectedError(CommandException):
message = _("Report Schedule execution got an unexpected error.")

View File

@@ -17,7 +17,7 @@
import logging
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import Any, Optional, TYPE_CHECKING, Union
from typing import Any, Optional, Union
from uuid import UUID
import pandas as pd
@@ -37,7 +37,6 @@ from superset.commands.report.exceptions import (
ReportScheduleDataFrameFailedError,
ReportScheduleDataFrameTimeout,
ReportScheduleExecuteUnexpectedError,
ReportScheduleExecutorNotFoundError,
ReportScheduleNotFoundError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
@@ -83,36 +82,9 @@ from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot
from superset.utils.slack import get_channels_with_search, SlackChannelTypes
from superset.utils.urls import get_url_path
if TYPE_CHECKING:
from flask_appbuilder.security.sqla.models import User
logger = logging.getLogger(__name__)
def resolve_executor_user(model: ReportSchedule) -> tuple["User", str]:
"""
Resolve the executor user for a report schedule.
Determines the configured executor username via ``get_executor`` and looks up
the corresponding user. A deleted/disabled user or a misconfigured
``ALERT_REPORTS_EXECUTORS`` makes ``security_manager.find_user`` return
``None``; rather than passing ``None`` into the webdriver/auth flow (which
fails with an opaque NoneType error), raise a dedicated, actionable error.
:returns: the ``(user, username)`` pair — the username is returned alongside
the user because several call sites log it after resolution.
:raises ReportScheduleExecutorNotFoundError: if the executor user is missing.
"""
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=model,
)
user = security_manager.find_user(username)
if user is None:
raise ReportScheduleExecutorNotFoundError(username)
return user, username
class BaseReportState:
current_states: list[ReportState] = []
initial: bool = False
@@ -163,17 +135,9 @@ class BaseReportState:
Update the report schedule type and channels for all slack recipients to v2.
V2 uses ids instead of names for channels.
"""
# Track each recipient mutated in this pass with its original (type,
# config) so a partial failure can revert ALL of them — not just the
# loop variable. Restoring the values to their loaded state keeps the
# persisted rows unchanged regardless of any intervening commit.
mutated: list[tuple[ReportRecipients, ReportRecipientType, str]] = []
try:
for recipient in self._report_schedule.recipients:
if recipient.type == ReportRecipientType.SLACK:
mutated.append(
(recipient, recipient.type, recipient.recipient_config_json)
)
recipient.type = ReportRecipientType.SLACKV2
slack_recipients = json.loads(recipient.recipient_config_json)
# V1 method allowed to use leading `#` in the channel name
@@ -205,15 +169,8 @@ class BaseReportState:
}
)
except Exception as ex:
# Revert every mutated recipient to v1 (both type AND config) to
# preserve configuration (requires manual fix). Reverting the full
# set — not just the loop variable — keeps earlier recipients
# consistent; iterating the snapshot also avoids the UnboundLocalError
# that a bare loop-variable reference raises on a pre-iteration
# failure (which would mask the real error).
for reverted_recipient, original_type, original_config in mutated:
reverted_recipient.type = original_type
reverted_recipient.recipient_config_json = original_config
# Revert to v1 to preserve configuration (requires manual fix)
recipient.type = ReportRecipientType.SLACK
msg = f"Failed to update slack recipients to v2: {str(ex)}"
logger.exception(msg)
raise UpdateFailedError(msg) from ex
@@ -401,6 +358,11 @@ class BaseReportState:
dashboard_id=str(self._report_schedule.dashboard.uuid),
state=dashboard_state,
).run()
# Commit the permalink immediately so Playwright's separate DB connection
# can resolve the URL. CreateDashboardPermalinkCommand only flushes when
# called inside an outer @transaction(), leaving the row invisible to
# other connections until we explicitly commit here.
db.session.commit() # pylint: disable=consider-using-transaction
return get_url_path(
"Superset.dashboard_permalink",
@@ -466,7 +428,11 @@ class BaseReportState:
"""
start_time = datetime.utcnow()
user, username = resolve_executor_user(self._report_schedule)
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
max_width = app.config["ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH"]
@@ -548,7 +514,11 @@ class BaseReportState:
def _get_csv_data(self) -> bytes:
start_time = datetime.utcnow()
url = self._get_url(result_format=ChartDataResultFormat.CSV)
user, username = resolve_executor_user(self._report_schedule)
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user)
if self._report_schedule.chart.query_context is None:
@@ -594,7 +564,11 @@ class BaseReportState:
start_time = datetime.utcnow()
url = self._get_url(result_format=ChartDataResultFormat.JSON)
user, username = resolve_executor_user(self._report_schedule)
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user)
if self._report_schedule.chart.query_context is None:
@@ -1195,15 +1169,6 @@ class AsyncExecuteReportScheduleCommand(BaseCommand):
if not self._model:
raise ReportScheduleExecuteUnexpectedError()
# Resolve the executor at the run() boundary the same way master
# does: tolerate a missing user (find_user -> None) so the state
# machine still runs and its error envelope writes the ERROR
# execution-log row and sends the owner notification. The dedicated
# ReportScheduleExecutorNotFoundError guard lives at the content
# sites (_get_screenshots / _get_csv_data / _get_embedded_data),
# which raise inside that envelope. Guarding here instead would
# surface a 422 above the state machine, suppressing both the log
# row and the owner notification.
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._model,

View File

@@ -128,19 +128,7 @@ class WebhookNotification(BaseNotification):
raise NotificationParamException("Webhook URL target host is not allowed.")
@backoff.on_exception(
backoff.expo,
NotificationUnprocessableException,
factor=10,
base=2,
max_tries=5,
# Bound total wall-clock retry time. Without this, a hanging or
# persistently failing target can stall a worker for minutes per bad URL
# (up to ~5 socket waits at timeout=60 plus ~150s of retry sleeps),
# starving sequential report dispatch. max_time is checked between
# attempts, so the final in-flight request can still run its full
# timeout; factor is intentionally kept at 10 so legitimately-transient
# 5xx targets are not abandoned early.
max_time=120,
backoff.expo, NotificationUnprocessableException, factor=10, base=2, max_tries=5
)
@statsd_gauge("reports.webhook.send")
def send(self) -> None:

View File

@@ -28,7 +28,6 @@ from superset.commands.exceptions import UpdateFailedError
from superset.commands.report.exceptions import (
ReportScheduleAlertGracePeriodError,
ReportScheduleCsvFailedError,
ReportScheduleExecutorNotFoundError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
ReportScheduleScreenshotTimeout,
@@ -996,81 +995,6 @@ def test_screenshot_width_calculation(
)
def _executor_report_state(mocker: MockerFixture) -> BaseReportState:
report_schedule = create_report_schedule(mocker)
# _get_csv_data/_get_embedded_data build a chart-data URL from chart_id
# before resolving the executor; give it a concrete value so URL building
# succeeds and the executor resolution is actually reached.
report_schedule.chart_id = 1
report_schedule.force_screenshot = False
return BaseReportState(
report_schedule=report_schedule,
scheduled_dttm=datetime.now(),
execution_id=UUID("084e7ee6-5557-4ecd-9632-b7f39c9ec524"),
)
@pytest.mark.parametrize(
"method_name",
["_get_screenshots", "_get_csv_data", "_get_embedded_data"],
)
def test_get_content_raises_when_executor_user_missing(
app: SupersetApp, mocker: MockerFixture, method_name: str
) -> None:
"""
When the configured executor user cannot be resolved
(``security_manager.find_user`` returns ``None``), each content path raises a
dedicated ``ReportScheduleExecutorNotFoundError`` naming the username, rather
than passing ``None`` downstream and failing with an opaque NoneType error.
"""
app.config.update(
{
"ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH": 1600,
"WEBDRIVER_WINDOW": {"slice": (800, 600), "dashboard": (800, 600)},
"ALERT_REPORTS_EXECUTORS": {},
}
)
report_state = _executor_report_state(mocker)
with (
patch("superset.commands.report.execute.security_manager") as mock_sm,
patch("superset.commands.report.execute.get_executor") as mock_get_executor,
patch("superset.commands.report.execute.machine_auth_provider_factory"),
):
mock_get_executor.return_value = ("executor", "ghost_user")
mock_sm.find_user = mocker.MagicMock(return_value=None)
with pytest.raises(ReportScheduleExecutorNotFoundError, match="ghost_user"):
getattr(report_state, method_name)()
def test_resolve_executor_user_returns_user_and_username(
app: SupersetApp, mocker: MockerFixture
) -> None:
"""
Happy path: when the executor user exists, the helper returns the
``(user, username)`` tuple unchanged — locking the no-behavior-change exit
criterion for the four call sites.
"""
from superset.commands.report.execute import resolve_executor_user
app.config.update({"ALERT_REPORTS_EXECUTORS": {}})
report_schedule = create_report_schedule(mocker)
mock_user = mocker.MagicMock()
with (
patch("superset.commands.report.execute.security_manager") as mock_sm,
patch("superset.commands.report.execute.get_executor") as mock_get_executor,
):
mock_get_executor.return_value = ("executor", "real_user")
mock_sm.find_user = mocker.MagicMock(return_value=mock_user)
user, username = resolve_executor_user(report_schedule)
assert user is mock_user
assert username == "real_user"
def test_update_recipient_to_slack_v2(mocker: MockerFixture):
"""
Test converting a Slack recipient to Slack v2 format.
@@ -1146,122 +1070,6 @@ def test_update_recipient_to_slack_v2_missing_channels(mocker: MockerFixture):
mock_cmmd.update_report_schedule_slack_v2()
def test_update_recipient_to_slack_v2_reverts_all_on_partial_failure(
mocker: MockerFixture,
) -> None:
"""
When the second of two Slack recipients fails channel resolution, BOTH
recipients are fully reverted — type AND exact original
``recipient_config_json`` string — not just the loop variable's type. This
prevents the intervening ``create_log`` commit from flushing a half-migrated,
inconsistent state.
"""
def channels_side_effect(search_string, types, exact_match):
if search_string == "Channel-1":
return [
{
"id": "id_channel_1",
"name": "Channel-1",
"is_member": True,
"is_private": False,
}
]
# Second recipient: no channel found → length mismatch → UpdateFailedError
return []
mocker.patch(
"superset.commands.report.execute.get_channels_with_search",
side_effect=channels_side_effect,
)
original_config_1 = json.dumps({"target": "Channel-1"})
original_config_2 = json.dumps({"target": "Channel-2"})
mock_report_schedule = ReportSchedule(
name="Test Report",
recipients=[
ReportRecipients(
type=ReportRecipientType.SLACK,
recipient_config_json=original_config_1,
),
ReportRecipients(
type=ReportRecipientType.SLACK,
recipient_config_json=original_config_2,
),
],
)
mock_cmmd = BaseReportState(
mock_report_schedule, "January 1, 2021", "execution_id_example"
)
with pytest.raises(UpdateFailedError):
mock_cmmd.update_report_schedule_slack_v2()
first, second = mock_report_schedule.recipients
# The first recipient was mutated to v2 before the second failed; it must be
# reverted to its exact original type AND config string.
assert first.type == ReportRecipientType.SLACK
assert first.recipient_config_json == original_config_1
assert second.type == ReportRecipientType.SLACK
assert second.recipient_config_json == original_config_2
def test_update_recipient_to_slack_v2_pre_iteration_failure(
mocker: MockerFixture,
) -> None:
"""
A failure raised while accessing/iterating the recipients (before the loop
variable is bound) surfaces as ``UpdateFailedError``, not a ``NameError``
that would mask the real error.
"""
class _ExplodingRecipients:
def __iter__(self):
raise RuntimeError("recipients exploded")
mock_report_schedule = mocker.MagicMock()
mock_report_schedule.recipients = _ExplodingRecipients()
mock_cmmd = BaseReportState(
mock_report_schedule, "January 1, 2021", "execution_id_example"
)
with pytest.raises(UpdateFailedError):
mock_cmmd.update_report_schedule_slack_v2()
def test_update_recipient_to_slack_v2_no_slack_recipients_is_noop(
mocker: MockerFixture,
) -> None:
"""
With no SLACK recipients there is nothing to migrate: the method returns
without raising and leaves the non-Slack recipients untouched.
"""
mock_search = mocker.patch(
"superset.commands.report.execute.get_channels_with_search",
)
mock_report_schedule = ReportSchedule(
recipients=[
ReportRecipients(
type=ReportRecipientType.EMAIL,
recipient_config_json=json.dumps({"target": "user@example.com"}),
),
],
)
mock_cmmd: BaseReportState = BaseReportState(
mock_report_schedule, "January 1, 2021", "execution_id_example"
)
mock_cmmd.update_report_schedule_slack_v2()
assert mock_cmmd._report_schedule.recipients[0].type == ReportRecipientType.EMAIL
assert (
mock_cmmd._report_schedule.recipients[0].recipient_config_json
== '{"target": "user@example.com"}'
)
mock_search.assert_not_called()
# ---------------------------------------------------------------------------
# Tier 1: _update_query_context + create_log
# ---------------------------------------------------------------------------

View File

@@ -16,14 +16,11 @@
# under the License.
import datetime as datetime_module
import pandas as pd
import pytest
from superset.reports.notifications.exceptions import (
NotificationParamException,
NotificationUnprocessableException,
)
from superset.reports.notifications.webhook import WebhookNotification
from superset.utils.core import HeaderDataType
@@ -272,170 +269,3 @@ def test_send_treats_redirect_as_failure(monkeypatch, mock_header_data) -> None:
with pytest.raises(NotificationParamException, match="redirect"):
webhook_notification.send()
class _FakeBackoffDatetime:
"""
Drop-in for the ``datetime`` *module* referenced as ``backoff._sync.datetime``.
backoff 2.2.1 computes the ``max_time`` elapsed via
``datetime.datetime.now()`` inside ``backoff._sync`` (NOT via ``time``), so
patching only ``backoff._sync.time.sleep`` leaves ``elapsed`` pinned at 0 and
``max_time`` never fires. This fake advances the clock a fixed ``step`` per
``now()`` call so the wall-time bound is observable in a sub-second test.
A ``step`` of 0 holds the clock flat (elapsed stays 0).
"""
def __init__(self, step_seconds: float) -> None:
base = datetime_module.datetime(2020, 1, 1)
state = {"calls": 0}
class _FakeDateTime:
@staticmethod
def now() -> datetime_module.datetime:
offset = datetime_module.timedelta(
seconds=step_seconds * state["calls"]
)
state["calls"] += 1
return base + offset
self.datetime = _FakeDateTime
def _make_webhook(mock_header_data) -> WebhookNotification:
from superset.reports.models import ReportRecipients, ReportRecipientType
from superset.reports.notifications.base import NotificationContent
content = NotificationContent(
name="test alert", header_data=mock_header_data, description="Test description"
)
return WebhookNotification(
recipient=ReportRecipients(
type=ReportRecipientType.WEBHOOK,
recipient_config_json='{"target": "https://example.com/webhook"}',
),
content=content,
)
class _MockServerErrorResponse:
status_code = 500
text = ""
def _allow_internal_app() -> type:
class MockCurrentApp:
config = {
"ALERT_REPORTS_WEBHOOK_HTTPS_ONLY": True,
"ALERT_REPORTS_WEBHOOK_ALLOW_INTERNAL_HOSTS": True,
}
return MockCurrentApp
def test_send_backoff_bounded_by_max_time(monkeypatch, mock_header_data) -> None:
"""
A persistently failing (500) target gives up on wall-time (``max_time``),
not just ``max_tries``. With the fake clock stepping +50s per backoff sample,
elapsed crosses ``max_time=120`` between the 2nd and 3rd POST, so exactly 3
POSTs happen (distinct from ``max_tries=5``). The terminal exception type is
unchanged on giveup.
"""
webhook_notification = _make_webhook(mock_header_data)
post_calls: list[int] = []
def fake_post(*args, **kwargs) -> _MockServerErrorResponse:
post_calls.append(1)
return _MockServerErrorResponse()
monkeypatch.setattr(
"superset.reports.notifications.webhook.current_app", _allow_internal_app()
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled",
lambda flag: True,
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.requests.post", fake_post
)
monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None)
monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(50))
with pytest.raises(NotificationUnprocessableException):
webhook_notification.send()
assert len(post_calls) == 3
def test_send_flat_clock_falls_back_to_max_tries(monkeypatch, mock_header_data) -> None:
"""
Characterization (NOT a RED discriminator): with the clock held flat,
``max_time`` can never fire, so ``max_tries=5`` governs and exactly 5 POSTs
happen. Passes on both buggy and fixed code; its job is to prove the 3-vs-5
delta in ``test_send_backoff_bounded_by_max_time`` is attributable to
wall-time, not to ``max_tries``.
"""
webhook_notification = _make_webhook(mock_header_data)
post_calls: list[int] = []
def fake_post(*args, **kwargs) -> _MockServerErrorResponse:
post_calls.append(1)
return _MockServerErrorResponse()
monkeypatch.setattr(
"superset.reports.notifications.webhook.current_app", _allow_internal_app()
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled",
lambda flag: True,
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.requests.post", fake_post
)
monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None)
monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(0))
with pytest.raises(NotificationUnprocessableException):
webhook_notification.send()
assert len(post_calls) == 5
def test_send_max_time_does_not_abandon_recovering_target(
monkeypatch, mock_header_data
) -> None:
"""
No-regression guard: a target that fails twice (500) then succeeds on the
3rd attempt — cumulative elapsed well under ``max_time`` — still succeeds.
Confirms ``max_time=120`` is not set so low that it abandons a target that
recovers within the normal retry window.
"""
webhook_notification = _make_webhook(mock_header_data)
post_calls: list[int] = []
class _OkResponse:
status_code = 200
text = ""
def fake_post(*args, **kwargs):
post_calls.append(1)
if len(post_calls) < 3:
return _MockServerErrorResponse()
return _OkResponse()
monkeypatch.setattr(
"superset.reports.notifications.webhook.current_app", _allow_internal_app()
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled",
lambda flag: True,
)
monkeypatch.setattr(
"superset.reports.notifications.webhook.requests.post", fake_post
)
monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None)
monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(10))
webhook_notification.send()
assert len(post_calls) == 3