fix(tasks): Add type hints to scheduler signal handler and task

Added proper type hints to the `log_task_failure` signal handler and the `scheduler` task function to fix type checking errors.

- Added type annotations for all signal handler parameters
- Added `self: Task` parameter to scheduler function (required when bind=True)
- Added pylint directives for unused arguments
- Fixed logging to use % formatting instead of f-strings
- Added null check for sender before accessing .name attribute

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Elizabeth Thompson
2025-10-10 11:39:55 -07:00
parent 5a26910ee3
commit 69177a6d0d

View File

@@ -41,19 +41,33 @@ from superset.utils.log import get_logger_from_status
logger = logging.getLogger(__name__)
@task_failure.connect
def log_task_failure(sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None, **kw):
logger.exception(f"Celery task {sender.name} failed: {exception}", exc_info=einfo)
def log_task_failure( # pylint: disable=unused-argument
sender: Task | None = None,
task_id: str | None = None,
exception: Exception | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
traceback: Any = None,
einfo: Any = None,
**kw: Any,
) -> None:
task_name = sender.name if sender else "Unknown"
logger.exception("Celery task %s failed: %s", task_name, exception, exc_info=einfo)
@celery_app.task(
name="reports.scheduler",
bind=True,
autoretry_for=(Exception,),
retry_kwargs={"max_retries": 3, "countdown": 60}, # Retry up to 3 times, wait 60s between
retry_kwargs={
"max_retries": 3,
"countdown": 60,
}, # Retry up to 3 times, wait 60s between
retry_backoff=True, # exponential backoff
)
def scheduler() -> None:
def scheduler(self: Task) -> None: # pylint: disable=unused-argument
"""
Celery beat main scheduler for reports
"""