mirror of
https://github.com/apache/superset.git
synced 2026-04-28 12:34:23 +00:00
Compare commits
3 Commits
fix/postgr ... alerts-rep
| Author | SHA1 | Date |
|---|---|---|
| | 3fe7b64f69 | |
| | 6739576787 | |
| | c7f053f648 | |
@@ -2628,6 +2628,11 @@ const AlertReportModal: FunctionComponent<AlertReportModalProps> = ({
             <>
               <div className="control-label">
                 {t('Working timeout')}
+                <InfoTooltip
+                  tooltip={t(
+                    'Maximum time in seconds this job is allowed to run before being considered stuck and marked as failed. Set this to at least 2-3x the typical execution time of this report.',
+                  )}
+                />
                 <span className="required">*</span>
               </div>
               <div className="input-container">

@@ -815,13 +815,16 @@ class BaseReportState:
             < last_success.end_dttm
         )

-    def is_on_working_timeout(self) -> bool:
+    def is_on_working_timeout(
+        self, last_working: ReportExecutionLog | None = None
+    ) -> bool:
         """
         Checks if an alert is in a working timeout
         """
-        last_working = ReportScheduleDAO.find_last_entered_working_log(
-            self._report_schedule
-        )
+        if last_working is None:
+            last_working = ReportScheduleDAO.find_last_entered_working_log(
+                self._report_schedule
+            )
         if not last_working:
             return False
         return (
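
The hunk above cuts off at `return (`, so the closing comparison is not shown. A minimal sketch of what the check amounts to, assuming (as the surrounding hunks suggest) that the elapsed time since the last WORKING log entry is measured against the schedule's working_timeout; the helper name and the exact boundary are illustrative, not Superset's verbatim code:

    from datetime import datetime

    def is_on_working_timeout_sketch(last_working, working_timeout: int) -> bool:
        # No log entry: we cannot tell how long the job has been WORKING.
        if not last_working:
            return False
        elapsed = (datetime.utcnow() - last_working.end_dttm).total_seconds()
        # Timed out once the WORKING state has outlived the schedule's budget.
        return elapsed > working_timeout
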
@@ -933,20 +936,22 @@ class ReportWorkingState(BaseReportState):
     next states:
     - Error
     - Working
+    - NOOP -> re-execute (stale crash recovery)
     """

     current_states = [ReportState.WORKING]

     def next(self) -> None:
-        if self.is_on_working_timeout():
-            last_working = ReportScheduleDAO.find_last_entered_working_log(
-                self._report_schedule
-            )
-            elapsed_seconds = (
-                (datetime.utcnow() - last_working.end_dttm).total_seconds()
-                if last_working
-                else None
-            )
+        last_working = ReportScheduleDAO.find_last_entered_working_log(
+            self._report_schedule
+        )
+        elapsed_seconds = (
+            (datetime.utcnow() - last_working.end_dttm).total_seconds()
+            if last_working
+            else None
+        )
+
+        if self.is_on_working_timeout(last_working):
             logger.error(
                 "Working state timeout after %.2fs - execution_id: %s",
                 elapsed_seconds if elapsed_seconds else 0,
@@ -958,6 +963,39 @@ class ReportWorkingState(BaseReportState):
                 error_message=str(exception_timeout),
             )
             raise exception_timeout
+
+        stale_timeout = app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"]
+        if elapsed_seconds is not None and elapsed_seconds >= stale_timeout:
+            logger.warning(
+                "Report found in stale WORKING state after %.0fs, "
+                "likely due to crashed worker - resetting and retrying - "
+                "execution_id: %s",
+                elapsed_seconds,
+                self._execution_id,
+            )
+            if (
+                self._report_schedule.working_timeout is not None
+                and stale_timeout >= self._report_schedule.working_timeout
+            ):
+                logger.warning(
+                    "ALERT_REPORTS_STALE_WORKING_TIMEOUT (%ds) is >= working_timeout "
+                    "(%ds) for schedule %s; stale recovery will never fire before "
+                    "working_timeout. Consider lowering "
+                    "ALERT_REPORTS_STALE_WORKING_TIMEOUT.",
+                    stale_timeout,
+                    self._report_schedule.working_timeout,
+                    self._report_schedule.name,
+                )
+            self.update_report_schedule_and_log(
+                ReportState.NOOP, error_message="stale working state reset"
+            )
+            ReportNotTriggeredErrorState(
+                self._report_schedule,
+                self._scheduled_dttm,
+                self._execution_id,
+            ).next()
+            return
+
         logger.warning(
             "Report still in working state, refusing to re-compute - execution_id: %s",
             self._execution_id,
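
Condensed, the ladder in this method checks the hard working_timeout first, then the stale threshold, and otherwise refuses to re-compute. A hypothetical, self-contained condensation (the exact boundary used by is_on_working_timeout is not visible in these hunks, so the `>` comparison below is an assumption):

    def classify(
        elapsed: float | None,
        working_timeout: int = 3600,
        stale_timeout: int = 300,
    ) -> str:
        # Mirrors the order of checks in ReportWorkingState.next() above.
        if elapsed is not None and elapsed > working_timeout:
            return "timeout"  # mark failed, raise ReportScheduleWorkingTimeoutError
        if elapsed is not None and elapsed >= stale_timeout:
            return "stale"  # reset to NOOP, re-execute via ReportNotTriggeredErrorState
        return "still-working"  # raise ReportSchedulePreviousWorkingError

    assert classify(30) == "still-working"
    assert classify(300) == "stale"  # boundary case: elapsed >= stale_timeout
    assert classify(3700) == "timeout"  # hard timeout wins over stale recovery
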
@@ -1971,6 +1971,10 @@ ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG = int(timedelta(seconds=1).total_seconds
 ALERT_REPORTS_DEFAULT_WORKING_TIMEOUT = 3600
 ALERT_REPORTS_DEFAULT_RETENTION = 90
 ALERT_REPORTS_DEFAULT_CRON_VALUE = "0 0 * * *"  # every day
+# Minimum elapsed time (seconds) before a WORKING state is considered stale
+# (e.g. due to a crashed Celery worker) and eligible for reset + retry.
+# Ideally less than working_timeout on any schedule.
+ALERT_REPORTS_STALE_WORKING_TIMEOUT = 300
 # If set to true no notification is sent, the worker will just log a message.
 # Useful for debugging
 ALERT_REPORTS_NOTIFICATION_DRY_RUN = False
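
Deployments can tune the new flag through the usual superset_config.py override; a sketch with an illustrative value:

    # superset_config.py (illustrative override)
    # Recover crashed WORKING reports after 2 minutes instead of the 5-minute default.
    ALERT_REPORTS_STALE_WORKING_TIMEOUT = 120

Keeping it below the smallest working_timeout configured on any schedule matters: otherwise the per-schedule timeout always fires first and stale recovery never runs, which is exactly the misconfiguration the warning in ReportWorkingState.next() above flags.
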
@@ -99,8 +99,10 @@ grace_period_description = (
     "Superset nags you again. (in seconds)"
 )
 working_timeout_description = (
-    "If an alert is staled at a working state, how long until it's state is reset to"
-    " error"
+    "Maximum time in seconds a report or alert job is allowed to run before it is "
+    "considered stuck and its state is reset to error. Set this to at least 2-3x the "
+    "typical execution time of this report. Jobs that exceed this limit are terminated "
+    "and marked as failed."
 )
 creation_method_description = (
     "Creation method is used to inform the frontend whether the report/alert was "
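
For context, these module-level description strings are consumed by the Marshmallow report schemas defined in the same file, where they surface as field descriptions in the REST API's OpenAPI spec. A minimal, self-contained sketch of that wiring; the class name and field options are illustrative, not Superset's exact schema:

    from marshmallow import Schema, fields
    from marshmallow.validate import Range

    working_timeout_description = (  # abbreviated from the hunk above
        "Maximum time in seconds a report or alert job is allowed to run before "
        "it is considered stuck and its state is reset to error."
    )

    class ReportScheduleSchemaSketch(Schema):
        # The description string becomes the field's documented description.
        working_timeout = fields.Integer(
            metadata={"description": working_timeout_description},
            validate=[Range(min=1)],
        )
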
@@ -1133,13 +1133,23 @@ def test_working_state_timeout_raises_timeout_error(mocker: MockerFixture) -> No


+def test_working_state_still_working_raises_previous_working(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """Working state not yet timed out should raise PreviousWorkingError."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=30)
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
+
+    with pytest.raises(ReportSchedulePreviousWorkingError):
+        state.next()

@@ -1149,6 +1159,112 @@ def test_working_state_still_working_raises_previous_working(
     )


+def test_working_state_stale_resets_and_retries(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """WORKING state older than stale threshold should reset to NOOP and retry."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=1800)  # 30 min ago
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
+
+    # Spy on __init__ to verify arg order: (schedule, scheduled_dttm, execution_id)
+    init_spy = mocker.patch.object(
+        ReportNotTriggeredErrorState, "__init__", return_value=None
+    )
+    mocker.patch.object(ReportNotTriggeredErrorState, "next")
+
+    state.next()
+
+    state.update_report_schedule_and_log.assert_called_once_with(  # type: ignore[attr-defined]
+        ReportState.NOOP, error_message="stale working state reset"
+    )
+    init_spy.assert_called_once_with(
+        state._report_schedule,
+        state._scheduled_dttm,
+        state._execution_id,
+    )
+
+
+def test_working_state_elapsed_at_stale_boundary_resets(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """WORKING state elapsed exactly at stale threshold should trigger reset."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=300)
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
+
+    mocker.patch.object(ReportNotTriggeredErrorState, "__init__", return_value=None)
+    mock_retry_next = mocker.patch.object(ReportNotTriggeredErrorState, "next")
+
+    state.next()
+
+    state.update_report_schedule_and_log.assert_called_once_with(  # type: ignore[attr-defined]
+        ReportState.NOOP, error_message="stale working state reset"
+    )
+    mock_retry_next.assert_called_once()
+
+
+def test_working_state_elapsed_just_below_stale_threshold_blocks(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """WORKING state below stale threshold should raise PreviousWorkingError."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+
+    mock_log = mocker.Mock()
+    mock_log.end_dttm = datetime.utcnow() - timedelta(seconds=299)
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=mock_log,
+    )
+
+    with pytest.raises(ReportSchedulePreviousWorkingError):
+        state.next()
+
+
+def test_working_state_no_log_found_raises_previous_working(
+    app: SupersetApp,
+    mocker: MockerFixture,
+) -> None:
+    """When no working log exists, elapsed is unknown — block re-computation."""
+    app.config["ALERT_REPORTS_STALE_WORKING_TIMEOUT"] = 300
+
+    state = _make_state_instance(mocker, ReportWorkingState)
+    mocker.patch.object(state, "is_on_working_timeout", return_value=False)
+    mocker.patch.object(state, "update_report_schedule_and_log")
+    mocker.patch(
+        "superset.commands.report.execute.ReportScheduleDAO.find_last_entered_working_log",
+        return_value=None,
+    )
+
+    with pytest.raises(ReportSchedulePreviousWorkingError):
+        state.next()
+
+
 def test_success_state_grace_period_returns_without_sending(
     mocker: MockerFixture,
 ) -> None: