mirror of
https://github.com/apache/superset.git
synced 2026-04-18 23:55:00 +00:00
* fix: thumbnails and reports will be use WEBDRIVER_WINDOW option * changes reformatted * config change reverted. thumbnails sizes changed to original * typo fix * bugfix defining defaults in thumbnails.py caused thumbnail caches invalidated. they moved to init. Co-authored-by: Ibrahim Ercan <ibrahim.ercan@vlmedia.com.tr>
448 lines
15 KiB
Python
448 lines
15 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, List, Optional
|
|
|
|
from flask_appbuilder.security.sqla.models import User
|
|
from sqlalchemy.orm import Session
|
|
|
|
from superset import app, thumbnail_cache
|
|
from superset.commands.base import BaseCommand
|
|
from superset.commands.exceptions import CommandException
|
|
from superset.models.reports import (
|
|
ReportExecutionLog,
|
|
ReportSchedule,
|
|
ReportScheduleType,
|
|
ReportState,
|
|
)
|
|
from superset.reports.commands.alert import AlertCommand
|
|
from superset.reports.commands.exceptions import (
|
|
ReportScheduleAlertEndGracePeriodError,
|
|
ReportScheduleAlertGracePeriodError,
|
|
ReportScheduleExecuteUnexpectedError,
|
|
ReportScheduleNotFoundError,
|
|
ReportScheduleNotificationError,
|
|
ReportSchedulePreviousWorkingError,
|
|
ReportScheduleScreenshotFailedError,
|
|
ReportScheduleSelleniumUserNotFoundError,
|
|
ReportScheduleStateNotFoundError,
|
|
ReportScheduleUnexpectedError,
|
|
ReportScheduleWorkingTimeoutError,
|
|
)
|
|
from superset.reports.dao import (
|
|
REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
|
|
ReportScheduleDAO,
|
|
)
|
|
from superset.reports.notifications import create_notification
|
|
from superset.reports.notifications.base import NotificationContent, ScreenshotData
|
|
from superset.reports.notifications.exceptions import NotificationError
|
|
from superset.utils.celery import session_scope
|
|
from superset.utils.screenshots import (
|
|
BaseScreenshot,
|
|
ChartScreenshot,
|
|
DashboardScreenshot,
|
|
)
|
|
from superset.utils.urls import get_url_path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BaseReportState:
    """
    Base class for one state of the alert/report state machine.

    A subclass lists the ``ReportState`` values it handles in ``current_states``,
    sets ``initial`` when it should also handle a schedule that has no
    ``last_state`` yet, and implements ``next`` to perform the transition.
    The shared helpers persist state changes, write execution logs, build
    screenshots/notifications and evaluate grace periods and working timeouts.
    """

    # ReportState values handled by this state class; overridden by subclasses
    current_states: List[ReportState] = []
    # Whether this state class handles a schedule whose last_state is None
    initial: bool = False

    def __init__(
        self,
        session: Session,
        report_schedule: ReportSchedule,
        scheduled_dttm: datetime,
    ) -> None:
        self._session = session
        self._report_schedule = report_schedule
        self._scheduled_dttm = scheduled_dttm
        # Start of this execution; stored as start_dttm on every log it writes
        self._start_dttm = datetime.utcnow()

    def set_state_and_log(
        self, state: ReportState, error_message: Optional[str] = None,
    ) -> None:
        """
        Updates the current ReportSchedule state and evaluation timestamp, then
        writes an execution log entry for this transition.

        :param state: the state to transition to
        :param error_message: optional message stored on the log entry
        """
        now_dttm = datetime.utcnow()
        self.set_state(state, now_dttm)
        self.create_log(
            state, error_message=error_message,
        )

    def set_state(self, state: ReportState, dttm: datetime) -> None:
        """
        Set the current report schedule state and last evaluation timestamp.

        The change is merged and committed immediately rather than at the end
        of the execution.

        :param state: the new ``last_state``
        :param dttm: the new ``last_eval_dttm``
        """
        self._report_schedule.last_state = state
        self._report_schedule.last_eval_dttm = dttm
        self._session.merge(self._report_schedule)
        self._session.commit()

    def create_log(
        self, state: ReportState, error_message: Optional[str] = None,
    ) -> None:
        """
        Creates and commits a Report execution log entry.

        The entry copies the schedule's current ``last_value`` and
        ``last_value_row_json`` (the values computed for Alerts).

        :param state: state recorded on the log entry
        :param error_message: optional message recorded on the log entry
        """
        log = ReportExecutionLog(
            scheduled_dttm=self._scheduled_dttm,
            start_dttm=self._start_dttm,
            end_dttm=datetime.utcnow(),
            value=self._report_schedule.last_value,
            value_row_json=self._report_schedule.last_value_row_json,
            state=state,
            error_message=error_message,
            report_schedule=self._report_schedule,
        )
        self._session.add(log)
        self._session.commit()

    def _get_url(self, user_friendly: bool = False, **kwargs: Any) -> str:
        """
        Get the url for this report schedule: chart or dashboard.

        :param user_friendly: forwarded to ``get_url_path``
        :param kwargs: extra arguments forwarded to ``get_url_path``
            (presumably added as URL parameters — confirm in utils.urls)
        """
        if self._report_schedule.chart:
            return get_url_path(
                "Superset.slice",
                user_friendly=user_friendly,
                slice_id=self._report_schedule.chart_id,
                **kwargs,
            )
        return get_url_path(
            "Superset.dashboard",
            user_friendly=user_friendly,
            dashboard_id_or_slug=self._report_schedule.dashboard_id,
            **kwargs,
        )

    def _get_screenshot_user(self) -> User:
        """
        Return the user named by the ``THUMBNAIL_SELENIUM_USER`` config key,
        used to render screenshots.

        :raises: ReportScheduleSelleniumUserNotFoundError
        """
        user = (
            self._session.query(User)
            .filter(User.username == app.config["THUMBNAIL_SELENIUM_USER"])
            .one_or_none()
        )
        if not user:
            raise ReportScheduleSelleniumUserNotFoundError()
        return user

    def _get_screenshot(self) -> ScreenshotData:
        """
        Get a chart or dashboard screenshot.

        Both the window size and the thumb size come from the matching
        ``WEBDRIVER_WINDOW`` entry ("slice" or "dashboard"). Setting them equal
        presumably keeps the image at full window size; confirm in
        utils.screenshots.

        :raises: ReportScheduleScreenshotFailedError
        """
        screenshot: Optional[BaseScreenshot] = None
        if self._report_schedule.chart:
            url = self._get_url(standalone="true")
            screenshot = ChartScreenshot(
                url,
                self._report_schedule.chart.digest,
                window_size=app.config["WEBDRIVER_WINDOW"]["slice"],
                thumb_size=app.config["WEBDRIVER_WINDOW"]["slice"],
            )
        else:
            url = self._get_url()
            screenshot = DashboardScreenshot(
                url,
                self._report_schedule.dashboard.digest,
                window_size=app.config["WEBDRIVER_WINDOW"]["dashboard"],
                thumb_size=app.config["WEBDRIVER_WINDOW"]["dashboard"],
            )
        # The link shown to recipients is the user-friendly URL, not the one
        # used for rendering
        image_url = self._get_url(user_friendly=True)
        user = self._get_screenshot_user()
        # force=True presumably bypasses any cached image — confirm in
        # BaseScreenshot.compute_and_cache
        image_data = screenshot.compute_and_cache(
            user=user, cache=thumbnail_cache, force=True,
        )
        if not image_data:
            raise ReportScheduleScreenshotFailedError()
        return ScreenshotData(url=image_url, image=image_data)

    def _get_notification_content(self) -> NotificationContent:
        """
        Gets the notification content: a title and a screenshot.

        The title is "<schedule name>: <chart name or dashboard title>".

        :raises: ReportScheduleScreenshotFailedError
        """
        screenshot_data = self._get_screenshot()
        if self._report_schedule.chart:
            name = (
                f"{self._report_schedule.name}: "
                f"{self._report_schedule.chart.slice_name}"
            )
        else:
            name = (
                f"{self._report_schedule.name}: "
                f"{self._report_schedule.dashboard.dashboard_title}"
            )
        return NotificationContent(name=name, screenshot=screenshot_data)

    def _send(self, notification_content: NotificationContent) -> None:
        """
        Sends a notification to all recipients.

        A failure for one recipient does not stop delivery to the others. All
        failures are collected and raised together once every recipient has
        been attempted.

        :raises: ReportScheduleNotificationError
        """
        notification_errors = []
        for recipient in self._report_schedule.recipients:
            notification = create_notification(recipient, notification_content)
            try:
                notification.send()
            except NotificationError as ex:
                # collect notification errors but keep processing them
                notification_errors.append(str(ex))
        if notification_errors:
            raise ReportScheduleNotificationError(";".join(notification_errors))

    def send(self) -> None:
        """
        Creates the notification content and sends it to all recipients.

        :raises: ReportScheduleNotificationError
        """
        notification_content = self._get_notification_content()
        self._send(notification_content)

    def send_error(self, name: str, message: str) -> None:
        """
        Creates and sends an error notification to all recipients.

        :param name: notification title
        :param message: notification text
        :raises: ReportScheduleNotificationError
        """
        notification_content = NotificationContent(name=name, text=message)
        self._send(notification_content)

    def _is_within_grace_period(
        self, last_log: Optional[ReportExecutionLog]
    ) -> bool:
        """
        Check whether ``last_log`` falls within the schedule's grace period.

        Returns True when ``last_log`` exists, the schedule has a non-zero
        ``grace_period``, and the log ended less than ``grace_period`` seconds
        ago.
        """
        grace_period = self._report_schedule.grace_period
        if last_log is None or not grace_period:
            return False
        return (
            datetime.utcnow() - timedelta(seconds=grace_period) < last_log.end_dttm
        )

    def is_in_grace_period(self) -> bool:
        """
        Checks if an alert is in its grace period.

        This is the case when the last successful execution ended less than
        ``grace_period`` seconds ago.
        """
        last_success = ReportScheduleDAO.find_last_success_log(
            self._report_schedule, session=self._session
        )
        return self._is_within_grace_period(last_success)

    def is_in_error_grace_period(self) -> bool:
        """
        Checks if an alert/report in error is in its notification grace period.

        This is the case when the last error notification ended less than
        ``grace_period`` seconds ago.
        """
        last_error_notification = ReportScheduleDAO.find_last_error_notification(
            self._report_schedule, session=self._session
        )
        return self._is_within_grace_period(last_error_notification)

    def is_on_working_timeout(self) -> bool:
        """
        Checks if an alert is on a working timeout.

        This is the case when both ``working_timeout`` and ``last_eval_dttm``
        are set and ``last_eval_dttm`` is older than ``working_timeout``
        seconds.
        """
        return (
            self._report_schedule.working_timeout is not None
            and self._report_schedule.last_eval_dttm is not None
            and datetime.utcnow()
            - timedelta(seconds=self._report_schedule.working_timeout)
            > self._report_schedule.last_eval_dttm
        )

    def next(self) -> None:
        """Perform this state's transition; implemented by subclasses."""
        raise NotImplementedError()
|
|
|
|
|
|
class ReportNotTriggeredErrorState(BaseReportState):
    """
    State handler for schedules whose last state is NOOP (not triggered) or
    ERROR, and for schedules that have never run (``initial = True``).

    Possible final states after ``next``:
    - Not Triggered (alert condition not met)
    - Success (notification sent)
    - Error (a CommandException occurred; it is re-raised to the caller)
    """

    current_states = [ReportState.NOOP, ReportState.ERROR]
    initial = True

    def next(self) -> None:
        self.set_state_and_log(ReportState.WORKING)
        try:
            is_alert = self._report_schedule.type == ReportScheduleType.ALERT
            # An alert only notifies when its condition is met
            if is_alert and not AlertCommand(self._report_schedule).run():
                self.set_state_and_log(ReportState.NOOP)
                return
            self.send()
            self.set_state_and_log(ReportState.SUCCESS)
        except CommandException as failure:
            self.set_state_and_log(ReportState.ERROR, error_message=str(failure))
            # TODO (dpgaspar) convert this logic to a new state eg: ERROR_ON_GRACE
            if not self.is_in_error_grace_period():
                self._notify_failure(failure)
            raise failure

    def _notify_failure(self, failure: CommandException) -> None:
        """
        Send an error notification describing ``failure`` to all recipients.

        On success, an ERROR log carrying the error-notification marker is
        written. If sending the notification itself fails, the notification
        error is logged instead.
        """
        try:
            self.send_error(
                f"Error occurred for {self._report_schedule.type}:"
                f" {self._report_schedule.name}",
                str(failure),
            )
            self.set_state_and_log(
                ReportState.ERROR,
                error_message=REPORT_SCHEDULE_ERROR_NOTIFICATION_MARKER,
            )
        except CommandException as notify_failure:
            self.set_state_and_log(
                ReportState.ERROR, error_message=str(notify_failure)
            )
|
|
|
|
|
|
class ReportWorkingState(BaseReportState):
    """
    State handler for schedules whose previous execution is still WORKING.

    ``next`` always raises. The resulting state is:
    - Error, when the previous execution exceeded its working timeout
    - Working, when the previous execution is still within its timeout
    """

    current_states = [ReportState.WORKING]

    def next(self) -> None:
        failure: Exception
        if self.is_on_working_timeout():
            target_state = ReportState.ERROR
            failure = ReportScheduleWorkingTimeoutError()
        else:
            target_state = ReportState.WORKING
            failure = ReportSchedulePreviousWorkingError()
        self.set_state_and_log(target_state, error_message=str(failure))
        raise failure
|
|
|
|
|
|
class ReportSuccessState(BaseReportState):
    """
    State handler for schedules whose last state is SUCCESS or GRACE.

    Possible states after ``next``:
    - Grace (alert still inside its grace period)
    - Not triggered (alert grace period has ended)
    - Success (report sent)
    - Error (report failed to send; recorded in the log)
    """

    current_states = [ReportState.SUCCESS, ReportState.GRACE]

    def next(self) -> None:
        self.set_state_and_log(ReportState.WORKING)
        if self._report_schedule.type != ReportScheduleType.ALERT:
            # Reports: send again and record the outcome.
            # NOTE(review): unlike ReportNotTriggeredErrorState, a send failure
            # here is neither notified to recipients nor re-raised — confirm
            # this is intended.
            try:
                self.send()
                self.set_state_and_log(ReportState.SUCCESS)
            except CommandException as send_failure:
                self.set_state_and_log(
                    ReportState.ERROR, error_message=str(send_failure)
                )
            return
        # Alerts are not re-evaluated here: they either remain in grace or the
        # grace period ends and they drop back to NOOP
        if self.is_in_grace_period():
            self.set_state_and_log(
                ReportState.GRACE,
                error_message=str(ReportScheduleAlertGracePeriodError()),
            )
        else:
            self.set_state_and_log(
                ReportState.NOOP,
                error_message=str(ReportScheduleAlertEndGracePeriodError()),
            )
|
|
|
|
|
|
class ReportScheduleStateMachine:  # pylint: disable=too-few-public-methods
    """
    Simple state machine for Alerts/Reports states.

    The first class in ``states_cls`` that handles the schedule's
    ``last_state`` performs the transition. A schedule with no ``last_state``
    is handled by the class marked ``initial``.
    """

    states_cls = [ReportWorkingState, ReportNotTriggeredErrorState, ReportSuccessState]

    def __init__(
        self,
        session: Session,
        report_schedule: ReportSchedule,
        scheduled_dttm: datetime,
    ) -> None:
        self._session = session
        self._report_schedule = report_schedule
        self._scheduled_dttm = scheduled_dttm

    def run(self) -> None:
        """
        Dispatch to the matching state class and run its transition.

        :raises: ReportScheduleStateNotFoundError when no state class matches
        """
        last_state = self._report_schedule.last_state
        for state_cls in self.states_cls:
            handles_initial = last_state is None and state_cls.initial
            if handles_initial or last_state in state_cls.current_states:
                handler = state_cls(
                    self._session, self._report_schedule, self._scheduled_dttm
                )
                handler.next()
                break
        else:
            raise ReportScheduleStateNotFoundError()
|
|
|
|
|
|
class AsyncExecuteReportScheduleCommand(BaseCommand):
    """
    Execute all types of report schedules.
    - On reports takes chart or dashboard screenshots and sends configured notifications
    - On Alerts uses related Command AlertCommand and sends configured notifications
    """

    def __init__(self, model_id: int, scheduled_dttm: datetime) -> None:
        self._model_id = model_id
        self._model: Optional[ReportSchedule] = None
        self._scheduled_dttm = scheduled_dttm

    def run(self) -> None:
        """
        Load the report schedule and advance its state machine in a dedicated
        session (``session_scope(nullpool=True)``).

        :raises CommandException: command errors propagate unchanged
        :raises ReportScheduleUnexpectedError: wraps any other exception; the
            original is kept as ``__cause__``
        """
        with session_scope(nullpool=True) as session:
            try:
                self.validate(session=session)
                # validate() already raises when the model is missing; this
                # guard narrows the Optional type for the call below
                if not self._model:
                    raise ReportScheduleExecuteUnexpectedError()
                ReportScheduleStateMachine(
                    session, self._model, self._scheduled_dttm
                ).run()
            except CommandException:
                raise
            except Exception as ex:
                raise ReportScheduleUnexpectedError(str(ex)) from ex

    def validate(  # pylint: disable=arguments-differ
        self, session: Optional[Session] = None
    ) -> None:
        """
        Validate that the report schedule exists and store it on ``self._model``.

        :param session: session used for the lookup
        :raises ReportScheduleNotFoundError: when no schedule has ``model_id``
        """
        # Validate/populate model exists
        self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session)
        if not self._model:
            raise ReportScheduleNotFoundError()
|