mirror of
https://github.com/apache/superset.git
synced 2026-04-19 16:14:52 +00:00
feat(reports): Set a minimum interval for each report's execution (#28176)
This commit is contained in:
@@ -195,6 +195,15 @@ Please refer to `ExecutorType` in the codebase for other executor types.
|
||||
its default value of `http://0.0.0.0:8080/`.
|
||||
|
||||
|
||||
It's also possible to specify a minimum interval between each report's execution through the config file:
|
||||
|
||||
``` python
|
||||
# Set a minimum interval threshold between executions (for each Alert/Report)
|
||||
# Value should be an integer
|
||||
ALERT_MINIMUM_INTERVAL = int(timedelta(minutes=10).total_seconds())
|
||||
REPORT_MINIMUM_INTERVAL = int(timedelta(minutes=5).total_seconds())
|
||||
```
|
||||
|
||||
## Custom Dockerfile
|
||||
|
||||
If you're running the dev version of a released Superset image, like `apache/superset:3.1.0-dev`, you should be set with the above.
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from croniter import croniter
|
||||
from flask import current_app
|
||||
from marshmallow import ValidationError
|
||||
|
||||
from superset.commands.base import BaseCommand
|
||||
@@ -26,11 +28,12 @@ from superset.commands.report.exceptions import (
|
||||
DashboardNotFoundValidationError,
|
||||
DashboardNotSavedValidationError,
|
||||
ReportScheduleEitherChartOrDashboardError,
|
||||
ReportScheduleFrequencyNotAllowed,
|
||||
ReportScheduleOnlyChartOrDashboardError,
|
||||
)
|
||||
from superset.daos.chart import ChartDAO
|
||||
from superset.daos.dashboard import DashboardDAO
|
||||
from superset.reports.models import ReportCreationMethod
|
||||
from superset.reports.models import ReportCreationMethod, ReportScheduleType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -76,3 +79,45 @@ class BaseReportScheduleCommand(BaseCommand):
|
||||
self._properties["dashboard"] = dashboard
|
||||
elif not update:
|
||||
exceptions.append(ReportScheduleEitherChartOrDashboardError())
|
||||
|
||||
def validate_report_frequency(
|
||||
self,
|
||||
cron_schedule: str,
|
||||
report_type: str,
|
||||
) -> None:
|
||||
"""
|
||||
Validates if the report scheduled frequency doesn't exceed a limit
|
||||
configured in `config.py`.
|
||||
|
||||
:param cron_schedule: The cron schedule configured.
|
||||
:param report_type: The report type (Alert/Report).
|
||||
"""
|
||||
config_key = (
|
||||
"ALERT_MINIMUM_INTERVAL"
|
||||
if report_type == ReportScheduleType.ALERT
|
||||
else "REPORT_MINIMUM_INTERVAL"
|
||||
)
|
||||
minimum_interval = current_app.config.get(config_key, 0)
|
||||
|
||||
if not isinstance(minimum_interval, int):
|
||||
logger.error(
|
||||
"Invalid value for %s: %s", config_key, minimum_interval, exc_info=True
|
||||
)
|
||||
return
|
||||
|
||||
# Since configuration is in minutes, we only need to validate
|
||||
# in case `minimum_interval` is <= 120 (2min)
|
||||
if minimum_interval < 120:
|
||||
return
|
||||
|
||||
iterations = 60 if minimum_interval <= 3660 else 24
|
||||
schedule = croniter(cron_schedule)
|
||||
current_exec = next(schedule)
|
||||
|
||||
for _ in range(iterations):
|
||||
next_exec = next(schedule)
|
||||
diff, current_exec = next_exec - current_exec, next_exec
|
||||
if int(diff) < minimum_interval:
|
||||
raise ReportScheduleFrequencyNotAllowed(
|
||||
report_type=report_type, minimum_interval=minimum_interval
|
||||
)
|
||||
|
||||
@@ -30,7 +30,6 @@ from superset.commands.report.exceptions import (
|
||||
ReportScheduleCreationMethodUniquenessValidationError,
|
||||
ReportScheduleInvalidError,
|
||||
ReportScheduleNameUniquenessValidationError,
|
||||
ReportScheduleRequiredTypeValidationError,
|
||||
)
|
||||
from superset.daos.database import DatabaseDAO
|
||||
from superset.daos.exceptions import DAOCreateFailedError
|
||||
@@ -58,38 +57,53 @@ class CreateReportScheduleCommand(CreateMixin, BaseReportScheduleCommand):
|
||||
raise ReportScheduleCreateFailedError() from ex
|
||||
|
||||
def validate(self) -> None:
|
||||
exceptions: list[ValidationError] = []
|
||||
owner_ids: Optional[list[int]] = self._properties.get("owners")
|
||||
name = self._properties.get("name", "")
|
||||
report_type = self._properties.get("type")
|
||||
creation_method = self._properties.get("creation_method")
|
||||
chart_id = self._properties.get("chart")
|
||||
dashboard_id = self._properties.get("dashboard")
|
||||
"""
|
||||
Validates the properties of a report schedule configuration, including uniqueness
|
||||
of name and type, relations based on the report type, frequency, etc. Populates
|
||||
a list of `ValidationErrors` to be returned in the API response if any.
|
||||
|
||||
# Validate type is required
|
||||
if not report_type:
|
||||
exceptions.append(ReportScheduleRequiredTypeValidationError())
|
||||
Fields were loaded according to the `ReportSchedulePostSchema` schema.
|
||||
"""
|
||||
# Required fields
|
||||
cron_schedule = self._properties["crontab"]
|
||||
name = self._properties["name"]
|
||||
report_type = self._properties["type"]
|
||||
|
||||
# Optional fields
|
||||
chart_id = self._properties.get("chart")
|
||||
creation_method = self._properties.get("creation_method")
|
||||
dashboard_id = self._properties.get("dashboard")
|
||||
owner_ids: Optional[list[int]] = self._properties.get("owners")
|
||||
|
||||
exceptions: list[ValidationError] = []
|
||||
|
||||
# Validate name type uniqueness
|
||||
if report_type and not ReportScheduleDAO.validate_update_uniqueness(
|
||||
name, report_type
|
||||
):
|
||||
if not ReportScheduleDAO.validate_update_uniqueness(name, report_type):
|
||||
exceptions.append(
|
||||
ReportScheduleNameUniquenessValidationError(
|
||||
report_type=report_type, name=name
|
||||
)
|
||||
)
|
||||
|
||||
# validate relation by report type
|
||||
# Validate if DB exists (for alerts)
|
||||
if report_type == ReportScheduleType.ALERT:
|
||||
database_id = self._properties.get("database")
|
||||
if not database_id:
|
||||
exceptions.append(ReportScheduleAlertRequiredDatabaseValidationError())
|
||||
else:
|
||||
database = DatabaseDAO.find_by_id(database_id)
|
||||
if not database:
|
||||
try:
|
||||
database_id = self._properties["database"]
|
||||
if database := DatabaseDAO.find_by_id(database_id):
|
||||
self._properties["database"] = database
|
||||
else:
|
||||
exceptions.append(DatabaseNotFoundValidationError())
|
||||
self._properties["database"] = database
|
||||
except KeyError:
|
||||
exceptions.append(ReportScheduleAlertRequiredDatabaseValidationError())
|
||||
|
||||
# validate report frequency
|
||||
try:
|
||||
self.validate_report_frequency(
|
||||
cron_schedule,
|
||||
report_type,
|
||||
)
|
||||
except ValidationError as exc:
|
||||
exceptions.append(exc)
|
||||
|
||||
# Validate chart or dashboard relations
|
||||
self.validate_chart_dashboard(exceptions)
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import math
|
||||
|
||||
from flask_babel import lazy_gettext as _
|
||||
|
||||
from superset.commands.exceptions import (
|
||||
@@ -93,6 +95,31 @@ class ReportScheduleEitherChartOrDashboardError(ValidationError):
|
||||
)
|
||||
|
||||
|
||||
class ReportScheduleFrequencyNotAllowed(ValidationError):
|
||||
"""
|
||||
Marshmallow validation error for report schedule configured to run more
|
||||
frequently than allowed
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
report_type: str = "Report",
|
||||
minimum_interval: int = 120,
|
||||
) -> None:
|
||||
interval_in_minutes = math.ceil(minimum_interval / 60)
|
||||
|
||||
super().__init__(
|
||||
_(
|
||||
"%(report_type)s schedule frequency exceeding limit."
|
||||
" Please configure a schedule with a minimum interval of"
|
||||
" %(minimum_interval)d minutes per execution.",
|
||||
report_type=report_type,
|
||||
minimum_interval=interval_in_minutes,
|
||||
),
|
||||
field_name="crontab",
|
||||
)
|
||||
|
||||
|
||||
class ChartNotSavedValidationError(ValidationError):
|
||||
"""
|
||||
Marshmallow validation error for charts that haven't been saved yet
|
||||
|
||||
@@ -59,17 +59,29 @@ class UpdateReportScheduleCommand(UpdateMixin, BaseReportScheduleCommand):
|
||||
return report_schedule
|
||||
|
||||
def validate(self) -> None:
|
||||
exceptions: list[ValidationError] = []
|
||||
owner_ids: Optional[list[int]] = self._properties.get("owners")
|
||||
report_type = self._properties.get("type", ReportScheduleType.ALERT)
|
||||
"""
|
||||
Validates the properties of a report schedule configuration, including uniqueness
|
||||
of name and type, relations based on the report type, frequency, etc. Populates
|
||||
a list of `ValidationErrors` to be returned in the API response if any.
|
||||
|
||||
name = self._properties.get("name", "")
|
||||
Fields were loaded according to the `ReportSchedulePutSchema` schema.
|
||||
"""
|
||||
# Load existing report schedule config
|
||||
self._model = ReportScheduleDAO.find_by_id(self._model_id)
|
||||
|
||||
# Does the report exist?
|
||||
if not self._model:
|
||||
raise ReportScheduleNotFoundError()
|
||||
|
||||
# Required fields for validation
|
||||
cron_schedule = self._properties.get("crontab", self._model.crontab)
|
||||
name = self._properties.get("name", self._model.name)
|
||||
report_type = self._properties.get("type", self._model.type)
|
||||
|
||||
# Optional fields
|
||||
database_id = self._properties.get("database")
|
||||
owner_ids: Optional[list[int]] = self._properties.get("owners")
|
||||
|
||||
exceptions: list[ValidationError] = []
|
||||
|
||||
# Change the state to not triggered when the user deactivates
|
||||
# A report that is currently in a working state. This prevents
|
||||
# an alert/report from being kept in a working state if activated back
|
||||
@@ -80,28 +92,31 @@ class UpdateReportScheduleCommand(UpdateMixin, BaseReportScheduleCommand):
|
||||
):
|
||||
self._properties["last_state"] = ReportState.NOOP
|
||||
|
||||
# validate relation by report type
|
||||
if not report_type:
|
||||
report_type = self._model.type
|
||||
|
||||
# Validate name type uniqueness
|
||||
if not ReportScheduleDAO.validate_update_uniqueness(
|
||||
name, report_type, expect_id=self._model_id
|
||||
):
|
||||
exceptions.append(
|
||||
ReportScheduleNameUniquenessValidationError(
|
||||
report_type=report_type, name=name
|
||||
# Validate name/type uniqueness if either is changing
|
||||
if name != self._model.name or report_type != self._model.type:
|
||||
if not ReportScheduleDAO.validate_update_uniqueness(
|
||||
name, report_type, expect_id=self._model_id
|
||||
):
|
||||
exceptions.append(
|
||||
ReportScheduleNameUniquenessValidationError(
|
||||
report_type=report_type, name=name
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
if report_type == ReportScheduleType.ALERT:
|
||||
database_id = self._properties.get("database")
|
||||
# If database_id was sent let's validate it exists
|
||||
if database_id:
|
||||
database = DatabaseDAO.find_by_id(database_id)
|
||||
if not database:
|
||||
exceptions.append(DatabaseNotFoundValidationError())
|
||||
self._properties["database"] = database
|
||||
# Validate if DB exists (for alerts)
|
||||
if report_type == ReportScheduleType.ALERT and database_id:
|
||||
if not (database := DatabaseDAO.find_by_id(database_id)):
|
||||
exceptions.append(DatabaseNotFoundValidationError())
|
||||
self._properties["database"] = database
|
||||
|
||||
# validate report frequency
|
||||
try:
|
||||
self.validate_report_frequency(
|
||||
cron_schedule,
|
||||
report_type,
|
||||
)
|
||||
except ValidationError as exc:
|
||||
exceptions.append(exc)
|
||||
|
||||
# Validate chart or dashboard relations
|
||||
self.validate_chart_dashboard(exceptions, update=True)
|
||||
|
||||
@@ -1328,6 +1328,10 @@ ALERT_REPORTS_QUERY_EXECUTION_MAX_TRIES = 1
|
||||
# Custom width for screenshots
|
||||
ALERT_REPORTS_MIN_CUSTOM_SCREENSHOT_WIDTH = 600
|
||||
ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH = 2400
|
||||
# Set a minimum interval threshold between executions (for each Alert/Report)
|
||||
# Value should be an integer i.e. int(timedelta(minutes=5).total_seconds())
|
||||
ALERT_MINIMUM_INTERVAL = int(timedelta(minutes=0).total_seconds())
|
||||
REPORT_MINIMUM_INTERVAL = int(timedelta(minutes=0).total_seconds())
|
||||
|
||||
# A custom prefix to use on all Alerts & Reports emails
|
||||
EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] "
|
||||
|
||||
@@ -17,7 +17,8 @@
|
||||
# isort:skip_file
|
||||
"""Unit tests for Superset"""
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import patch
|
||||
import json
|
||||
|
||||
import pytz
|
||||
@@ -1259,6 +1260,220 @@ class TestReportSchedulesApi(SupersetTestCase):
|
||||
}
|
||||
assert rv.status_code == 400
|
||||
|
||||
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
|
||||
def test_create_report_schedule_valid_schedule(self):
|
||||
"""
|
||||
ReportSchedule API: Test create report schedule when a minimum
|
||||
interval is set in config.
|
||||
"""
|
||||
self.login(ADMIN_USERNAME)
|
||||
|
||||
chart = db.session.query(Slice).first()
|
||||
example_db = get_example_database()
|
||||
report_schedule_data = {
|
||||
"type": ReportScheduleType.ALERT,
|
||||
"name": "Alert with a valid frequency",
|
||||
"description": "description",
|
||||
"creation_method": "alerts_reports",
|
||||
"crontab": "5,10 9 * * *",
|
||||
"recipients": [
|
||||
{
|
||||
"type": ReportRecipientType.EMAIL,
|
||||
"recipient_config_json": {"target": "target@superset.org"},
|
||||
},
|
||||
{
|
||||
"type": ReportRecipientType.SLACK,
|
||||
"recipient_config_json": {"target": "channel"},
|
||||
},
|
||||
],
|
||||
"grace_period": 14400,
|
||||
"working_timeout": 3600,
|
||||
"chart": chart.id,
|
||||
"database": example_db.id,
|
||||
}
|
||||
with patch.dict(
|
||||
"superset.commands.report.base.current_app.config",
|
||||
{
|
||||
"ALERT_MINIMUM_INTERVAL": int(timedelta(minutes=2).total_seconds()),
|
||||
"REPORT_MINIMUM_INTERVAL": int(timedelta(minutes=5).total_seconds()),
|
||||
},
|
||||
):
|
||||
uri = "api/v1/report/"
|
||||
rv = self.post_assert_metric(uri, report_schedule_data, "post")
|
||||
assert rv.status_code == 201
|
||||
report_schedule_data["type"] = ReportScheduleType.REPORT
|
||||
report_schedule_data["name"] = "Report with a valid frequency"
|
||||
del report_schedule_data["database"]
|
||||
rv = self.post_assert_metric(uri, report_schedule_data, "post")
|
||||
assert rv.status_code == 201
|
||||
|
||||
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
|
||||
def test_create_report_schedule_invalid_schedule(self):
|
||||
"""
|
||||
ReportSchedule API: Test create report schedule when a minimum
|
||||
interval is set in config and the scheduled frequency exceeds it.
|
||||
"""
|
||||
self.login(ADMIN_USERNAME)
|
||||
|
||||
chart = db.session.query(Slice).first()
|
||||
example_db = get_example_database()
|
||||
report_schedule_data = {
|
||||
"type": ReportScheduleType.ALERT,
|
||||
"name": "Invalid Frequency",
|
||||
"description": "description",
|
||||
"creation_method": "alerts_reports",
|
||||
"crontab": "5,10 9 * * *",
|
||||
"recipients": [
|
||||
{
|
||||
"type": ReportRecipientType.EMAIL,
|
||||
"recipient_config_json": {"target": "target@superset.org"},
|
||||
},
|
||||
{
|
||||
"type": ReportRecipientType.SLACK,
|
||||
"recipient_config_json": {"target": "channel"},
|
||||
},
|
||||
],
|
||||
"grace_period": 14400,
|
||||
"working_timeout": 3600,
|
||||
"chart": chart.id,
|
||||
"database": example_db.id,
|
||||
}
|
||||
with patch.dict(
|
||||
"superset.commands.report.base.current_app.config",
|
||||
{
|
||||
"ALERT_MINIMUM_INTERVAL": int(timedelta(minutes=6).total_seconds()),
|
||||
"REPORT_MINIMUM_INTERVAL": int(timedelta(minutes=8).total_seconds()),
|
||||
},
|
||||
):
|
||||
uri = "api/v1/report/"
|
||||
rv = self.post_assert_metric(uri, report_schedule_data, "post")
|
||||
response = json.loads(rv.data.decode("utf-8"))
|
||||
assert response == {
|
||||
"message": {
|
||||
"crontab": (
|
||||
"Alert schedule frequency exceeding limit. "
|
||||
"Please configure a schedule with a minimum interval of 6 minutes per execution."
|
||||
)
|
||||
}
|
||||
}
|
||||
assert rv.status_code == 422
|
||||
report_schedule_data["type"] = ReportScheduleType.REPORT
|
||||
del report_schedule_data["database"]
|
||||
rv = self.post_assert_metric(uri, report_schedule_data, "post")
|
||||
response = json.loads(rv.data.decode("utf-8"))
|
||||
assert response == {
|
||||
"message": {
|
||||
"crontab": (
|
||||
"Report schedule frequency exceeding limit. "
|
||||
"Please configure a schedule with a minimum interval of 8 minutes per execution."
|
||||
)
|
||||
}
|
||||
}
|
||||
assert rv.status_code == 422
|
||||
|
||||
@pytest.mark.usefixtures("create_report_schedules")
|
||||
def test_update_report_schedule_valid_schedule(self) -> None:
|
||||
"""
|
||||
ReportSchedule API: Test update report schedule when a minimum
|
||||
interval is set in config.
|
||||
"""
|
||||
self.login(ADMIN_USERNAME)
|
||||
report_schedule = (
|
||||
db.session.query(ReportSchedule)
|
||||
.filter(ReportSchedule.name == "name2")
|
||||
.one_or_none()
|
||||
)
|
||||
assert report_schedule.type == ReportScheduleType.ALERT
|
||||
previous_cron = report_schedule.crontab
|
||||
update_payload = {
|
||||
"crontab": "5,10 * * * *",
|
||||
}
|
||||
with patch.dict(
|
||||
"superset.commands.report.base.current_app.config",
|
||||
{
|
||||
"ALERT_MINIMUM_INTERVAL": int(timedelta(minutes=5).total_seconds()),
|
||||
"REPORT_MINIMUM_INTERVAL": int(timedelta(minutes=3).total_seconds()),
|
||||
},
|
||||
):
|
||||
# Test alert minimum interval
|
||||
uri = f"api/v1/report/{report_schedule.id}"
|
||||
rv = self.put_assert_metric(uri, update_payload, "put")
|
||||
assert rv.status_code == 200
|
||||
|
||||
# Test report minimum interval
|
||||
update_payload["crontab"] = "5,8 * * * *"
|
||||
update_payload["type"] = ReportScheduleType.REPORT
|
||||
uri = f"api/v1/report/{report_schedule.id}"
|
||||
rv = self.put_assert_metric(uri, update_payload, "put")
|
||||
assert rv.status_code == 200
|
||||
|
||||
with patch.dict(
|
||||
"superset.commands.report.base.current_app.config",
|
||||
{
|
||||
"ALERT_MINIMUM_INTERVAL": 0,
|
||||
"REPORT_MINIMUM_INTERVAL": 0,
|
||||
},
|
||||
):
|
||||
# Undo changes
|
||||
update_payload["crontab"] = previous_cron
|
||||
update_payload["type"] = ReportScheduleType.ALERT
|
||||
uri = f"api/v1/report/{report_schedule.id}"
|
||||
rv = self.put_assert_metric(uri, update_payload, "put")
|
||||
assert rv.status_code == 200
|
||||
|
||||
@pytest.mark.usefixtures("create_report_schedules")
|
||||
def test_update_report_schedule_invalid_schedule(self) -> None:
|
||||
"""
|
||||
ReportSchedule API: Test update report schedule when a minimum
|
||||
interval is set in config and the scheduled frequency exceeds it.
|
||||
"""
|
||||
self.login(ADMIN_USERNAME)
|
||||
report_schedule = (
|
||||
db.session.query(ReportSchedule)
|
||||
.filter(ReportSchedule.name == "name2")
|
||||
.one_or_none()
|
||||
)
|
||||
assert report_schedule.type == ReportScheduleType.ALERT
|
||||
update_payload = {
|
||||
"crontab": "5,10 * * * *",
|
||||
}
|
||||
with patch.dict(
|
||||
"superset.commands.report.base.current_app.config",
|
||||
{
|
||||
"ALERT_MINIMUM_INTERVAL": int(timedelta(minutes=6).total_seconds()),
|
||||
"REPORT_MINIMUM_INTERVAL": int(timedelta(minutes=4).total_seconds()),
|
||||
},
|
||||
):
|
||||
# Exceed alert minimum interval
|
||||
uri = f"api/v1/report/{report_schedule.id}"
|
||||
rv = self.put_assert_metric(uri, update_payload, "put")
|
||||
assert rv.status_code == 422
|
||||
response = json.loads(rv.data.decode("utf-8"))
|
||||
assert response == {
|
||||
"message": {
|
||||
"crontab": (
|
||||
"Alert schedule frequency exceeding limit. "
|
||||
"Please configure a schedule with a minimum interval of 6 minutes per execution."
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
# Exceed report minimum interval
|
||||
update_payload["crontab"] = "5,8 * * * *"
|
||||
update_payload["type"] = ReportScheduleType.REPORT
|
||||
uri = f"api/v1/report/{report_schedule.id}"
|
||||
rv = self.put_assert_metric(uri, update_payload, "put")
|
||||
assert rv.status_code == 422
|
||||
response = json.loads(rv.data.decode("utf-8"))
|
||||
assert response == {
|
||||
"message": {
|
||||
"crontab": (
|
||||
"Report schedule frequency exceeding limit. "
|
||||
"Please configure a schedule with a minimum interval of 4 minutes per execution."
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@pytest.mark.usefixtures("create_report_schedules")
|
||||
def test_update_report_schedule(self):
|
||||
"""
|
||||
|
||||
253
tests/unit_tests/commands/report/base_test.py
Normal file
253
tests/unit_tests/commands/report/base_test.py
Normal file
@@ -0,0 +1,253 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
from functools import wraps
|
||||
from typing import Any, Callable
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from superset.commands.report.base import BaseReportScheduleCommand
|
||||
from superset.commands.report.exceptions import ReportScheduleFrequencyNotAllowed
|
||||
from superset.reports.models import ReportScheduleType
|
||||
|
||||
REPORT_TYPES = {
|
||||
ReportScheduleType.ALERT,
|
||||
ReportScheduleType.REPORT,
|
||||
}
|
||||
|
||||
TEST_SCHEDULES_EVERY_MINUTE = {
|
||||
"* * * * *",
|
||||
"1-5 * * * *",
|
||||
"10-20 * * * *",
|
||||
"0,45,10-20 * * * *",
|
||||
"23,45,50,51 * * * *",
|
||||
"10,20,30,40-45 * * * *",
|
||||
}
|
||||
|
||||
TEST_SCHEDULES_SINGLE_MINUTES = {
|
||||
"1,5,8,10,12 * * * *",
|
||||
"10 1 * * *",
|
||||
"27,2 1-5 * * *",
|
||||
}
|
||||
|
||||
TEST_SCHEDULES = TEST_SCHEDULES_EVERY_MINUTE.union(TEST_SCHEDULES_SINGLE_MINUTES)
|
||||
|
||||
|
||||
def app_custom_config(
|
||||
alert_minimum_interval: int | str = 0,
|
||||
report_minimum_interval: int | str = 0,
|
||||
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
|
||||
"""
|
||||
Decorator to mock the current_app.config values dynamically for each test.
|
||||
|
||||
:param alert_minimum_interval: Minimum interval. Defaults to None.
|
||||
:param report_minimum_interval: Minimum interval. Defaults to None.
|
||||
|
||||
:returns: A decorator that wraps a function.
|
||||
"""
|
||||
|
||||
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||
@wraps(func)
|
||||
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
||||
with patch(
|
||||
"superset.commands.report.base.current_app.config"
|
||||
) as mock_config:
|
||||
mock_config.get.side_effect = lambda key, default=0: {
|
||||
"ALERT_MINIMUM_INTERVAL": alert_minimum_interval,
|
||||
"REPORT_MINIMUM_INTERVAL": report_minimum_interval,
|
||||
}.get(key, default)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@pytest.mark.parametrize("report_type", REPORT_TYPES)
|
||||
@pytest.mark.parametrize("schedule", TEST_SCHEDULES)
|
||||
@app_custom_config()
|
||||
def test_validate_report_frequency(report_type: str, schedule: str) -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when there's
|
||||
no minimum frequency configured.
|
||||
"""
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
schedule,
|
||||
report_type,
|
||||
)
|
||||
|
||||
|
||||
@app_custom_config(
|
||||
alert_minimum_interval=int(timedelta(minutes=4).total_seconds()),
|
||||
report_minimum_interval=int(timedelta(minutes=5).total_seconds()),
|
||||
)
|
||||
def test_validate_report_frequency_minimum_set() -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when there's
|
||||
minimum frequencies configured.
|
||||
"""
|
||||
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
"1,5 * * * *",
|
||||
ReportScheduleType.ALERT,
|
||||
)
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
"6,11 * * * *",
|
||||
ReportScheduleType.REPORT,
|
||||
)
|
||||
|
||||
|
||||
@app_custom_config(
|
||||
alert_minimum_interval=int(timedelta(minutes=2).total_seconds()),
|
||||
report_minimum_interval=int(timedelta(minutes=5).total_seconds()),
|
||||
)
|
||||
def test_validate_report_frequency_invalid_schedule() -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when the configured
|
||||
schedule exceeds the limit.
|
||||
"""
|
||||
with pytest.raises(ReportScheduleFrequencyNotAllowed):
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
"1,2 * * * *",
|
||||
ReportScheduleType.ALERT,
|
||||
)
|
||||
|
||||
with pytest.raises(ReportScheduleFrequencyNotAllowed):
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
"1,5 * * * *",
|
||||
ReportScheduleType.REPORT,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("schedule", TEST_SCHEDULES)
|
||||
@app_custom_config(
|
||||
alert_minimum_interval=int(timedelta(minutes=10).total_seconds()),
|
||||
)
|
||||
def test_validate_report_frequency_alert_only(schedule: str) -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when there's
|
||||
only a configuration for alerts and user is creating report.
|
||||
"""
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
schedule,
|
||||
ReportScheduleType.REPORT,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("schedule", TEST_SCHEDULES)
|
||||
@app_custom_config(
|
||||
report_minimum_interval=int(timedelta(minutes=10).total_seconds()),
|
||||
)
|
||||
def test_validate_report_frequency_report_only(schedule: str) -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when there's
|
||||
only a configuration for reports and user is creating alert.
|
||||
"""
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
schedule,
|
||||
ReportScheduleType.ALERT,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("report_type", REPORT_TYPES)
|
||||
@pytest.mark.parametrize("schedule", TEST_SCHEDULES)
|
||||
@app_custom_config(
|
||||
alert_minimum_interval=int(timedelta(minutes=1).total_seconds()),
|
||||
report_minimum_interval=int(timedelta(minutes=1).total_seconds()),
|
||||
)
|
||||
def test_validate_report_frequency_accepts_every_minute_with_one(
|
||||
report_type: str, schedule: str
|
||||
) -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when configuration
|
||||
is set to `1`. Validates the usage of `-` and `*` in the cron.
|
||||
"""
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
schedule,
|
||||
report_type,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("report_type", REPORT_TYPES)
|
||||
@pytest.mark.parametrize("schedule", TEST_SCHEDULES_SINGLE_MINUTES)
|
||||
@app_custom_config(
|
||||
alert_minimum_interval=int(timedelta(minutes=2).total_seconds()),
|
||||
report_minimum_interval=int(timedelta(minutes=2).total_seconds()),
|
||||
)
|
||||
def test_validate_report_frequency_accepts_every_minute_with_two(
|
||||
report_type: str,
|
||||
schedule: str,
|
||||
) -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when configuration
|
||||
is set to `2`.
|
||||
"""
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
schedule,
|
||||
report_type,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("report_type", REPORT_TYPES)
|
||||
@pytest.mark.parametrize("schedule", TEST_SCHEDULES_EVERY_MINUTE)
|
||||
@app_custom_config(
|
||||
alert_minimum_interval=int(timedelta(minutes=2).total_seconds()),
|
||||
report_minimum_interval=int(timedelta(minutes=2).total_seconds()),
|
||||
)
|
||||
def test_validate_report_frequency_accepts_every_minute_with_two_raises(
|
||||
report_type: str,
|
||||
schedule: str,
|
||||
) -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when configuration
|
||||
is set to `2`. Validates the usage of `-` and `*` in the cron.
|
||||
"""
|
||||
# Should fail for schedules with `-` and `*`
|
||||
with pytest.raises(ReportScheduleFrequencyNotAllowed):
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
schedule,
|
||||
report_type,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("report_type", REPORT_TYPES)
|
||||
@pytest.mark.parametrize("schedule", TEST_SCHEDULES)
|
||||
@app_custom_config(
|
||||
alert_minimum_interval="10 minutes",
|
||||
report_minimum_interval="10 minutes",
|
||||
)
|
||||
def test_validate_report_frequency_invalid_config(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
report_type: str,
|
||||
schedule: str,
|
||||
) -> None:
|
||||
"""
|
||||
Test the ``validate_report_frequency`` method when the configuration
|
||||
is invalid.
|
||||
"""
|
||||
caplog.set_level(logging.ERROR)
|
||||
BaseReportScheduleCommand().validate_report_frequency(
|
||||
schedule,
|
||||
report_type,
|
||||
)
|
||||
expected_error_message = (
|
||||
f"invalid value for {report_type}_MINIMUM_INTERVAL: 10 minutes"
|
||||
)
|
||||
assert expected_error_message.lower() in caplog.text.lower()
|
||||
Reference in New Issue
Block a user