From 1e3aaab59075c5220e444759e07a26c080586fed Mon Sep 17 00:00:00 2001 From: Daniel Vaz Gaspar Date: Wed, 9 Dec 2020 18:19:07 +0000 Subject: [PATCH] fix(reports): validator_config, report state machine, working_timeout (#11890) * fix(reports): expect more exceptions and fix validator config * use a state pattern on command reports * use a state pattern on command reports continue * fix multiple heads * fix unittests * add more tests * fix api tests after enum rename * fix alembic multiple heads * fix tests * fix fixture cleanup * fix mysql tests * fix initial and not found state * fix schema, and private public methods, addressing comments * add new col to the API --- ...0e76_reports_add_working_timeout_column.py | 41 ++ superset/models/reports.py | 8 +- superset/reports/api.py | 2 + superset/reports/commands/alert.py | 15 +- superset/reports/commands/exceptions.py | 25 + superset/reports/commands/execute.py | 478 +++++++++++------- superset/reports/commands/log_prune.py | 13 +- superset/reports/dao.py | 11 +- superset/reports/schemas.py | 21 +- superset/tasks/scheduler.py | 3 + tests/reports/api_tests.py | 8 +- tests/reports/commands_tests.py | 201 ++++++-- 12 files changed, 588 insertions(+), 238 deletions(-) create mode 100644 superset/migrations/versions/5daced1f0e76_reports_add_working_timeout_column.py diff --git a/superset/migrations/versions/5daced1f0e76_reports_add_working_timeout_column.py b/superset/migrations/versions/5daced1f0e76_reports_add_working_timeout_column.py new file mode 100644 index 00000000000..b836ef01f52 --- /dev/null +++ b/superset/migrations/versions/5daced1f0e76_reports_add_working_timeout_column.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
"""reports add working_timeout column

Revision ID: 5daced1f0e76
Revises: 811494c0cc23
Create Date: 2020-12-03 10:11:22.894977

"""

# revision identifiers, used by Alembic.
revision = "5daced1f0e76"
# NOTE: the docstring above must agree with this value; it previously said
# "Revises: e38177dbf641", which was stale after the multiple-heads fix.
down_revision = "811494c0cc23"

import sqlalchemy as sa
from alembic import op


def upgrade():
    """Add the nullable ``working_timeout`` column to ``report_schedule``."""
    # NOTE(review): ``default=`` is a client-side (ORM) default and has no
    # effect inside a migration -- existing rows keep NULL.  If a database
    # level default is intended, ``server_default`` would be needed; also,
    # this 4h value differs from the model default of 1h -- confirm intent.
    op.add_column(
        "report_schedule",
        sa.Column("working_timeout", sa.Integer(), default=60 * 60 * 4, nullable=True),
    )


def downgrade():
    """Drop the column added by :func:`upgrade`."""
    op.drop_column("report_schedule", "working_timeout")
After a success how long to wait for a new trigger (seconds) grace_period = Column(Integer, default=60 * 60 * 4) + # (Alerts/Reports) Unlock a possible stalled working state + working_timeout = Column(Integer, default=60 * 60 * 1) def __repr__(self) -> str: return str(self.name) diff --git a/superset/reports/api.py b/superset/reports/api.py index 4d9e0045893..d66ba74f028 100644 --- a/superset/reports/api.py +++ b/superset/reports/api.py @@ -87,6 +87,7 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi): "recipients.id", "recipients.type", "recipients.recipient_config_json", + "working_timeout", ] show_select_columns = show_columns + [ "chart.datasource_id", @@ -128,6 +129,7 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi): "recipients", "sql", "type", + "working_timeout", "validator_config_json", "validator_type", ] diff --git a/superset/reports/commands/alert.py b/superset/reports/commands/alert.py index cab294ce5c2..ae72bbd2662 100644 --- a/superset/reports/commands/alert.py +++ b/superset/reports/commands/alert.py @@ -29,6 +29,7 @@ from superset.reports.commands.exceptions import ( AlertQueryInvalidTypeError, AlertQueryMultipleColumnsError, AlertQueryMultipleRowsError, + AlertValidatorConfigError, ) logger = logging.getLogger(__name__) @@ -49,9 +50,14 @@ class AlertCommand(BaseCommand): self._report_schedule.last_value_row_json = self._result return self._result not in (0, None, np.nan) self._report_schedule.last_value = self._result - operator = json.loads(self._report_schedule.validator_config_json)["op"] - threshold = json.loads(self._report_schedule.validator_config_json)["threshold"] - return OPERATOR_FUNCTIONS[operator](self._result, threshold) + try: + operator = json.loads(self._report_schedule.validator_config_json)["op"] + threshold = json.loads(self._report_schedule.validator_config_json)[ + "threshold" + ] + return OPERATOR_FUNCTIONS[operator](self._result, threshold) + except (KeyError, json.JSONDecodeError): + raise 
class ReportScheduleNotificationError(CommandException):
    # Raised when one or more recipients could not be notified; the previous
    # message ("Alert on grace period") was a copy-paste from the grace-period
    # exception and did not describe this error at all.
    message = _("Report Schedule notification error")
_("Report Schedule state not found") + + +class ReportScheduleUnexpectedError(CommandException): + message = _("Report schedule unexpected error") diff --git a/superset/reports/commands/execute.py b/superset/reports/commands/execute.py index bb3384702d1..ab3ee5854f5 100644 --- a/superset/reports/commands/execute.py +++ b/superset/reports/commands/execute.py @@ -16,28 +16,33 @@ # under the License. import logging from datetime import datetime, timedelta -from typing import Optional +from typing import List, Optional +from flask_appbuilder.security.sqla.models import User from sqlalchemy.orm import Session from superset import app, thumbnail_cache from superset.commands.base import BaseCommand from superset.commands.exceptions import CommandException -from superset.extensions import security_manager from superset.models.reports import ( ReportExecutionLog, - ReportLogState, ReportSchedule, ReportScheduleType, + ReportState, ) from superset.reports.commands.alert import AlertCommand from superset.reports.commands.exceptions import ( + ReportScheduleAlertEndGracePeriodError, ReportScheduleAlertGracePeriodError, ReportScheduleExecuteUnexpectedError, ReportScheduleNotFoundError, ReportScheduleNotificationError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, + ReportScheduleSelleniumUserNotFoundError, + ReportScheduleStateNotFoundError, + ReportScheduleUnexpectedError, + ReportScheduleWorkingTimeoutError, ) from superset.reports.dao import ReportScheduleDAO from superset.reports.notifications import create_notification @@ -54,6 +59,294 @@ from superset.utils.urls import get_url_path logger = logging.getLogger(__name__) +class BaseReportState: + current_states: List[ReportState] = [] + initial: bool = False + + def __init__( + self, + session: Session, + report_schedule: ReportSchedule, + scheduled_dttm: datetime, + ) -> None: + self._session = session + self._report_schedule = report_schedule + self._scheduled_dttm = scheduled_dttm + 
self._start_dttm = datetime.utcnow() + + def set_state_and_log( + self, state: ReportState, error_message: Optional[str] = None, + ) -> None: + """ + Updates current ReportSchedule state and TS. If on final state writes the log + for this execution + """ + now_dttm = datetime.utcnow() + self.set_state(state, now_dttm) + self.create_log( + state, error_message=error_message, + ) + + def set_state(self, state: ReportState, dttm: datetime) -> None: + """ + Set the current report schedule state, on this case we want to + commit immediately + """ + self._report_schedule.last_state = state + self._report_schedule.last_eval_dttm = dttm + self._session.merge(self._report_schedule) + self._session.commit() + + def create_log( # pylint: disable=too-many-arguments + self, state: ReportState, error_message: Optional[str] = None, + ) -> None: + """ + Creates a Report execution log, uses the current computed last_value for Alerts + """ + log = ReportExecutionLog( + scheduled_dttm=self._scheduled_dttm, + start_dttm=self._start_dttm, + end_dttm=datetime.utcnow(), + value=self._report_schedule.last_value, + value_row_json=self._report_schedule.last_value_row_json, + state=state, + error_message=error_message, + report_schedule=self._report_schedule, + ) + self._session.add(log) + self._session.commit() + + def _get_url(self, user_friendly: bool = False) -> str: + """ + Get the url for this report schedule: chart or dashboard + """ + if self._report_schedule.chart: + return get_url_path( + "Superset.slice", + user_friendly=user_friendly, + slice_id=self._report_schedule.chart_id, + standalone="true", + ) + return get_url_path( + "Superset.dashboard", + user_friendly=user_friendly, + dashboard_id_or_slug=self._report_schedule.dashboard_id, + ) + + def _get_screenshot_user(self) -> User: + user = ( + self._session.query(User) + .filter(User.username == app.config["THUMBNAIL_SELENIUM_USER"]) + .one_or_none() + ) + if not user: + raise ReportScheduleSelleniumUserNotFoundError() + return 
user + + def _get_screenshot(self) -> ScreenshotData: + """ + Get a chart or dashboard screenshot + :raises: ReportScheduleScreenshotFailedError + """ + url = self._get_url() + screenshot: Optional[BaseScreenshot] = None + if self._report_schedule.chart: + screenshot = ChartScreenshot(url, self._report_schedule.chart.digest) + else: + screenshot = DashboardScreenshot( + url, self._report_schedule.dashboard.digest + ) + image_url = self._get_url(user_friendly=True) + user = self._get_screenshot_user() + image_data = screenshot.compute_and_cache( + user=user, cache=thumbnail_cache, force=True, + ) + if not image_data: + raise ReportScheduleScreenshotFailedError() + return ScreenshotData(url=image_url, image=image_data) + + def _get_notification_content(self) -> NotificationContent: + """ + Gets a notification content, this is composed by a title and a screenshot + :raises: ReportScheduleScreenshotFailedError + """ + screenshot_data = self._get_screenshot() + if self._report_schedule.chart: + name = ( + f"{self._report_schedule.name}: " + f"{self._report_schedule.chart.slice_name}" + ) + else: + name = ( + f"{self._report_schedule.name}: " + f"{self._report_schedule.dashboard.dashboard_title}" + ) + return NotificationContent(name=name, screenshot=screenshot_data) + + def send(self) -> None: + """ + Creates the notification content and sends them to all recipients + + :raises: ReportScheduleNotificationError + """ + notification_errors = [] + notification_content = self._get_notification_content() + for recipient in self._report_schedule.recipients: + notification = create_notification(recipient, notification_content) + try: + notification.send() + except NotificationError as ex: + # collect notification errors but keep processing them + notification_errors.append(str(ex)) + if notification_errors: + raise ReportScheduleNotificationError(";".join(notification_errors)) + + def is_in_grace_period(self) -> bool: + """ + Checks if an alert is on it's grace period + """ + 
class ReportWorkingState(BaseReportState):
    """
    Handle the Working state: either the previous run is still in progress
    (log and stay on Working) or it exceeded ``working_timeout`` (go to Error).

    next states:
    - Error
    - Working
    """

    current_states = [ReportState.WORKING]

    def next(self) -> None:
        # Decide once which outcome applies; both paths log the error
        # message on the new state and then re-raise.
        if self.is_on_working_timeout():
            error: CommandException = ReportScheduleWorkingTimeoutError()
            new_state = ReportState.ERROR
        else:
            error = ReportSchedulePreviousWorkingError()
            new_state = ReportState.WORKING
        self.set_state_and_log(new_state, error_message=str(error))
        raise error
class ReportScheduleStateMachine:  # pylint: disable=too-few-public-methods
    """
    Simple state machine for Alerts/Reports states
    """

    # Handlers are mutually exclusive on current_states; order is cosmetic.
    states_cls = [ReportWorkingState, ReportNotTriggeredErrorState, ReportSuccessState]

    def __init__(
        self,
        session: Session,
        report_schedule: ReportSchedule,
        scheduled_dttm: datetime,
    ):
        self._session = session
        self._report_schedule = report_schedule
        self._scheduled_dttm = scheduled_dttm

    def run(self) -> None:
        """
        Dispatch to the state handler matching the schedule's last_state.

        :raises ReportScheduleStateNotFoundError: when no handler accepts
            the current state
        """
        last_state = self._report_schedule.last_state
        for handler_cls in self.states_cls:
            # A None last_state is routed to the handler flagged as initial.
            matches_initial = last_state is None and handler_cls.initial
            if matches_initial or last_state in handler_cls.current_states:
                handler = handler_cls(
                    self._session, self._report_schedule, self._scheduled_dttm
                )
                handler.next()
                break
        else:
            raise ReportScheduleStateNotFoundError()
@@ -66,171 +359,19 @@ class AsyncExecuteReportScheduleCommand(BaseCommand): self._model: Optional[ReportSchedule] = None self._scheduled_dttm = scheduled_dttm - def set_state_and_log( - self, - session: Session, - start_dttm: datetime, - state: ReportLogState, - error_message: Optional[str] = None, - ) -> None: - """ - Updates current ReportSchedule state and TS. If on final state writes the log - for this execution - """ - now_dttm = datetime.utcnow() - if state == ReportLogState.WORKING: - self.set_state(session, state, now_dttm) - return - self.set_state(session, state, now_dttm) - self.create_log( - session, start_dttm, now_dttm, state, error_message=error_message, - ) - - def set_state( - self, session: Session, state: ReportLogState, dttm: datetime - ) -> None: - """ - Set the current report schedule state, on this case we want to - commit immediately - """ - if self._model: - self._model.last_state = state - self._model.last_eval_dttm = dttm - session.commit() - - def create_log( # pylint: disable=too-many-arguments - self, - session: Session, - start_dttm: datetime, - end_dttm: datetime, - state: ReportLogState, - error_message: Optional[str] = None, - ) -> None: - """ - Creates a Report execution log, uses the current computed last_value for Alerts - """ - if self._model: - log = ReportExecutionLog( - scheduled_dttm=self._scheduled_dttm, - start_dttm=start_dttm, - end_dttm=end_dttm, - value=self._model.last_value, - value_row_json=self._model.last_value_row_json, - state=state, - error_message=error_message, - report_schedule=self._model, - ) - session.add(log) - - @staticmethod - def _get_url(report_schedule: ReportSchedule, user_friendly: bool = False) -> str: - """ - Get the url for this report schedule: chart or dashboard - """ - if report_schedule.chart: - return get_url_path( - "Superset.slice", - user_friendly=user_friendly, - slice_id=report_schedule.chart_id, - standalone="true", - ) - return get_url_path( - "Superset.dashboard", - 
user_friendly=user_friendly, - dashboard_id_or_slug=report_schedule.dashboard_id, - ) - - def _get_screenshot(self, report_schedule: ReportSchedule) -> ScreenshotData: - """ - Get a chart or dashboard screenshot - :raises: ReportScheduleScreenshotFailedError - """ - url = self._get_url(report_schedule) - screenshot: Optional[BaseScreenshot] = None - if report_schedule.chart: - screenshot = ChartScreenshot(url, report_schedule.chart.digest) - else: - screenshot = DashboardScreenshot(url, report_schedule.dashboard.digest) - image_url = self._get_url(report_schedule, user_friendly=True) - user = security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"]) - image_data = screenshot.compute_and_cache( - user=user, cache=thumbnail_cache, force=True, - ) - if not image_data: - raise ReportScheduleScreenshotFailedError() - return ScreenshotData(url=image_url, image=image_data) - - def _get_notification_content( - self, report_schedule: ReportSchedule - ) -> NotificationContent: - """ - Gets a notification content, this is composed by a title and a screenshot - :raises: ReportScheduleScreenshotFailedError - """ - screenshot_data = self._get_screenshot(report_schedule) - if report_schedule.chart: - name = report_schedule.chart.slice_name - else: - name = report_schedule.dashboard.dashboard_title - return NotificationContent(name=name, screenshot=screenshot_data) - - def _send(self, report_schedule: ReportSchedule) -> None: - """ - Creates the notification content and sends them to all recipients - - :raises: ReportScheduleNotificationError - """ - notification_errors = [] - notification_content = self._get_notification_content(report_schedule) - for recipient in report_schedule.recipients: - notification = create_notification(recipient, notification_content) - try: - notification.send() - except NotificationError as ex: - # collect notification errors but keep processing them - notification_errors.append(str(ex)) - if notification_errors: - raise 
ReportScheduleNotificationError(";".join(notification_errors)) - def run(self) -> None: with session_scope(nullpool=True) as session: try: - start_dttm = datetime.utcnow() self.validate(session=session) if not self._model: raise ReportScheduleExecuteUnexpectedError() - self.set_state_and_log(session, start_dttm, ReportLogState.WORKING) - # If it's an alert check if the alert is triggered - if self._model.type == ReportScheduleType.ALERT: - if not AlertCommand(self._model).run(): - self.set_state_and_log(session, start_dttm, ReportLogState.NOOP) - return - - self._send(self._model) - - # Log, state and TS - self.set_state_and_log(session, start_dttm, ReportLogState.SUCCESS) - except ReportScheduleAlertGracePeriodError as ex: - self.set_state_and_log( - session, start_dttm, ReportLogState.NOOP, error_message=str(ex) - ) - except ReportSchedulePreviousWorkingError as ex: - self.create_log( - session, - start_dttm, - datetime.utcnow(), - state=ReportLogState.ERROR, - error_message=str(ex), - ) - session.commit() - raise + ReportScheduleStateMachine( + session, self._model, self._scheduled_dttm + ).run() except CommandException as ex: - self.set_state_and_log( - session, start_dttm, ReportLogState.ERROR, error_message=str(ex) - ) - # We want to actually commit the state and log inside the scope - session.commit() - raise + raise ex + except Exception as ex: + raise ReportScheduleUnexpectedError(str(ex)) def validate( # pylint: disable=arguments-differ self, session: Session = None @@ -239,18 +380,3 @@ class AsyncExecuteReportScheduleCommand(BaseCommand): self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session) if not self._model: raise ReportScheduleNotFoundError() - # Avoid overlap processing - if self._model.last_state == ReportLogState.WORKING: - raise ReportSchedulePreviousWorkingError() - # Check grace period - if self._model.type == ReportScheduleType.ALERT: - last_success = ReportScheduleDAO.find_last_success_log(session) - if ( - last_success 
- and self._model.last_state - in (ReportLogState.SUCCESS, ReportLogState.NOOP) - and self._model.grace_period - and datetime.utcnow() - timedelta(seconds=self._model.grace_period) - < last_success.end_dttm - ): - raise ReportScheduleAlertGracePeriodError() diff --git a/superset/reports/commands/log_prune.py b/superset/reports/commands/log_prune.py index 9825a35eef2..77f2ce54978 100644 --- a/superset/reports/commands/log_prune.py +++ b/superset/reports/commands/log_prune.py @@ -37,12 +37,13 @@ class AsyncPruneReportScheduleLogCommand(BaseCommand): with session_scope(nullpool=True) as session: self.validate() for report_schedule in session.query(ReportSchedule).all(): - from_date = datetime.utcnow() - timedelta( - days=report_schedule.log_retention - ) - ReportScheduleDAO.bulk_delete_logs( - report_schedule, from_date, session=session, commit=False - ) + if report_schedule.log_retention is not None: + from_date = datetime.utcnow() - timedelta( + days=report_schedule.log_retention + ) + ReportScheduleDAO.bulk_delete_logs( + report_schedule, from_date, session=session, commit=False + ) def validate(self) -> None: pass diff --git a/superset/reports/dao.py b/superset/reports/dao.py index aaa0c391906..2392ca38999 100644 --- a/superset/reports/dao.py +++ b/superset/reports/dao.py @@ -27,9 +27,9 @@ from superset.dao.exceptions import DAOCreateFailedError, DAODeleteFailedError from superset.extensions import db from superset.models.reports import ( ReportExecutionLog, - ReportLogState, ReportRecipients, ReportSchedule, + ReportState, ) logger = logging.getLogger(__name__) @@ -204,15 +204,18 @@ class ReportScheduleDAO(BaseDAO): @staticmethod def find_last_success_log( - session: Optional[Session] = None, + report_schedule: ReportSchedule, session: Optional[Session] = None, ) -> Optional[ReportExecutionLog]: """ - Finds last success execution log + Finds last success execution log for a given report """ session = session or db.session return ( 
# User-facing help text for the ``working_timeout`` API field.  Fixes the
# typos in the original ("staled", "it's", "reseted").
working_timeout_description = (
    "If an alert is stalled at a working state, how long until its state is"
    " reset to error"
)
description=grace_period_description, example=60 * 60 * 4, required=False + ) + working_timeout = fields.Integer( + description=working_timeout_description, example=60 * 60 * 1, required=False ) recipients = fields.List(fields.Nested(ReportRecipientSchema), required=False) diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 62398f08df9..0ac0ce397ee 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -50,6 +50,9 @@ def scheduler() -> None: active_schedules = ReportScheduleDAO.find_active(session) for active_schedule in active_schedules: for schedule in cron_schedule_window(active_schedule.crontab): + logger.info( + "Scheduling alert %s eta: %s", active_schedule.name, schedule + ) execute.apply_async((active_schedule.id, schedule,), eta=schedule) diff --git a/tests/reports/api_tests.py b/tests/reports/api_tests.py index ff396e2c725..b868ab4c8a5 100644 --- a/tests/reports/api_tests.py +++ b/tests/reports/api_tests.py @@ -36,7 +36,7 @@ from superset.models.reports import ( ReportExecutionLog, ReportScheduleType, ReportRecipientType, - ReportLogState, + ReportState, ) from tests.base_tests import SupersetTestCase @@ -70,7 +70,7 @@ class TestReportSchedulesApi(SupersetTestCase): logs.append( ReportExecutionLog( scheduled_dttm=datetime(2020, 1, 1), - state=ReportLogState.ERROR, + state=ReportState.ERROR, error_message=f"Error {cy}", ) ) @@ -796,7 +796,7 @@ class TestReportSchedulesApi(SupersetTestCase): self.login(username="admin") arguments = { "columns": ["name"], - "filters": [{"col": "state", "opr": "eq", "value": ReportLogState.SUCCESS}], + "filters": [{"col": "state", "opr": "eq", "value": ReportState.SUCCESS}], } uri = f"api/v1/report/{report_schedule.id}/log/?q={prison.dumps(arguments)}" rv = self.get_assert_metric(uri, "get_list") @@ -816,7 +816,7 @@ class TestReportSchedulesApi(SupersetTestCase): .one_or_none() ) - data = {"state": ReportLogState.ERROR, "error_message": "New error changed"} + data = {"state": 
def assert_log(state: str, error_message: Optional[str] = None):
    """Assert the execution-log table matches the expected final ``state``.

    A run that ends on WORKING leaves exactly one log row; any other final
    state leaves two rows -- the WORKING row plus the terminal one.
    """
    db.session.commit()
    logs = db.session.query(ReportExecutionLog).all()
    if state == ReportState.WORKING:
        assert len(logs) == 1
        assert logs[0].error_message == error_message
        assert logs[0].state == state
        return
    assert len(logs) == 2
    seen_states = {log.state for log in logs}
    assert ReportState.WORKING in seen_states
    assert state in seen_states
    seen_messages = [log.error_message for log in logs]
    assert error_message in seen_messages
def cleanup_report_schedule(report_schedule: ReportSchedule) -> None:
    """Delete a report schedule together with its logs and recipients.

    Child rows are removed first so the delete also works on backends
    (e.g. MySQL) that enforce the foreign keys.
    """
    session = db.session
    for child_model in (ReportExecutionLog, ReportRecipients):
        session.query(child_model).filter(
            child_model.report_schedule == report_schedule
        ).delete()

    session.delete(report_schedule)
    session.commit()
log = ReportExecutionLog( + report_schedule=report_schedule, + state=ReportState.SUCCESS, + start_dttm=report_schedule.last_eval_dttm, + end_dttm=report_schedule.last_eval_dttm, + scheduled_dttm=report_schedule.last_eval_dttm, + ) + db.session.add(log) db.session.commit() + yield report_schedule + + cleanup_report_schedule(report_schedule) + + +@pytest.yield_fixture() +def create_alert_slack_chart_grace(): + with app.app_context(): + chart = db.session.query(Slice).first() + report_schedule = create_report_notification( + slack_channel="slack_channel", + chart=chart, + report_type=ReportScheduleType.ALERT, + ) + report_schedule.last_state = ReportState.GRACE + report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0) + + log = ReportExecutionLog( + report_schedule=report_schedule, + state=ReportState.SUCCESS, + start_dttm=report_schedule.last_eval_dttm, + end_dttm=report_schedule.last_eval_dttm, + scheduled_dttm=report_schedule.last_eval_dttm, + ) + db.session.add(log) + db.session.commit() + yield report_schedule + + cleanup_report_schedule(report_schedule) @pytest.yield_fixture( @@ -217,8 +286,7 @@ def create_alert_email_chart(request): ) yield report_schedule - db.session.delete(report_schedule) - db.session.commit() + cleanup_report_schedule(report_schedule) @contextmanager @@ -291,8 +359,7 @@ def create_no_alert_email_chart(request): ) yield report_schedule - db.session.delete(report_schedule) - db.session.commit() + cleanup_report_schedule(report_schedule) @pytest.yield_fixture(params=["alert1", "alert2"]) @@ -327,17 +394,7 @@ def create_mul_alert_email_chart(request): ) yield report_schedule - # needed for MySQL - logs = ( - db.session.query(ReportExecutionLog) - .filter(ReportExecutionLog.report_schedule == report_schedule) - .all() - ) - for log in logs: - db.session.delete(log) - db.session.commit() - db.session.delete(report_schedule) - db.session.commit() + cleanup_report_schedule(report_schedule) @pytest.mark.usefixtures("create_report_email_chart") 
@@ -367,7 +424,7 @@ def test_email_chart_report_schedule(
     smtp_images = email_mock.call_args[1]["images"]
     assert smtp_images[list(smtp_images.keys())[0]] == screenshot
     # Assert logs are correct
-    assert_log(ReportLogState.SUCCESS)
+    assert_log(ReportState.SUCCESS)
 
 
 @pytest.mark.usefixtures("create_report_email_dashboard")
@@ -397,7 +454,7 @@ def test_email_dashboard_report_schedule(
     smtp_images = email_mock.call_args[1]["images"]
     assert smtp_images[list(smtp_images.keys())[0]] == screenshot
     # Assert logs are correct
-    assert_log(ReportLogState.SUCCESS)
+    assert_log(ReportState.SUCCESS)
 
 
 @pytest.mark.usefixtures("create_report_slack_chart")
@@ -425,7 +482,7 @@ def test_slack_chart_report_schedule(
     assert file_upload_mock.call_args[1]["file"] == screenshot
 
     # Assert logs are correct
-    assert_log(ReportLogState.SUCCESS)
+    assert_log(ReportState.SUCCESS)
 
 
 @pytest.mark.usefixtures("create_report_slack_chart")
@@ -444,15 +501,81 @@ def test_report_schedule_working(create_report_slack_chart_working):
     ExecuteReport Command: Test report schedule still working
     """
     # setup screenshot mock
-    with pytest.raises(ReportSchedulePreviousWorkingError):
+    with freeze_time("2020-01-01T00:00:00Z"):
+        with pytest.raises(ReportSchedulePreviousWorkingError):
+            AsyncExecuteReportScheduleCommand(
+                create_report_slack_chart_working.id, datetime.utcnow()
+            ).run()
+
+        assert_log(
+            ReportState.WORKING,
+            error_message=ReportSchedulePreviousWorkingError.message,
+        )
+        assert create_report_slack_chart_working.last_state == ReportState.WORKING
+
+
+@pytest.mark.usefixtures("create_report_slack_chart_working")
+def test_report_schedule_working_timeout(create_report_slack_chart_working):
+    """
+    ExecuteReport Command: Test report schedule still working but should time out
+    """
+    # advance the clock past the configured working timeout
+    current_time = create_report_slack_chart_working.last_eval_dttm + timedelta(
+        seconds=create_report_slack_chart_working.working_timeout + 1
+    )
+
+    with freeze_time(current_time):
+        with 
pytest.raises(ReportScheduleWorkingTimeoutError):
+            AsyncExecuteReportScheduleCommand(
+                create_report_slack_chart_working.id, datetime.utcnow()
+            ).run()
+
+    # Only needed for MySQL; TODO: understand why this explicit commit is required
+    db.session.commit()
+    logs = db.session.query(ReportExecutionLog).all()
+    assert len(logs) == 1
+    assert logs[0].error_message == ReportScheduleWorkingTimeoutError.message
+    assert logs[0].state == ReportState.ERROR
+
+    assert create_report_slack_chart_working.last_state == ReportState.ERROR
+
+
+@pytest.mark.usefixtures("create_alert_slack_chart_success")
+def test_report_schedule_success_grace(create_alert_slack_chart_success):
+    """
+    ExecuteReport Command: Test report schedule on success to grace
+    """
+    # set current time to within the grace period
+    current_time = create_alert_slack_chart_success.last_eval_dttm + timedelta(
+        seconds=create_alert_slack_chart_success.grace_period - 10
+    )
+
+    with freeze_time(current_time):
         AsyncExecuteReportScheduleCommand(
-            create_report_slack_chart_working.id, datetime.utcnow()
+            create_alert_slack_chart_success.id, datetime.utcnow()
         ).run()
 
-    assert_log(
-        ReportLogState.ERROR, error_message=ReportSchedulePreviousWorkingError.message
+    db.session.commit()
+    assert create_alert_slack_chart_success.last_state == ReportState.GRACE
+
+
+@pytest.mark.usefixtures("create_alert_slack_chart_grace")
+def test_report_schedule_success_grace_end(create_alert_slack_chart_grace):
+    """
+    ExecuteReport Command: Test report schedule on grace to noop
+    """
+    # set current time to past the end of the grace period
+    current_time = create_alert_slack_chart_grace.last_eval_dttm + timedelta(
+        seconds=create_alert_slack_chart_grace.grace_period + 1
     )
-    assert create_report_slack_chart_working.last_state == ReportLogState.WORKING
+
+    with freeze_time(current_time):
+        AsyncExecuteReportScheduleCommand(
+            create_alert_slack_chart_grace.id, datetime.utcnow()
+        ).run()
+
+    db.session.commit()
+    assert create_alert_slack_chart_grace.last_state == 
ReportState.NOOP @pytest.mark.usefixtures("create_report_email_dashboard") @@ -476,7 +599,7 @@ def test_email_dashboard_report_fails( create_report_email_dashboard.id, datetime.utcnow() ).run() - assert_log(ReportLogState.ERROR, error_message="Could not connect to SMTP XPTO") + assert_log(ReportState.ERROR, error_message="Could not connect to SMTP XPTO") @pytest.mark.usefixtures("create_alert_email_chart") @@ -502,7 +625,7 @@ def test_slack_chart_alert(screenshot_mock, email_mock, create_alert_email_chart smtp_images = email_mock.call_args[1]["images"] assert smtp_images[list(smtp_images.keys())[0]] == screenshot # Assert logs are correct - assert_log(ReportLogState.SUCCESS) + assert_log(ReportState.SUCCESS) @pytest.mark.usefixtures("create_no_alert_email_chart") @@ -514,7 +637,7 @@ def test_email_chart_no_alert(create_no_alert_email_chart): AsyncExecuteReportScheduleCommand( create_no_alert_email_chart.id, datetime.utcnow() ).run() - assert_log(ReportLogState.NOOP) + assert_log(ReportState.NOOP) @pytest.mark.usefixtures("create_mul_alert_email_chart")