Compare commits

...

3 Commits

Elizabeth Thompson
3fe7b64f69 chore(alerts): soften working_timeout constraint comment in config
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 01:37:50 +00:00
Elizabeth Thompson
6739576787 fix(alerts): improve working_timeout help text and add UI tooltip
Add actionable guidance to the working_timeout field so users know to
set it relative to their report's typical execution time. Previously
the description only said the field resets a stalled alert to error,
with no guidance on what value to choose.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 01:34:58 +00:00
Elizabeth Thompson
c7f053f648 fix(alerts): recover reports stuck in WORKING state after worker crash
When a Celery worker pod crashes mid-execution, the report stays in
WORKING state until working_timeout expires (up to 1 hour), then
transitions to ERROR instead of retrying.

This adds a stale detection threshold (ALERT_REPORTS_STALE_WORKING_TIMEOUT,
default 300s). When a requeued worker sees a WORKING state older than this
threshold, it resets to NOOP and re-executes rather than blocking with
PreviousWorkingError. The existing working_timeout path (ERROR) is
preserved for genuinely runaway jobs.

Also refactors is_on_working_timeout() to accept an already-fetched log,
eliminating a redundant DB query on every working-state evaluation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 01:21:06 +00:00
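
Two notes on the commits above, before the file-by-file diff.

First, on the help-text guidance in 6739576787: under the "at least 2-3x typical execution time" rule, a report whose runs usually finish in about 120 seconds would get a working_timeout of roughly 240-360 seconds, rather than the 3600-second default.

Second, the recovery flow in c7f053f648 checks two thresholds in a fixed order: the existing working_timeout (ERROR) first, then the new stale threshold (reset and retry), and otherwise blocks. A minimal sketch of that precedence, assuming elapsed time is measured from when the schedule entered WORKING (classify_working_state and its return strings are illustrative names, not part of the patch):

from datetime import datetime, timedelta

def classify_working_state(
    entered_working_at: datetime,
    working_timeout: int,
    stale_timeout: int = 300,
) -> str:
    """Mirror the decision order in ReportWorkingState.next() (illustrative)."""
    elapsed = (datetime.utcnow() - entered_working_at).total_seconds()
    if elapsed >= working_timeout:
        return "error"  # runaway job: existing working_timeout path, marked as failed
    if elapsed >= stale_timeout:
        return "reset-and-retry"  # likely crashed worker: NOOP, then re-execute
    return "block"  # plausibly still running: PreviousWorkingError

# With the defaults (working_timeout=3600, stale_timeout=300):
assert classify_working_state(datetime.utcnow() - timedelta(seconds=30), 3600) == "block"
assert classify_working_state(datetime.utcnow() - timedelta(seconds=1800), 3600) == "reset-and-retry"
assert classify_working_state(datetime.utcnow() - timedelta(hours=2), 3600) == "error"

The stale comparison is >=, so an elapsed time landing exactly on the threshold triggers recovery; the boundary tests below pin this down.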
5 changed files with 180 additions and 15 deletions

View File

@@ -2628,6 +2628,11 @@ const AlertReportModal: FunctionComponent<AlertReportModalProps> = ({
           <>
             <div className="control-label">
               {t('Working timeout')}
+              <InfoTooltip
+                tooltip={t(
+                  'Maximum time in seconds this job is allowed to run before being considered stuck and marked as failed. Set this to at least 2-3x the typical execution time of this report.',
+                )}
+              />
               <span className="required">*</span>
             </div>
             <div className="input-container">

View File

@@ -815,13 +815,16 @@ class BaseReportState:
             < last_success.end_dttm
         )
 
-    def is_on_working_timeout(self) -> bool:
+    def is_on_working_timeout(
+        self, last_working: ReportExecutionLog | None = None
+    ) -> bool:
         """
         Checks if an alert is in a working timeout
         """
-        last_working = ReportScheduleDAO.find_last_entered_working_log(
-            self._report_schedule
-        )
+        if last_working is None:
+            last_working = ReportScheduleDAO.find_last_entered_working_log(
+                self._report_schedule
+            )
         if not last_working:
             return False
         return (
@@ -933,20 +936,22 @@ class ReportWorkingState(BaseReportState):
     next states:
     - Error
     - Working
+    - NOOP -> re-execute (stale crash recovery)
     """
 
     current_states = [ReportState.WORKING]
 
     def next(self) -> None:
-        if self.is_on_working_timeout():
-            last_working = ReportScheduleDAO.find_last_entered_working_log(
-                self._report_schedule
-            )
-            elapsed_seconds = (
-                (datetime.utcnow() - last_working.end_dttm).total_seconds()
-                if last_working
-                else None
-            )
+        last_working = ReportScheduleDAO.find_last_entered_working_log(
+            self._report_schedule
+        )
+        elapsed_seconds = (
+            (datetime.utcnow() - last_working.end_dttm).total_seconds()
+            if last_working
+            else None
+        )
+        if self.is_on_working_timeout(last_working):
             logger.error(
                 "Working state timeout after %.2fs - execution_id: %s",
                 elapsed_seconds if elapsed_seconds else 0,
@@ -958,6 +963,39 @@ class ReportWorkingState(BaseReportState):
                 error_message=str(exception_timeout),
             )
             raise exception_timeout
+
+        stale_timeout = app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"]
+        if elapsed_seconds is not None and elapsed_seconds >= stale_timeout:
+            logger.warning(
+                "Report found in stale WORKING state after %.0fs, "
+                "likely due to crashed worker - resetting and retrying - "
+                "execution_id: %s",
+                elapsed_seconds,
+                self._execution_id,
+            )
+            if (
+                self._report_schedule.working_timeout is not None
+                and stale_timeout >= self._report_schedule.working_timeout
+            ):
+                logger.warning(
+                    "ALERT_REPORTS_STALE_WORKING_TIMEOUT (%ds) is >= working_timeout "
+                    "(%ds) for schedule %s; stale recovery will never fire before "
+                    "working_timeout. Consider lowering "
+                    "ALERT_REPORTS_STALE_WORKING_TIMEOUT.",
+                    stale_timeout,
+                    self._report_schedule.working_timeout,
+                    self._report_schedule.name,
+                )
+            self.update_report_schedule_and_log(
+                ReportState.NOOP, error_message="stale working state reset"
+            )
+            ReportNotTriggeredErrorState(
+                self._report_schedule,
+                self._scheduled_dttm,
+                self._execution_id,
+            ).next()
+            return
+
         logger.warning(
             "Report still in working state, refusing to re-compute - execution_id: %s",
             self._execution_id,

View File

@@ -1971,6 +1971,10 @@ ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = int(timedelta(seconds=1).total_seconds())
 ALERT_REPORTS_DEFAULT_WORKING_TIMEOUT = 3600
 ALERT_REPORTS_DEFAULT_RETENTION = 90
 ALERT_REPORTS_DEFAULT_CRON_VALUE = "0 0 * * *"  # every day
+# Minimum elapsed time (seconds) before a WORKING state is considered stale
+# (e.g. due to a crashed Celery worker) and eligible for reset + retry.
+# Ideally less than working_timeout on any schedule.
+ALERT_REPORTS_STALE_WORKING_TIMEOUT = 300
 # If set to true no notification is sent, the worker will just log a message.
 # Useful for debugging
 ALERT_REPORTS_NOTIFICATION_DRY_RUN = False
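
These defaults live in superset/config.py, so deployments typically tune the new setting from superset_config.py. A sketch of an override (the 120-second value is illustrative):

# superset_config.py
# Recover crashed-worker WORKING states after 2 minutes instead of the
# default 300 seconds. Keep this below the smallest working_timeout in use,
# otherwise working_timeout's ERROR path fires first and stale recovery
# never runs (the patch logs a warning for that misconfiguration).
ALERT_REPORTS_STALE_WORKING_TIMEOUT = 120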

View File

@@ -99,8 +99,10 @@ grace_period_description = (
     "Superset nags you again. (in seconds)"
 )
 working_timeout_description = (
-    "If an alert is staled at a working state, how long until it's state is reset to"
-    " error"
+    "Maximum time in seconds a report or alert job is allowed to run before it is "
+    "considered stuck and its state is reset to error. Set this to at least 2-3x the "
+    "typical execution time of this report. Jobs that exceed this limit are terminated "
+    "and marked as failed."
 )
 creation_method_description = (
     "Creation method is used to inform the frontend whether the report/alert was "

View File

@@ -1133,13 +1133,23 @@ def test_working_state_timeout_raises_timeout_error(mocker: MockerFixture) -> None:
 def test_working_state_still_working_raises_previous_working(
+    app: SupersetApp,
     mocker: MockerFixture,
 ) -> None:
+    """Working state not yet timed out should raise PreviousWorkingError."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
     state = _make_state_instance(mocker, ReportWorkingState)
     mocker.patch.object(state, "is_on_working_timeout", return_value=False)
     mocker.patch.object(state, "update_report_schedule_and_log")
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=30)
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
     with pytest.raises(ReportSchedulePreviousWorkingError):
         state.next()
@@ -1149,6 +1159,112 @@
 )
+
+
+def test_working_state_stale_resets_and_retries(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """WORKING state older than stale threshold should reset to NOOP and retry."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=1800)  # 30 min ago
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
+    # Spy on __init__ to verify arg order: (schedule, scheduled_dttm, execution_id)
+    init_spy = mocker.patch.object(
+        ReportNotTriggeredErrorState, "__init__", return_value=None
+    )
+    mocker.patch.object(ReportNotTriggeredErrorState, "next")
+    state.next()
+    state.update_report_schedule_and_log.assert_called_once_with(  # type: ignore[attr-defined]
+        ReportState.NOOP, error_message="stale working state reset"
+    )
+    init_spy.assert_called_once_with(
+        state._report_schedule,
+        state._scheduled_dttm,
+        state._execution_id,
+    )
+
+
+def test_working_state_elapsed_at_stale_boundary_resets(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """WORKING state elapsed exactly at stale threshold should trigger reset."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=300)
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
+    mocker.patch.object(ReportNotTriggeredErrorState, "__init__", return_value=None)
+    mock_retry_next = mocker.patch.object(ReportNotTriggeredErrorState, "next")
+    state.next()
+    state.update_report_schedule_and_log.assert_called_once_with(  # type: ignore[attr-defined]
+        ReportState.NOOP, error_message="stale working state reset"
+    )
+    mock_retry_next.assert_called_once()
+
+
+def test_working_state_elapsed_just_below_stale_threshold_blocks(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """WORKING state below stale threshold should raise PreviousWorkingError."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=299)
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
+    with pytest.raises(ReportSchedulePreviousWorkingError):
+        state.next()
+
+
+def test_working_state_no_log_found_raises_previous_working(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """When no working log exists, elapsed is unknown — block re-computation."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=None,
+    )
+    with pytest.raises(ReportSchedulePreviousWorkingError):
+        state.next()
+
+
 def test_success_state_grace_period_returns_without_sending(
     mocker: MockerFixture,
 ) -> None: