diff --git a/superset/commands/logs/prune.py b/superset/commands/logs/prune.py index 9ad031ea516..21cfec8316a 100644 --- a/superset/commands/logs/prune.py +++ b/superset/commands/logs/prune.py @@ -39,13 +39,21 @@ class LogPruneCommand(BaseCommand): Attributes: retention_period_days (int): The number of days for which records should be retained. Records older than this period will be deleted. + max_rows_per_run (int | None): The maximum number of rows to delete in a single run. + If provided and greater than zero, rows are selected + deterministically from the oldest first (by timestamp then id) + up to this limit in this execution. """ # noqa: E501 - def __init__(self, retention_period_days: int): + def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None): """ :param retention_period_days: Number of days to keep in the logs table - """ + :param max_rows_per_run: The maximum number of rows to delete in a single run. + If provided and greater than zero, rows are selected deterministically from the + oldest first (by timestamp then id) up to this limit in this execution. + """ # noqa: E501 self.retention_period_days = retention_period_days + self.max_rows_per_run = max_rows_per_run def run(self) -> None: """ @@ -56,17 +64,19 @@ class LogPruneCommand(BaseCommand): start_time = time.time() # Select all IDs that need to be deleted - ids_to_delete = ( - db.session.execute( - sa.select(Log.id).where( - Log.dttm - < datetime.now() - timedelta(days=self.retention_period_days) - ) - ) - .scalars() - .all() + select_stmt = sa.select(Log.id).where( + Log.dttm < datetime.now() - timedelta(days=self.retention_period_days) ) + # Optionally limited by max_rows_per_run + # order by oldest first for deterministic deletion + if self.max_rows_per_run is not None and self.max_rows_per_run > 0: + select_stmt = select_stmt.order_by(Log.dttm.asc(), Log.id.asc()).limit( + self.max_rows_per_run + ) + + ids_to_delete = db.session.execute(select_stmt).scalars().all() + total_rows = len(ids_to_delete) logger.info("Total rows to be deleted: %s", f"{total_rows:,}") diff --git a/superset/config.py b/superset/config.py index 2d08a952deb..14077057102 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1215,7 +1215,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods # "prune_logs": { # "task": "prune_logs", # "schedule": crontab(minute="*", hour="*"), - # "kwargs": {"retention_period_days": 180}, + # "kwargs": {"retention_period_days": 180, "max_rows_per_run": 10000}, # }, # Uncomment to enable Slack channel cache warm-up # "slack.cache_channels": { diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index da9765a8d6e..9f439166c26 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -177,7 +177,10 @@ def prune_query( @celery_app.task(name="prune_logs", bind=True) def prune_logs( - self: Task, retention_period_days: int | None = None, **kwargs: Any + self: Task, + retention_period_days: int | None = None, + max_rows_per_run: int | None = None, + **kwargs: Any, ) -> None: stats_logger: BaseStatsLogger = current_app.config["STATS_LOGGER"] stats_logger.incr("prune_logs") @@ -193,6 +196,6 @@ def prune_logs( ) try: - LogPruneCommand(retention_period_days).run() + LogPruneCommand(retention_period_days, max_rows_per_run).run() except CommandException as ex: logger.exception("An error occurred while pruning logs: %s", ex)