fix(reports): validator_config, report state machine, working_timeout (#11890)

* fix(reports): expect more exceptions and fix validator config

* use a state pattern on command reports

* use a state pattern on command reports continue

* fix multiple heads

* fix unittests

* add more tests

* fix api tests after enum rename

* fix alembic multiple heads

* fix tests

* fix fixture cleanup

* fix mysql tests

* fix initial and not found state

* fix schema, and private public methods, addressing comments

* add new col to the API
This commit is contained in:
Daniel Vaz Gaspar
2020-12-09 18:19:07 +00:00
committed by GitHub
parent 150c8e36b1
commit 1e3aaab590
12 changed files with 588 additions and 238 deletions

View File

@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""reports add working_timeout column
Revision ID: 5daced1f0e76
Revises: e38177dbf641
Create Date: 2020-12-03 10:11:22.894977
"""
# revision identifiers, used by Alembic.
revision = "5daced1f0e76"
down_revision = "811494c0cc23"
import sqlalchemy as sa
from alembic import op
def upgrade():
op.add_column(
"report_schedule",
sa.Column("working_timeout", sa.Integer(), default=60 * 60 * 4, nullable=True),
)
def downgrade():
op.drop_column("report_schedule", "working_timeout")

View File

@@ -58,11 +58,12 @@ class ReportRecipientType(str, enum.Enum):
SLACK = "Slack"
class ReportLogState(str, enum.Enum):
class ReportState(str, enum.Enum):
    """States of a report/alert schedule execution.

    The ``str`` mixin makes members compare and serialize as plain strings
    (they are stored in the ``last_state`` column and in execution logs).
    """

    SUCCESS = "Success"
    WORKING = "Working"
    ERROR = "Error"
    NOOP = "Not triggered"
    GRACE = "On Grace"
class ReportEmailFormat(str, enum.Enum):
@@ -112,7 +113,7 @@ class ReportSchedule(Model, AuditMixinNullable):
# (Alerts) Stamped last observations
last_eval_dttm = Column(DateTime)
last_state = Column(String(50))
last_state = Column(String(50), default=ReportState.NOOP)
last_value = Column(Float)
last_value_row_json = Column(Text)
@@ -122,7 +123,10 @@ class ReportSchedule(Model, AuditMixinNullable):
# Log retention
log_retention = Column(Integer, default=90)
# (Alerts) After a success how long to wait for a new trigger (seconds)
grace_period = Column(Integer, default=60 * 60 * 4)
# (Alerts/Reports) Unlock a possible stalled working state
working_timeout = Column(Integer, default=60 * 60 * 1)
def __repr__(self) -> str:
return str(self.name)

View File

@@ -87,6 +87,7 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi):
"recipients.id",
"recipients.type",
"recipients.recipient_config_json",
"working_timeout",
]
show_select_columns = show_columns + [
"chart.datasource_id",
@@ -128,6 +129,7 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi):
"recipients",
"sql",
"type",
"working_timeout",
"validator_config_json",
"validator_type",
]

View File

@@ -29,6 +29,7 @@ from superset.reports.commands.exceptions import (
AlertQueryInvalidTypeError,
AlertQueryMultipleColumnsError,
AlertQueryMultipleRowsError,
AlertValidatorConfigError,
)
logger = logging.getLogger(__name__)
@@ -49,9 +50,14 @@ class AlertCommand(BaseCommand):
self._report_schedule.last_value_row_json = self._result
return self._result not in (0, None, np.nan)
self._report_schedule.last_value = self._result
operator = json.loads(self._report_schedule.validator_config_json)["op"]
threshold = json.loads(self._report_schedule.validator_config_json)["threshold"]
return OPERATOR_FUNCTIONS[operator](self._result, threshold)
try:
operator = json.loads(self._report_schedule.validator_config_json)["op"]
threshold = json.loads(self._report_schedule.validator_config_json)[
"threshold"
]
return OPERATOR_FUNCTIONS[operator](self._result, threshold)
except (KeyError, json.JSONDecodeError):
raise AlertValidatorConfigError()
def _validate_not_null(self, rows: np.recarray) -> None:
self._result = rows[0][1]
@@ -68,9 +74,10 @@ class AlertCommand(BaseCommand):
# check if query returned more then one column
if len(rows[0]) > 2:
raise AlertQueryMultipleColumnsError(
# len is subtracted by 1 to discard pandas index column
_(
"Alert query returned more then one column. %s columns returned"
% len(rows[0])
% (len(rows[0]) - 1)
)
)
if rows[0][1] is None:

View File

@@ -119,6 +119,10 @@ class ReportSchedulePreviousWorkingError(CommandException):
message = _("Report Schedule is still working, refusing to re-compute.")
class ReportScheduleWorkingTimeoutError(CommandException):
    # Raised when a schedule stuck in WORKING exceeded its working_timeout.
    message = _("Report Schedule reached a working timeout.")
class ReportScheduleNameUniquenessValidationError(ValidationError):
"""
Marshmallow validation error for Report Schedule name already exists
@@ -133,6 +137,11 @@ class AlertQueryMultipleRowsError(CommandException):
message = _("Alert query returned more then one row.")
class AlertValidatorConfigError(CommandException):
    # Raised when validator_config_json is missing keys or is not valid JSON.
    message = _("Alert validator config error.")
class AlertQueryMultipleColumnsError(CommandException):
    # Default message; AlertCommand overrides it with a formatted column count.
    # Fixed grammar: "more then" -> "more than".
    message = _("Alert query returned more than one column.")
@@ -145,5 +154,21 @@ class ReportScheduleAlertGracePeriodError(CommandException):
message = _("Alert fired during grace period.")
class ReportScheduleAlertEndGracePeriodError(CommandException):
    # Logged when an alert leaves its grace period (state drops back to NOOP).
    message = _("Alert ended grace period.")
class ReportScheduleNotificationError(CommandException):
    # Aggregates per-recipient notification failures (";"-joined).
    # The previous message ("Alert on grace period") was a copy/paste mistake.
    message = _("Report Schedule notification error")
class ReportScheduleSelleniumUserNotFoundError(CommandException):
    # NOTE: the class name keeps the historical "Sellenium" misspelling since
    # callers reference it by name; the user-facing message is spelled right.
    message = _("Report Schedule selenium user not found")
class ReportScheduleStateNotFoundError(CommandException):
    # Raised by the state machine when last_state matches no handler.
    message = _("Report Schedule state not found")
class ReportScheduleUnexpectedError(CommandException):
    # Catch-all wrapper for non-CommandException failures during execution.
    message = _("Report schedule unexpected error")

View File

@@ -16,28 +16,33 @@
# under the License.
import logging
from datetime import datetime, timedelta
from typing import Optional
from typing import List, Optional
from flask_appbuilder.security.sqla.models import User
from sqlalchemy.orm import Session
from superset import app, thumbnail_cache
from superset.commands.base import BaseCommand
from superset.commands.exceptions import CommandException
from superset.extensions import security_manager
from superset.models.reports import (
ReportExecutionLog,
ReportLogState,
ReportSchedule,
ReportScheduleType,
ReportState,
)
from superset.reports.commands.alert import AlertCommand
from superset.reports.commands.exceptions import (
ReportScheduleAlertEndGracePeriodError,
ReportScheduleAlertGracePeriodError,
ReportScheduleExecuteUnexpectedError,
ReportScheduleNotFoundError,
ReportScheduleNotificationError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
ReportScheduleSelleniumUserNotFoundError,
ReportScheduleStateNotFoundError,
ReportScheduleUnexpectedError,
ReportScheduleWorkingTimeoutError,
)
from superset.reports.dao import ReportScheduleDAO
from superset.reports.notifications import create_notification
@@ -54,6 +59,294 @@ from superset.utils.urls import get_url_path
logger = logging.getLogger(__name__)
class BaseReportState:
    """
    Base class for the report-schedule state pattern.

    Subclasses declare which ``last_state`` values they handle via
    ``current_states`` (plus ``initial`` for a schedule that has never run)
    and implement :meth:`next` to perform the transition.
    """

    # ReportState values this handler accepts as the schedule's current state.
    current_states: List[ReportState] = []
    # True when this handler also accepts a schedule with no recorded state.
    initial: bool = False

    def __init__(
        self,
        session: Session,
        report_schedule: ReportSchedule,
        scheduled_dttm: datetime,
    ) -> None:
        self._session = session
        self._report_schedule = report_schedule
        self._scheduled_dttm = scheduled_dttm
        # Wall-clock start of this execution; stamped on every log entry.
        self._start_dttm = datetime.utcnow()

    def set_state_and_log(
        self, state: ReportState, error_message: Optional[str] = None,
    ) -> None:
        """
        Update the schedule's state/timestamp and write an execution log
        entry for the transition (a log row is written on every call).
        """
        now_dttm = datetime.utcnow()
        self.set_state(state, now_dttm)
        self.create_log(
            state, error_message=error_message,
        )

    def set_state(self, state: ReportState, dttm: datetime) -> None:
        """
        Set the current report schedule state; on this case we want to
        commit immediately so the state survives later failures.
        """
        self._report_schedule.last_state = state
        self._report_schedule.last_eval_dttm = dttm
        # merge: the schedule instance may come from another session.
        self._session.merge(self._report_schedule)
        self._session.commit()

    def create_log(  # pylint: disable=too-many-arguments
        self, state: ReportState, error_message: Optional[str] = None,
    ) -> None:
        """
        Create a Report execution log; uses the current computed last_value
        for Alerts.  Commits immediately.
        """
        log = ReportExecutionLog(
            scheduled_dttm=self._scheduled_dttm,
            start_dttm=self._start_dttm,
            end_dttm=datetime.utcnow(),
            value=self._report_schedule.last_value,
            value_row_json=self._report_schedule.last_value_row_json,
            state=state,
            error_message=error_message,
            report_schedule=self._report_schedule,
        )
        self._session.add(log)
        self._session.commit()

    def _get_url(self, user_friendly: bool = False) -> str:
        """
        Get the url for this report schedule: chart or dashboard.
        """
        if self._report_schedule.chart:
            return get_url_path(
                "Superset.slice",
                user_friendly=user_friendly,
                slice_id=self._report_schedule.chart_id,
                standalone="true",
            )
        return get_url_path(
            "Superset.dashboard",
            user_friendly=user_friendly,
            dashboard_id_or_slug=self._report_schedule.dashboard_id,
        )

    def _get_screenshot_user(self) -> User:
        """
        Return the configured thumbnail/selenium user.

        :raises ReportScheduleSelleniumUserNotFoundError: when the
            THUMBNAIL_SELENIUM_USER does not exist
        """
        user = (
            self._session.query(User)
            .filter(User.username == app.config["THUMBNAIL_SELENIUM_USER"])
            .one_or_none()
        )
        if not user:
            raise ReportScheduleSelleniumUserNotFoundError()
        return user

    def _get_screenshot(self) -> ScreenshotData:
        """
        Get a chart or dashboard screenshot.

        :raises: ReportScheduleScreenshotFailedError
        """
        url = self._get_url()
        screenshot: Optional[BaseScreenshot] = None
        if self._report_schedule.chart:
            screenshot = ChartScreenshot(url, self._report_schedule.chart.digest)
        else:
            screenshot = DashboardScreenshot(
                url, self._report_schedule.dashboard.digest
            )
        # The user-friendly URL is embedded in the notification body.
        image_url = self._get_url(user_friendly=True)
        user = self._get_screenshot_user()
        image_data = screenshot.compute_and_cache(
            user=user, cache=thumbnail_cache, force=True,
        )
        if not image_data:
            raise ReportScheduleScreenshotFailedError()
        return ScreenshotData(url=image_url, image=image_data)

    def _get_notification_content(self) -> NotificationContent:
        """
        Get notification content: composed of a title and a screenshot.

        :raises: ReportScheduleScreenshotFailedError
        """
        screenshot_data = self._get_screenshot()
        if self._report_schedule.chart:
            name = (
                f"{self._report_schedule.name}: "
                f"{self._report_schedule.chart.slice_name}"
            )
        else:
            name = (
                f"{self._report_schedule.name}: "
                f"{self._report_schedule.dashboard.dashboard_title}"
            )
        return NotificationContent(name=name, screenshot=screenshot_data)

    def send(self) -> None:
        """
        Create the notification content and send it to all recipients.

        :raises: ReportScheduleNotificationError
        """
        notification_errors = []
        notification_content = self._get_notification_content()
        for recipient in self._report_schedule.recipients:
            notification = create_notification(recipient, notification_content)
            try:
                notification.send()
            except NotificationError as ex:
                # collect notification errors but keep processing them
                notification_errors.append(str(ex))
        if notification_errors:
            raise ReportScheduleNotificationError(";".join(notification_errors))

    def is_in_grace_period(self) -> bool:
        """
        Check if an alert is inside its grace period (a success log exists
        newer than now minus grace_period).
        """
        last_success = ReportScheduleDAO.find_last_success_log(
            self._report_schedule, session=self._session
        )
        # NOTE(review): the `and` chain can evaluate to None/0 instead of
        # False when grace_period is unset; callers only use the result in a
        # boolean context, so behavior is unaffected.
        return (
            last_success is not None
            and self._report_schedule.grace_period
            and datetime.utcnow()
            - timedelta(seconds=self._report_schedule.grace_period)
            < last_success.end_dttm
        )

    def is_on_working_timeout(self) -> bool:
        """
        Check if a WORKING schedule has been working longer than its
        configured working_timeout (measured from last_eval_dttm).
        """
        return (
            self._report_schedule.working_timeout is not None
            and self._report_schedule.last_eval_dttm is not None
            and datetime.utcnow()
            - timedelta(seconds=self._report_schedule.working_timeout)
            > self._report_schedule.last_eval_dttm
        )

    def next(self) -> None:
        """Perform the state transition; implemented by concrete states."""
        raise NotImplementedError()
class ReportNotTriggeredErrorState(BaseReportState):
    """
    Handle the Not triggered and Error states (also the initial state for a
    schedule that has never run).

    next final states:
    - Not Triggered
    - Success
    - Error
    """

    current_states = [ReportState.NOOP, ReportState.ERROR]
    initial = True

    def next(self) -> None:
        self.set_state_and_log(ReportState.WORKING)
        try:
            # If it's an alert check if the alert is triggered
            if self._report_schedule.type == ReportScheduleType.ALERT:
                if not AlertCommand(self._report_schedule).run():
                    self.set_state_and_log(ReportState.NOOP)
                    return
            self.send()
            self.set_state_and_log(ReportState.SUCCESS)
        except CommandException as ex:
            self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
            # Bare raise preserves the original traceback (was `raise ex`).
            raise
class ReportWorkingState(BaseReportState):
    """
    Handle a schedule found in the Working state.

    next states:
    - Error
    - Working
    """

    current_states = [ReportState.WORKING]

    def next(self) -> None:
        # A WORKING run stalled past its timeout flips to ERROR so the
        # schedule can recover on the next tick; otherwise we refuse to
        # overlap with the still-running execution.
        if self.is_on_working_timeout():
            timeout_error = ReportScheduleWorkingTimeoutError()
            self.set_state_and_log(
                ReportState.ERROR, error_message=str(timeout_error),
            )
            raise timeout_error
        still_working_error = ReportSchedulePreviousWorkingError()
        self.set_state_and_log(
            ReportState.WORKING, error_message=str(still_working_error),
        )
        raise still_working_error
class ReportSuccessState(BaseReportState):
    """
    Handle the Success and Grace states.

    next states:
    - Grace
    - Not triggered
    - Success
    """

    current_states = [ReportState.SUCCESS, ReportState.GRACE]

    def next(self) -> None:
        self.set_state_and_log(ReportState.WORKING)
        if self._report_schedule.type == ReportScheduleType.ALERT:
            # Alerts coming from SUCCESS/GRACE are not re-sent here: they
            # either stay on the grace period or drop back to NOOP.
            if self.is_in_grace_period():
                self.set_state_and_log(
                    ReportState.GRACE,
                    error_message=str(ReportScheduleAlertGracePeriodError()),
                )
                return
            self.set_state_and_log(
                ReportState.NOOP,
                error_message=str(ReportScheduleAlertEndGracePeriodError()),
            )
            return
        # Reports (non-alerts) always send from a success state.
        try:
            self.send()
            self.set_state_and_log(ReportState.SUCCESS)
        except CommandException as ex:
            self.set_state_and_log(ReportState.ERROR, error_message=str(ex))
class ReportScheduleStateMachine:  # pylint: disable=too-few-public-methods
    """
    Simple state machine for Alerts/Reports states.

    Dispatches the current execution to the first state handler whose
    declared states match the schedule's ``last_state``.
    """

    states_cls = [ReportWorkingState, ReportNotTriggeredErrorState, ReportSuccessState]

    def __init__(
        self,
        session: Session,
        report_schedule: ReportSchedule,
        scheduled_dttm: datetime,
    ):
        self._session = session
        self._report_schedule = report_schedule
        self._scheduled_dttm = scheduled_dttm

    def run(self) -> None:
        current_state = self._report_schedule.last_state
        for handler_cls in self.states_cls:
            matches_initial = current_state is None and handler_cls.initial
            if matches_initial or current_state in handler_cls.current_states:
                handler = handler_cls(
                    self._session, self._report_schedule, self._scheduled_dttm
                )
                handler.next()
                break
        else:
            # No handler claimed the current state.
            raise ReportScheduleStateNotFoundError()
class AsyncExecuteReportScheduleCommand(BaseCommand):
"""
Execute all types of report schedules.
@@ -66,171 +359,19 @@ class AsyncExecuteReportScheduleCommand(BaseCommand):
self._model: Optional[ReportSchedule] = None
self._scheduled_dttm = scheduled_dttm
def set_state_and_log(
self,
session: Session,
start_dttm: datetime,
state: ReportLogState,
error_message: Optional[str] = None,
) -> None:
"""
Updates current ReportSchedule state and TS. If on final state writes the log
for this execution
"""
now_dttm = datetime.utcnow()
if state == ReportLogState.WORKING:
self.set_state(session, state, now_dttm)
return
self.set_state(session, state, now_dttm)
self.create_log(
session, start_dttm, now_dttm, state, error_message=error_message,
)
def set_state(
self, session: Session, state: ReportLogState, dttm: datetime
) -> None:
"""
Set the current report schedule state, on this case we want to
commit immediately
"""
if self._model:
self._model.last_state = state
self._model.last_eval_dttm = dttm
session.commit()
def create_log( # pylint: disable=too-many-arguments
self,
session: Session,
start_dttm: datetime,
end_dttm: datetime,
state: ReportLogState,
error_message: Optional[str] = None,
) -> None:
"""
Creates a Report execution log, uses the current computed last_value for Alerts
"""
if self._model:
log = ReportExecutionLog(
scheduled_dttm=self._scheduled_dttm,
start_dttm=start_dttm,
end_dttm=end_dttm,
value=self._model.last_value,
value_row_json=self._model.last_value_row_json,
state=state,
error_message=error_message,
report_schedule=self._model,
)
session.add(log)
@staticmethod
def _get_url(report_schedule: ReportSchedule, user_friendly: bool = False) -> str:
"""
Get the url for this report schedule: chart or dashboard
"""
if report_schedule.chart:
return get_url_path(
"Superset.slice",
user_friendly=user_friendly,
slice_id=report_schedule.chart_id,
standalone="true",
)
return get_url_path(
"Superset.dashboard",
user_friendly=user_friendly,
dashboard_id_or_slug=report_schedule.dashboard_id,
)
def _get_screenshot(self, report_schedule: ReportSchedule) -> ScreenshotData:
"""
Get a chart or dashboard screenshot
:raises: ReportScheduleScreenshotFailedError
"""
url = self._get_url(report_schedule)
screenshot: Optional[BaseScreenshot] = None
if report_schedule.chart:
screenshot = ChartScreenshot(url, report_schedule.chart.digest)
else:
screenshot = DashboardScreenshot(url, report_schedule.dashboard.digest)
image_url = self._get_url(report_schedule, user_friendly=True)
user = security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"])
image_data = screenshot.compute_and_cache(
user=user, cache=thumbnail_cache, force=True,
)
if not image_data:
raise ReportScheduleScreenshotFailedError()
return ScreenshotData(url=image_url, image=image_data)
def _get_notification_content(
self, report_schedule: ReportSchedule
) -> NotificationContent:
"""
Gets a notification content, this is composed by a title and a screenshot
:raises: ReportScheduleScreenshotFailedError
"""
screenshot_data = self._get_screenshot(report_schedule)
if report_schedule.chart:
name = report_schedule.chart.slice_name
else:
name = report_schedule.dashboard.dashboard_title
return NotificationContent(name=name, screenshot=screenshot_data)
def _send(self, report_schedule: ReportSchedule) -> None:
"""
Creates the notification content and sends them to all recipients
:raises: ReportScheduleNotificationError
"""
notification_errors = []
notification_content = self._get_notification_content(report_schedule)
for recipient in report_schedule.recipients:
notification = create_notification(recipient, notification_content)
try:
notification.send()
except NotificationError as ex:
# collect notification errors but keep processing them
notification_errors.append(str(ex))
if notification_errors:
raise ReportScheduleNotificationError(";".join(notification_errors))
def run(self) -> None:
with session_scope(nullpool=True) as session:
try:
start_dttm = datetime.utcnow()
self.validate(session=session)
if not self._model:
raise ReportScheduleExecuteUnexpectedError()
self.set_state_and_log(session, start_dttm, ReportLogState.WORKING)
# If it's an alert check if the alert is triggered
if self._model.type == ReportScheduleType.ALERT:
if not AlertCommand(self._model).run():
self.set_state_and_log(session, start_dttm, ReportLogState.NOOP)
return
self._send(self._model)
# Log, state and TS
self.set_state_and_log(session, start_dttm, ReportLogState.SUCCESS)
except ReportScheduleAlertGracePeriodError as ex:
self.set_state_and_log(
session, start_dttm, ReportLogState.NOOP, error_message=str(ex)
)
except ReportSchedulePreviousWorkingError as ex:
self.create_log(
session,
start_dttm,
datetime.utcnow(),
state=ReportLogState.ERROR,
error_message=str(ex),
)
session.commit()
raise
ReportScheduleStateMachine(
session, self._model, self._scheduled_dttm
).run()
except CommandException as ex:
self.set_state_and_log(
session, start_dttm, ReportLogState.ERROR, error_message=str(ex)
)
# We want to actually commit the state and log inside the scope
session.commit()
raise
raise ex
except Exception as ex:
raise ReportScheduleUnexpectedError(str(ex))
def validate( # pylint: disable=arguments-differ
self, session: Session = None
@@ -239,18 +380,3 @@ class AsyncExecuteReportScheduleCommand(BaseCommand):
self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session)
if not self._model:
raise ReportScheduleNotFoundError()
# Avoid overlap processing
if self._model.last_state == ReportLogState.WORKING:
raise ReportSchedulePreviousWorkingError()
# Check grace period
if self._model.type == ReportScheduleType.ALERT:
last_success = ReportScheduleDAO.find_last_success_log(session)
if (
last_success
and self._model.last_state
in (ReportLogState.SUCCESS, ReportLogState.NOOP)
and self._model.grace_period
and datetime.utcnow() - timedelta(seconds=self._model.grace_period)
< last_success.end_dttm
):
raise ReportScheduleAlertGracePeriodError()

View File

@@ -37,12 +37,13 @@ class AsyncPruneReportScheduleLogCommand(BaseCommand):
with session_scope(nullpool=True) as session:
self.validate()
for report_schedule in session.query(ReportSchedule).all():
from_date = datetime.utcnow() - timedelta(
days=report_schedule.log_retention
)
ReportScheduleDAO.bulk_delete_logs(
report_schedule, from_date, session=session, commit=False
)
if report_schedule.log_retention is not None:
from_date = datetime.utcnow() - timedelta(
days=report_schedule.log_retention
)
ReportScheduleDAO.bulk_delete_logs(
report_schedule, from_date, session=session, commit=False
)
def validate(self) -> None:
pass

View File

@@ -27,9 +27,9 @@ from superset.dao.exceptions import DAOCreateFailedError, DAODeleteFailedError
from superset.extensions import db
from superset.models.reports import (
ReportExecutionLog,
ReportLogState,
ReportRecipients,
ReportSchedule,
ReportState,
)
logger = logging.getLogger(__name__)
@@ -204,15 +204,18 @@ class ReportScheduleDAO(BaseDAO):
@staticmethod
def find_last_success_log(
session: Optional[Session] = None,
report_schedule: ReportSchedule, session: Optional[Session] = None,
) -> Optional[ReportExecutionLog]:
"""
Finds last success execution log
Finds last success execution log for a given report
"""
session = session or db.session
return (
session.query(ReportExecutionLog)
.filter(ReportExecutionLog.state == ReportLogState.SUCCESS)
.filter(
ReportExecutionLog.state == ReportState.SUCCESS,
ReportExecutionLog.report_schedule == report_schedule,
)
.order_by(ReportExecutionLog.end_dttm.desc())
.first()
)

View File

@@ -77,6 +77,10 @@ grace_period_description = (
"Once an alert is triggered, how long, in seconds, before "
"Superset nags you again. (in seconds)"
)
# API docs string for the working_timeout field.
# Fixed typos: "staled" -> "stalled", "it's state is reseted" -> "its state
# is reset".
working_timeout_description = (
    "If an alert is stalled in a working state, how long until its state is"
    " reset to error"
)
def validate_crontab(value: Union[bytes, bytearray, str]) -> None:
@@ -85,7 +89,7 @@ def validate_crontab(value: Union[bytes, bytearray, str]) -> None:
class ValidatorConfigJSONSchema(Schema):
operation = fields.String(
op = fields.String( # pylint: disable=invalid-name
description=validator_config_json_op_description,
validate=validate.OneOf(choices=["<", "<=", ">", ">=", "==", "!="]),
)
@@ -155,7 +159,15 @@ class ReportSchedulePostSchema(Schema):
)
validator_config_json = fields.Nested(ValidatorConfigJSONSchema)
log_retention = fields.Integer(description=log_retention_description, example=90)
grace_period = fields.Integer(description=grace_period_description, example=14400)
grace_period = fields.Integer(
description=grace_period_description, example=60 * 60 * 4, default=60 * 60 * 4
)
working_timeout = fields.Integer(
description=working_timeout_description,
example=60 * 60 * 1,
default=60 * 60 * 1,
)
recipients = fields.List(fields.Nested(ReportRecipientSchema))
@@ -204,6 +216,9 @@ class ReportSchedulePutSchema(Schema):
description=log_retention_description, example=90, required=False
)
grace_period = fields.Integer(
description=grace_period_description, example=14400, required=False
description=grace_period_description, example=60 * 60 * 4, required=False
)
working_timeout = fields.Integer(
description=working_timeout_description, example=60 * 60 * 1, required=False
)
recipients = fields.List(fields.Nested(ReportRecipientSchema), required=False)

View File

@@ -50,6 +50,9 @@ def scheduler() -> None:
active_schedules = ReportScheduleDAO.find_active(session)
for active_schedule in active_schedules:
for schedule in cron_schedule_window(active_schedule.crontab):
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name, schedule
)
execute.apply_async((active_schedule.id, schedule,), eta=schedule)

View File

@@ -36,7 +36,7 @@ from superset.models.reports import (
ReportExecutionLog,
ReportScheduleType,
ReportRecipientType,
ReportLogState,
ReportState,
)
from tests.base_tests import SupersetTestCase
@@ -70,7 +70,7 @@ class TestReportSchedulesApi(SupersetTestCase):
logs.append(
ReportExecutionLog(
scheduled_dttm=datetime(2020, 1, 1),
state=ReportLogState.ERROR,
state=ReportState.ERROR,
error_message=f"Error {cy}",
)
)
@@ -796,7 +796,7 @@ class TestReportSchedulesApi(SupersetTestCase):
self.login(username="admin")
arguments = {
"columns": ["name"],
"filters": [{"col": "state", "opr": "eq", "value": ReportLogState.SUCCESS}],
"filters": [{"col": "state", "opr": "eq", "value": ReportState.SUCCESS}],
}
uri = f"api/v1/report/{report_schedule.id}/log/?q={prison.dumps(arguments)}"
rv = self.get_assert_metric(uri, "get_list")
@@ -816,7 +816,7 @@ class TestReportSchedulesApi(SupersetTestCase):
.one_or_none()
)
data = {"state": ReportLogState.ERROR, "error_message": "New error changed"}
data = {"state": ReportState.ERROR, "error_message": "New error changed"}
self.login(username="admin")
uri = f"api/v1/report/{report_schedule.id}/log/"

View File

@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import json
from datetime import datetime
from datetime import datetime, timedelta
from typing import List, Optional
from unittest.mock import patch
@@ -29,12 +29,12 @@ from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.reports import (
ReportExecutionLog,
ReportLogState,
ReportRecipients,
ReportRecipientType,
ReportSchedule,
ReportScheduleType,
ReportScheduleValidatorType,
ReportState,
)
from superset.models.slice import Slice
from superset.reports.commands.exceptions import (
@@ -43,6 +43,7 @@ from superset.reports.commands.exceptions import (
ReportScheduleNotFoundError,
ReportScheduleNotificationError,
ReportSchedulePreviousWorkingError,
ReportScheduleWorkingTimeoutError,
)
from superset.reports.commands.execute import AsyncExecuteReportScheduleCommand
from superset.utils.core import get_example_database
@@ -61,9 +62,16 @@ def get_target_from_report_schedule(report_schedule) -> List[str]:
def assert_log(state: str, error_message: Optional[str] = None):
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
assert len(logs) == 1
assert logs[0].error_message == error_message
assert logs[0].state == state
if state == ReportState.WORKING:
assert len(logs) == 1
assert logs[0].error_message == error_message
assert logs[0].state == state
return
assert len(logs) == 2
log_states = [log.state for log in logs]
assert ReportState.WORKING in log_states
assert state in log_states
assert error_message in [log.error_message for log in logs]
def create_report_notification(
@@ -107,6 +115,18 @@ def create_report_notification(
return report_schedule
def cleanup_report_schedule(report_schedule: ReportSchedule) -> None:
    """Delete a schedule and its dependent rows.

    Logs and recipients are removed first so the schedule delete does not
    trip foreign-key constraints (needed for MySQL).
    """
    db.session.query(ReportExecutionLog).filter(
        ReportExecutionLog.report_schedule == report_schedule
    ).delete()
    db.session.query(ReportRecipients).filter(
        ReportRecipients.report_schedule == report_schedule
    ).delete()
    db.session.delete(report_schedule)
    db.session.commit()
@pytest.yield_fixture()
def create_report_email_chart():
with app.app_context():
@@ -116,8 +136,7 @@ def create_report_email_chart():
)
yield report_schedule
db.session.delete(report_schedule)
db.session.commit()
cleanup_report_schedule(report_schedule)
@pytest.yield_fixture()
@@ -129,8 +148,7 @@ def create_report_email_dashboard():
)
yield report_schedule
db.session.delete(report_schedule)
db.session.commit()
cleanup_report_schedule(report_schedule)
@pytest.yield_fixture()
@@ -142,8 +160,7 @@ def create_report_slack_chart():
)
yield report_schedule
db.session.delete(report_schedule)
db.session.commit()
cleanup_report_schedule(report_schedule)
@pytest.yield_fixture()
@@ -153,12 +170,64 @@ def create_report_slack_chart_working():
report_schedule = create_report_notification(
slack_channel="slack_channel", chart=chart
)
report_schedule.last_state = ReportLogState.WORKING
report_schedule.last_state = ReportState.WORKING
report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)
db.session.commit()
yield report_schedule
db.session.delete(report_schedule)
cleanup_report_schedule(report_schedule)
@pytest.yield_fixture()
def create_alert_slack_chart_success():
    """Alert schedule left in SUCCESS with a matching success log entry."""
    with app.app_context():
        chart = db.session.query(Slice).first()
        report_schedule = create_report_notification(
            slack_channel="slack_channel",
            chart=chart,
            report_type=ReportScheduleType.ALERT,
        )
        report_schedule.last_state = ReportState.SUCCESS
        report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)

        # A success log is required for grace-period computations.
        log = ReportExecutionLog(
            report_schedule=report_schedule,
            state=ReportState.SUCCESS,
            start_dttm=report_schedule.last_eval_dttm,
            end_dttm=report_schedule.last_eval_dttm,
            scheduled_dttm=report_schedule.last_eval_dttm,
        )
        db.session.add(log)
        db.session.commit()
        yield report_schedule

        cleanup_report_schedule(report_schedule)
@pytest.yield_fixture()
def create_alert_slack_chart_grace():
    """Alert schedule left in GRACE with an earlier success log entry."""
    with app.app_context():
        chart = db.session.query(Slice).first()
        report_schedule = create_report_notification(
            slack_channel="slack_channel",
            chart=chart,
            report_type=ReportScheduleType.ALERT,
        )
        report_schedule.last_state = ReportState.GRACE
        report_schedule.last_eval_dttm = datetime(2020, 1, 1, 0, 0)

        # A success log is required for grace-period computations.
        log = ReportExecutionLog(
            report_schedule=report_schedule,
            state=ReportState.SUCCESS,
            start_dttm=report_schedule.last_eval_dttm,
            end_dttm=report_schedule.last_eval_dttm,
            scheduled_dttm=report_schedule.last_eval_dttm,
        )
        db.session.add(log)
        db.session.commit()
        yield report_schedule

        cleanup_report_schedule(report_schedule)
@pytest.yield_fixture(
@@ -217,8 +286,7 @@ def create_alert_email_chart(request):
)
yield report_schedule
db.session.delete(report_schedule)
db.session.commit()
cleanup_report_schedule(report_schedule)
@contextmanager
@@ -291,8 +359,7 @@ def create_no_alert_email_chart(request):
)
yield report_schedule
db.session.delete(report_schedule)
db.session.commit()
cleanup_report_schedule(report_schedule)
@pytest.yield_fixture(params=["alert1", "alert2"])
@@ -327,17 +394,7 @@ def create_mul_alert_email_chart(request):
)
yield report_schedule
# needed for MySQL
logs = (
db.session.query(ReportExecutionLog)
.filter(ReportExecutionLog.report_schedule == report_schedule)
.all()
)
for log in logs:
db.session.delete(log)
db.session.commit()
db.session.delete(report_schedule)
db.session.commit()
cleanup_report_schedule(report_schedule)
@pytest.mark.usefixtures("create_report_email_chart")
@@ -367,7 +424,7 @@ def test_email_chart_report_schedule(
smtp_images = email_mock.call_args[1]["images"]
assert smtp_images[list(smtp_images.keys())[0]] == screenshot
# Assert logs are correct
assert_log(ReportLogState.SUCCESS)
assert_log(ReportState.SUCCESS)
@pytest.mark.usefixtures("create_report_email_dashboard")
@@ -397,7 +454,7 @@ def test_email_dashboard_report_schedule(
smtp_images = email_mock.call_args[1]["images"]
assert smtp_images[list(smtp_images.keys())[0]] == screenshot
# Assert logs are correct
assert_log(ReportLogState.SUCCESS)
assert_log(ReportState.SUCCESS)
@pytest.mark.usefixtures("create_report_slack_chart")
@@ -425,7 +482,7 @@ def test_slack_chart_report_schedule(
assert file_upload_mock.call_args[1]["file"] == screenshot
# Assert logs are correct
assert_log(ReportLogState.SUCCESS)
assert_log(ReportState.SUCCESS)
@pytest.mark.usefixtures("create_report_slack_chart")
@@ -444,15 +501,81 @@ def test_report_schedule_working(create_report_slack_chart_working):
ExecuteReport Command: Test report schedule still working
"""
# setup screenshot mock
with pytest.raises(ReportSchedulePreviousWorkingError):
with freeze_time("2020-01-01T00:00:00Z"):
with pytest.raises(ReportSchedulePreviousWorkingError):
AsyncExecuteReportScheduleCommand(
create_report_slack_chart_working.id, datetime.utcnow()
).run()
assert_log(
ReportState.WORKING,
error_message=ReportSchedulePreviousWorkingError.message,
)
assert create_report_slack_chart_working.last_state == ReportState.WORKING
@pytest.mark.usefixtures("create_report_slack_chart_working")
def test_report_schedule_working_timeout(create_report_slack_chart_working):
"""
ExecuteReport Command: Test report schedule still working but should timed out
"""
# setup screenshot mock
current_time = create_report_slack_chart_working.last_eval_dttm + timedelta(
seconds=create_report_slack_chart_working.working_timeout + 1
)
with freeze_time(current_time):
with pytest.raises(ReportScheduleWorkingTimeoutError):
AsyncExecuteReportScheduleCommand(
create_report_slack_chart_working.id, datetime.utcnow()
).run()
# Only needed for MySQL, understand why
db.session.commit()
logs = db.session.query(ReportExecutionLog).all()
assert len(logs) == 1
assert logs[0].error_message == ReportScheduleWorkingTimeoutError.message
assert logs[0].state == ReportState.ERROR
assert create_report_slack_chart_working.last_state == ReportState.ERROR
@pytest.mark.usefixtures("create_alert_slack_chart_success")
def test_report_schedule_success_grace(create_alert_slack_chart_success):
"""
ExecuteReport Command: Test report schedule on success to grace
"""
# set current time to within the grace period
current_time = create_alert_slack_chart_success.last_eval_dttm + timedelta(
seconds=create_alert_slack_chart_success.grace_period - 10
)
with freeze_time(current_time):
AsyncExecuteReportScheduleCommand(
create_report_slack_chart_working.id, datetime.utcnow()
create_alert_slack_chart_success.id, datetime.utcnow()
).run()
assert_log(
ReportLogState.ERROR, error_message=ReportSchedulePreviousWorkingError.message
db.session.commit()
assert create_alert_slack_chart_success.last_state == ReportState.GRACE
@pytest.mark.usefixtures("create_alert_slack_chart_grace")
def test_report_schedule_success_grace_end(create_alert_slack_chart_grace):
"""
ExecuteReport Command: Test report schedule on grace to noop
"""
# set current time to within the grace period
current_time = create_alert_slack_chart_grace.last_eval_dttm + timedelta(
seconds=create_alert_slack_chart_grace.grace_period + 1
)
assert create_report_slack_chart_working.last_state == ReportLogState.WORKING
with freeze_time(current_time):
AsyncExecuteReportScheduleCommand(
create_alert_slack_chart_grace.id, datetime.utcnow()
).run()
db.session.commit()
assert create_alert_slack_chart_grace.last_state == ReportState.NOOP
@pytest.mark.usefixtures("create_report_email_dashboard")
@@ -476,7 +599,7 @@ def test_email_dashboard_report_fails(
create_report_email_dashboard.id, datetime.utcnow()
).run()
assert_log(ReportLogState.ERROR, error_message="Could not connect to SMTP XPTO")
assert_log(ReportState.ERROR, error_message="Could not connect to SMTP XPTO")
@pytest.mark.usefixtures("create_alert_email_chart")
@@ -502,7 +625,7 @@ def test_slack_chart_alert(screenshot_mock, email_mock, create_alert_email_chart
smtp_images = email_mock.call_args[1]["images"]
assert smtp_images[list(smtp_images.keys())[0]] == screenshot
# Assert logs are correct
assert_log(ReportLogState.SUCCESS)
assert_log(ReportState.SUCCESS)
@pytest.mark.usefixtures("create_no_alert_email_chart")
@@ -514,7 +637,7 @@ def test_email_chart_no_alert(create_no_alert_email_chart):
AsyncExecuteReportScheduleCommand(
create_no_alert_email_chart.id, datetime.utcnow()
).run()
assert_log(ReportLogState.NOOP)
assert_log(ReportState.NOOP)
@pytest.mark.usefixtures("create_mul_alert_email_chart")