Files
superset2/docs/developer_docs/extensions/tasks.md

443 lines
16 KiB
Markdown

---
title: Tasks
sidebar_position: 10
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Global Task Framework
The Global Task Framework (GTF) provides a unified way to manage background tasks. It handles task execution, progress tracking, cancellation, and deduplication for both synchronous and asynchronous execution. The framework uses distributed locking internally to ensure race-free operations—you don't need to worry about concurrent task creation or cancellation conflicts.
## Enabling GTF
GTF is disabled by default and must be enabled via the `GLOBAL_TASK_FRAMEWORK` feature flag in your `superset_config.py`:
```python
FEATURE_FLAGS = {
"GLOBAL_TASK_FRAMEWORK": True,
}
```
When GTF is disabled:
- The Task List UI menu item is hidden
- The `/api/v1/task/*` endpoints return 404
- Calling or scheduling a `@task`-decorated function raises `GlobalTaskFrameworkDisabledError`
:::note Future Migration
When GTF is considered stable, it will replace legacy Celery tasks for built-in features like thumbnails and alerts & reports. Enabling this flag prepares your deployment for that migration.
:::
## Quick Start
### Define a Task
```python
from superset_core.tasks.decorators import task, get_context
@task
def process_data(dataset_id: int) -> None:
ctx = get_context()
@ctx.on_cleanup
def cleanup():
logger.info("Processing complete")
data = fetch_dataset(dataset_id)
process_and_cache(data)
```
### Execute a Task
```python
# Async execution - schedules on Celery worker
task = process_data.schedule(dataset_id=123)
print(task.status) # "pending"
# Sync execution - runs inline in current process
task = process_data(dataset_id=123)
# ... blocks until complete
print(task.status) # "success"
```
### Async vs Sync Execution
| Method | When to Use |
|--------|-------------|
| `.schedule()` | Long-running operations, background processing, when you need to return immediately |
| Direct call | Short operations, when deduplication matters, when you need the result before responding |
Both execution modes provide the same task features: deduplication, progress tracking, cancellation, and visibility in the Task List UI. The difference is whether execution happens in a Celery worker (async) or inline (sync).
## Task Lifecycle
```
PENDING ──→ IN_PROGRESS ────→ SUCCESS
│ │
│ ├──────────→ FAILURE
│ ↓ ↑
│ ABORTING ────────────┘
│ │
│ ├──────────→ TIMED_OUT (timeout)
│ │
└─────────────┴──────────→ ABORTED (user cancel)
```
| Status | Description |
|--------|-------------|
| `PENDING` | Queued, awaiting execution |
| `IN_PROGRESS` | Executing |
| `ABORTING` | Abort/timeout triggered, abort handlers running |
| `SUCCESS` | Completed successfully |
| `FAILURE` | Failed with error or abort/cleanup handler exception |
| `ABORTED` | Cancelled by user/admin |
| `TIMED_OUT` | Exceeded configured timeout |
## Context API
Access task context via `get_context()` from within any `@task` function. The context provides methods for updating task metadata and registering handlers.
### Updating Task Metadata
Use `update_task()` to report progress and store custom payload data:
```python
@task
def my_task(items: list[int]) -> None:
ctx = get_context()
for i, item in enumerate(items):
result = process(item)
ctx.update_task(
progress=(i + 1, len(items)),
payload={"last_result": result}
)
```
:::tip
Call `update_task()` once per iteration for best performance. Frequent DB writes are throttled to limit metastore load, so batching progress and payload updates together in a single call ensures both are persisted at the same time.
:::
#### Progress Formats
The `progress` parameter accepts three formats:
| Format | Example | Display |
|--------|---------|---------|
| `tuple[int, int]` | `progress=(3, 100)` | 3 of 100 (3%) with ETA |
| `float` (0.0-1.0) | `progress=0.5` | 50% with ETA |
| `int` | `progress=42` | 42 processed |
:::tip
Use the tuple format `(current, total)` whenever possible. It provides the richest information to users: showing both the count and percentage, while still computing ETA automatically.
:::
#### Payload
The `payload` parameter stores custom metadata that can help users understand what the task is doing. Each call to `update_task()` replaces the previous payload completely.
In the Task List UI, when a payload is defined, an info icon appears in the **Details** column. Users can hover over it to see the JSON content.
### Handlers
Register handlers to run cleanup logic or respond to abort requests:
| Handler | When it runs | Use case |
|---------|--------------|----------|
| `on_cleanup` | Always (success, failure, abort) | Release resources, close connections |
| `on_abort` | When task is aborted | Set stop flag, cancel external operations |
```python
@task
def my_task() -> None:
ctx = get_context()
@ctx.on_cleanup
def cleanup():
logger.info("Task ended, cleaning up")
@ctx.on_abort
def handle_abort():
logger.info("Abort requested")
# ... task logic
```
Multiple handlers of the same type execute in LIFO order (last registered runs first). Abort handlers run first when abort is detected, then cleanup handlers run when the task ends.
#### Best-Effort Execution
**All registered handlers will always be attempted, even if one fails.** This ensures that a failure in one handler doesn't prevent other handlers from running their cleanup logic.
For example, if you have three cleanup handlers and the second one throws an exception:
1. Handler 3 runs ✓
2. Handler 2 throws an exception ✗ (logged, but execution continues)
3. Handler 1 runs ✓
If any handler fails, the task is marked as `FAILURE` with combined error details showing all handler failures.
:::tip
Write handlers to be independent and self-contained. Don't assume previous handlers succeeded, and don't rely on shared state between handlers.
:::
## Making Tasks Abortable
When users click **Cancel** in the Task List, the system decides whether to **abort** (stop) the task or **unsubscribe** (remove the user from a shared task). Abort occurs when:
- It's a private or system task
- It's a shared task and the user is the last subscriber
- An admin checks **Force abort** to stop the task for all subscribers
Pending tasks can always be aborted: they simply won't start. In-progress tasks require an abort handler to be abortable:
```python
@task
def abortable_task(items: list[str]) -> None:
ctx = get_context()
should_stop = False
@ctx.on_abort
def handle_abort():
nonlocal should_stop
should_stop = True
logger.info("Abort signal received")
@ctx.on_cleanup
def cleanup():
logger.info("Task ended, cleaning up")
for item in items:
if should_stop:
return # Exit gracefully
process(item)
```
**Key points:**
- Registering `on_abort` marks the task as abortable and starts the abort listener
- The abort handler fires automatically when abort is triggered
- Use a flag pattern to gracefully stop processing at safe points
- Without an abort handler, in-progress tasks cannot be aborted: the Cancel button in the Task List UI will be disabled
The framework automatically skips execution if a task was aborted while pending: no manual check needed at task start.
:::tip
Always implement an abort handler for long-running tasks. This allows users to cancel unneeded tasks and free up worker capacity for other operations.
:::
## Timeouts
Set a timeout to automatically abort tasks that run too long:
```python
from superset_core.tasks.decorators import task, get_context
from superset_core.tasks.types import TaskOptions
# Set default timeout in decorator
@task(timeout=300) # 5 minutes
def process_data(dataset_id: int) -> None:
ctx = get_context()
should_stop = False
@ctx.on_abort
def handle_abort():
nonlocal should_stop
should_stop = True
for chunk in fetch_large_dataset(dataset_id):
if should_stop:
return
process(chunk)
# Override timeout at call time
task = process_data.schedule(
dataset_id=123,
options=TaskOptions(timeout=600) # Override to 10 minutes
)
```
### How Timeouts Work
The timeout timer starts when the task begins executing (status changes to `IN_PROGRESS`). When the timeout expires:
1. **With an abort handler registered:** The task transitions to `ABORTING`, abort handlers run, then cleanup handlers run. The final status depends on handler execution:
- If handlers complete successfully → `TIMED_OUT` status
- If handlers throw an exception → `FAILURE` status
2. **Without an abort handler:** The framework cannot forcibly terminate the task. A warning is logged, and the task continues running. The Task List UI shows a warning indicator (⚠️) in the Details column to alert users that the timeout cannot be enforced.
### Timeout Precedence
| Source | Priority | Example |
|--------|----------|---------|
| `TaskOptions.timeout` | Highest | `options=TaskOptions(timeout=600)` |
| `@task(timeout=...)` | Default | `@task(timeout=300)` |
| Not set | No timeout | Task runs indefinitely |
Call-time options always override decorator defaults, allowing tasks to have sensible defaults while permitting callers to extend or shorten the timeout for specific use cases.
:::warning
Timeouts require an abort handler to be effective. Without one, the timeout triggers only a warning and the task continues running. Always implement an abort handler when using timeouts.
:::
## Deduplication
Use `task_key` to prevent duplicate task execution:
```python
from superset_core.tasks.types import TaskOptions
# Without key - creates new task each time (random UUID)
task1 = my_task.schedule(x=1)
task2 = my_task.schedule(x=1) # Different task
# With key - joins existing task if active
task1 = my_task.schedule(x=1, options=TaskOptions(task_key="report_123"))
task2 = my_task.schedule(x=1, options=TaskOptions(task_key="report_123")) # Returns same task
```
When a task with matching key already exists, the user is added as a subscriber and the existing task is returned. This behavior is consistent across all scopes—private tasks naturally have only one subscriber since their deduplication key includes the user ID.
Deduplication only applies to active tasks (pending/in-progress). Once a task completes, a new task with the same key can be created.
### Sync Join-and-Wait
When a sync call joins an existing task, it blocks until the task completes:
```python
# Schedule async task
task = my_task.schedule(options=TaskOptions(task_key="report_123"))
# Later sync call with same key blocks until completion of the active task
task2 = my_task(options=TaskOptions(task_key="report_123"))
assert task.uuid == task2.uuid # True
print(task2.status) # "success" (terminal status)
```
## Task Scopes
```python
from superset_core.tasks.decorators import task
from superset_core.tasks.types import TaskScope
@task # Private by default
def private_task(): ...
@task(scope=TaskScope.SHARED) # Multiple users can subscribe
def shared_task(): ...
@task(scope=TaskScope.SYSTEM) # Admin-only visibility
def system_task(): ...
```
| Scope | Visibility | Cancel Behavior |
|-------|------------|-----------------|
| `PRIVATE` | Creator only | Cancels immediately |
| `SHARED` | All subscribers | Last subscriber cancels; others unsubscribe |
| `SYSTEM` | Admins only | Admin cancels |
## Task Cleanup
Completed tasks accumulate in the database over time. Configure a scheduled prune job to automatically remove old tasks:
```python
# In your superset_config.py, add to your Celery beat schedule:
CELERY_CONFIG.beat_schedule["prune_tasks"] = {
"task": "prune_tasks",
"schedule": crontab(minute=0, hour=0), # Run daily at midnight
"kwargs": {
"retention_period_days": 90, # Keep tasks for 90 days
"max_rows_per_run": 10000, # Limit deletions per run
},
}
```
The prune job only removes tasks in terminal states (`SUCCESS`, `FAILURE`, `ABORTED`, `TIMED_OUT`). Active tasks (`PENDING`, `IN_PROGRESS`, `ABORTING`) are never pruned.
See `superset/config.py` for a complete example configuration.
:::tip Distributed Coordination for Faster Notifications
By default, abort detection and sync join-and-wait use database polling. Configure `DISTRIBUTED_COORDINATION_CONFIG` to enable Redis pub/sub for real-time notifications. See [Distributed Coordination Backend](/admin-docs/configuration/cache#signal-cache-backend) for configuration details.
:::
## API Reference
### @task Decorator
```python
@task(
name: str | None = None,
scope: TaskScope = TaskScope.PRIVATE,
timeout: int | None = None
)
```
- `name`: Task identifier (defaults to function name)
- `scope`: `PRIVATE`, `SHARED`, or `SYSTEM`
- `timeout`: Default timeout in seconds (can be overridden via `TaskOptions`)
### TaskContext Methods
| Method | Description |
|--------|-------------|
| `update_task(progress, payload)` | Update progress and/or custom payload |
| `on_cleanup(handler)` | Register cleanup handler |
| `on_abort(handler)` | Register abort handler (makes task abortable) |
### TaskOptions
```python
TaskOptions(
task_key: str | None = None,
task_name: str | None = None,
timeout: int | None = None
)
```
- `task_key`: Deduplication key (also used as display name if `task_name` is not set)
- `task_name`: Human-readable display name for the Task List UI
- `timeout`: Timeout in seconds (overrides decorator default)
:::tip
Provide a descriptive `task_name` for better readability in the Task List UI. While `task_key` is used for deduplication and may be technical (e.g., `chart_export_123`), `task_name` can be user-friendly (e.g., `"Export Sales Chart 123"`).
:::
## Error Handling
Let exceptions propagate: the framework captures them automatically and sets task status to `FAILURE`:
```python
@task
def risky_task() -> None:
# No try/catch needed - framework handles it
result = operation_that_might_fail()
```
On failure, the framework records:
- `error_message`: Exception message
- `exception_type`: Exception class name
- `stack_trace`: Full traceback (visible when `SHOW_STACKTRACE=True`)
In the Task List UI, failed tasks show error details when hovering over the status. When stack traces are enabled, a separate bug icon appears in the **Details** column for viewing the full traceback.
Cleanup handlers still run after an exception, so resources can be properly released as necessary.
:::tip
Use descriptive exception messages. In environments where stack traces are hidden (`SHOW_STACKTRACE=False`), users see only the error message and exception type when hovering over failed tasks. Clear messages help users troubleshoot issues without administrator assistance.
:::