mirror of
https://github.com/apache/superset.git
synced 2026-06-07 16:49:17 +00:00
feat(prune_logs): add optional max_rows_per_run param (#36313)
This commit is contained in:
@@ -39,13 +39,21 @@ class LogPruneCommand(BaseCommand):
|
||||
Attributes:
|
||||
retention_period_days (int): The number of days for which records should be retained.
|
||||
Records older than this period will be deleted.
|
||||
max_rows_per_run (int | None): The maximum number of rows to delete in a single run.
|
||||
If provided and greater than zero, rows are selected
|
||||
deterministically from the oldest first (by timestamp then id)
|
||||
up to this limit in this execution.
|
||||
""" # noqa: E501
|
||||
|
||||
def __init__(self, retention_period_days: int):
|
||||
def __init__(self, retention_period_days: int, max_rows_per_run: int | None = None):
|
||||
"""
|
||||
:param retention_period_days: Number of days to keep in the logs table
|
||||
"""
|
||||
:param max_rows_per_run: The maximum number of rows to delete in a single run.
|
||||
If provided and greater than zero, rows are selected deterministically from the
|
||||
oldest first (by timestamp then id) up to this limit in this execution.
|
||||
""" # noqa: E501
|
||||
self.retention_period_days = retention_period_days
|
||||
self.max_rows_per_run = max_rows_per_run
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
@@ -56,17 +64,19 @@ class LogPruneCommand(BaseCommand):
|
||||
start_time = time.time()
|
||||
|
||||
# Select all IDs that need to be deleted
|
||||
ids_to_delete = (
|
||||
db.session.execute(
|
||||
sa.select(Log.id).where(
|
||||
Log.dttm
|
||||
< datetime.now() - timedelta(days=self.retention_period_days)
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.all()
|
||||
select_stmt = sa.select(Log.id).where(
|
||||
Log.dttm < datetime.now() - timedelta(days=self.retention_period_days)
|
||||
)
|
||||
|
||||
# Optionally limited by max_rows_per_run
|
||||
# order by oldest first for deterministic deletion
|
||||
if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
|
||||
select_stmt = select_stmt.order_by(Log.dttm.asc(), Log.id.asc()).limit(
|
||||
self.max_rows_per_run
|
||||
)
|
||||
|
||||
ids_to_delete = db.session.execute(select_stmt).scalars().all()
|
||||
|
||||
total_rows = len(ids_to_delete)
|
||||
|
||||
logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
|
||||
|
||||
@@ -1215,7 +1215,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
|
||||
# "prune_logs": {
|
||||
# "task": "prune_logs",
|
||||
# "schedule": crontab(minute="*", hour="*"),
|
||||
# "kwargs": {"retention_period_days": 180},
|
||||
# "kwargs": {"retention_period_days": 180, "max_rows_per_run": 10000},
|
||||
# },
|
||||
# Uncomment to enable Slack channel cache warm-up
|
||||
# "slack.cache_channels": {
|
||||
|
||||
@@ -177,7 +177,10 @@ def prune_query(
|
||||
|
||||
@celery_app.task(name="prune_logs", bind=True)
|
||||
def prune_logs(
|
||||
self: Task, retention_period_days: int | None = None, **kwargs: Any
|
||||
self: Task,
|
||||
retention_period_days: int | None = None,
|
||||
max_rows_per_run: int | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
stats_logger: BaseStatsLogger = current_app.config["STATS_LOGGER"]
|
||||
stats_logger.incr("prune_logs")
|
||||
@@ -193,6 +196,6 @@ def prune_logs(
|
||||
)
|
||||
|
||||
try:
|
||||
LogPruneCommand(retention_period_days).run()
|
||||
LogPruneCommand(retention_period_days, max_rows_per_run).run()
|
||||
except CommandException as ex:
|
||||
logger.exception("An error occurred while pruning logs: %s", ex)
|
||||
|
||||
Reference in New Issue
Block a user