perf(gtf): improve task base filter (#37900)

This commit is contained in:
Ville Brofeldt
2026-02-11 10:40:07 -08:00
committed by GitHub
parent 26a2e12779
commit 00d02cb2ea
3 changed files with 281 additions and 94 deletions

View File

@@ -26,87 +26,38 @@ from superset.views.base import BaseFilter
class TaskFilter(BaseFilter): # pylint: disable=too-few-public-methods
"""
Filter for Task that shows tasks based on scope and user permissions.
Filter for Task that shows tasks based on user subscriptions.
Filtering rules:
- Admins: See all tasks (private, shared, system)
- Non-admins:
- Private tasks: Only their own tasks
- Shared tasks: Tasks they're subscribed to
- System tasks: None (admin-only)
Non-admins only see tasks they're subscribed to. Task creators are
automatically subscribed when creating a task, so this covers both
owned and shared tasks. Unsubscribing removes visibility.
Admins see all tasks without filtering.
"""
def apply(self, query: Query, value: Any) -> Query:
"""Apply the filter to the query."""
from flask import g, has_request_context
from sqlalchemy import or_
from sqlalchemy import and_, select
from superset import db, security_manager
from superset import security_manager
from superset.models.task_subscribers import TaskSubscriber
from superset.models.tasks import Task
# If no request context or no user, return unfiltered query
# (this handles background tasks and system operations)
if not has_request_context() or not hasattr(g, "user"):
return query
# If user is admin, return unfiltered query
if security_manager.is_admin():
return query
# For non-admins, filter by scope and permissions
# If user is admin or no user_id, return unfiltered query.
# This typically applies to background tasks and system operations
user_id = get_user_id()
# Use subquery for shared tasks to avoid join ambiguity
shared_task_ids_query = (
db.session.query(Task.id)
.join(TaskSubscriber, Task.id == TaskSubscriber.task_id)
.filter(
Task.scope == "shared",
TaskSubscriber.user_id == user_id,
)
)
# Build filter conditions:
# 1. Private tasks created by current user
# 2. Shared tasks where user is subscribed (via subquery)
# 3. System tasks are excluded (admin-only)
return query.filter(
or_(
# Own private tasks
(Task.scope == "private") & (Task.created_by_fk == user_id),
# Shared tasks where user is subscribed
Task.id.in_(shared_task_ids_query),
)
)
class TaskSubscriberFilter(BaseFilter): # pylint: disable=too-few-public-methods
"""
Filter tasks by subscriber user ID.
This filter allows finding tasks where a specific user is subscribed.
Used by the frontend for the subscribers filter dropdown.
"""
def apply(self, query: Query, value: Any) -> Query:
"""Apply the filter to the query."""
from superset import db
from superset.models.task_subscribers import TaskSubscriber
from superset.models.tasks import Task
if not value:
if not user_id or security_manager.is_admin():
return query
# Handle both single ID and list of IDs
if isinstance(value, (list, tuple)):
user_ids = [int(v) for v in value]
else:
user_ids = [int(value)]
# Find tasks where any of these users are subscribers
subscribed_task_ids = db.session.query(TaskSubscriber.task_id).filter(
TaskSubscriber.user_id.in_(user_ids)
is_subscribed = (
select(TaskSubscriber.id)
.where(
and_(
TaskSubscriber.task_id == Task.id,
TaskSubscriber.user_id == user_id,
)
)
.exists()
)
return query.filter(Task.id.in_(subscribed_task_ids))
return query.filter(is_subscribed)

View File

@@ -450,33 +450,45 @@ def test_cancel_already_aborting_is_idempotent(app_context, get_user) -> None:
db.session.commit()
def test_cancel_shared_task_not_subscribed_raises_error(app_context, get_user) -> None:
"""Test non-subscriber cannot cancel shared task"""
def test_cancel_shared_task_not_subscribed_raises_not_found(
app_context, get_user
) -> None:
"""Test non-subscriber cannot see or cancel shared task.
With subscription-only filtering, users who aren't subscribed to a
shared task can't see it at all, so canceling returns "not found"
rather than "permission denied".
"""
admin = get_user("admin")
gamma = get_user("gamma")
unique_key = f"not_subscribed_test_{uuid4().hex[:8]}"
# Create a shared task with only admin as subscriber
task = TaskDAO.create_task(
task_type="test_type",
task_key="not_subscribed_test",
scope=TaskScope.SHARED,
user_id=admin.id,
)
task.created_by = admin
db.session.commit()
# Use test_request_context to ensure TaskFilter is applied
with app.test_request_context():
# Create a shared task with only admin as subscriber
with override_user(admin):
task = TaskDAO.create_task(
task_type="test_type",
task_key=unique_key,
scope=TaskScope.SHARED,
user_id=admin.id,
)
db.session.commit()
try:
# Try to cancel as gamma (not subscribed)
with override_user(gamma):
command = CancelTaskCommand(task_uuid=task.uuid)
try:
# Try to cancel as gamma (not subscribed) - they can't see the task
with override_user(gamma):
command = CancelTaskCommand(task_uuid=task.uuid)
with pytest.raises(TaskPermissionDeniedError):
command.run()
# TaskNotFoundError because gamma isn't subscribed and can't see
# the task
with pytest.raises(TaskNotFoundError):
command.run()
# Verify task unchanged
db.session.refresh(task)
assert task.status == TaskStatus.PENDING.value
finally:
# Cleanup
db.session.delete(task)
db.session.commit()
# Verify task unchanged
db.session.refresh(task)
assert task.status == TaskStatus.PENDING.value
finally:
# Cleanup
db.session.delete(task)
db.session.commit()

View File

@@ -0,0 +1,224 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Integration tests for TaskFilter subscription-based visibility."""
from uuid import uuid4
from superset_core.api.tasks import TaskScope, TaskStatus
from superset import db
from superset.commands.tasks import SubmitTaskCommand
from superset.commands.tasks.cancel import CancelTaskCommand
from superset.daos.tasks import TaskDAO
from superset.utils.core import override_user
from tests.integration_tests.test_app import app
def test_unsubscribed_user_cannot_see_task(app_context, get_user) -> None:
    """
    Check that leaving a shared task hides it from the user who left.

    Flow exercised:
    1. Gamma submits a shared task and is its only subscriber.
    2. Admin submits the same task key and ends up on the same task.
    3. Gamma cancels; the command reports the action "unsubscribed".
    4. A DAO lookup performed as gamma finds nothing.
    5. A DAO lookup performed as admin still finds the task.
    """
    gamma_user = get_user("gamma")
    admin_user = get_user("admin")
    shared_key = f"visibility_test_{uuid4().hex[:8]}"

    def shared_submission():
        # Build a brand-new payload per command so the two submissions
        # never share a dict instance.
        return SubmitTaskCommand(
            data={
                "task_type": "test_type",
                "task_key": shared_key,
                "scope": TaskScope.SHARED.value,
            }
        )

    with app.test_request_context():
        # Step 1: gamma creates the shared task
        with override_user(gamma_user):
            created = shared_submission().run()
            created_uuid = created.uuid

        try:
            # Right after creation gamma is the one and only subscriber
            db.session.refresh(created)
            assert created.subscriber_count == 1
            assert created.has_subscriber(gamma_user.id)

            # Step 2: admin submits the same key and joins the task
            with override_user(admin_user):
                joined = shared_submission().run()

            # Same underlying task, now carrying both subscribers
            assert joined.uuid == created_uuid
            db.session.refresh(created)
            assert created.subscriber_count == 2
            assert created.has_subscriber(gamma_user.id)
            assert created.has_subscriber(admin_user.id)

            # Step 3: gamma cancels
            with override_user(gamma_user):
                gamma_cancel = CancelTaskCommand(task_uuid=created_uuid)
                gamma_cancel.run()

            # The cancel is expected to be recorded as an unsubscribe,
            # leaving admin as the remaining subscriber
            assert gamma_cancel.action_taken == "unsubscribed"
            db.session.refresh(created)
            assert created.subscriber_count == 1
            assert not created.has_subscriber(gamma_user.id)
            assert created.has_subscriber(admin_user.id)

            # Step 4: the task is no longer returned for gamma
            with override_user(gamma_user):
                seen_by_gamma = TaskDAO.find_one_or_none(uuid=created_uuid)
                assert seen_by_gamma is None, (
                    "Gamma should not see task after unsubscribing"
                )

            # Step 5: the task is still returned for admin
            with override_user(admin_user):
                seen_by_admin = TaskDAO.find_one_or_none(uuid=created_uuid)
                assert seen_by_admin is not None
                assert seen_by_admin.uuid == created_uuid
        finally:
            # Cleanup: delete the task through the session directly,
            # whether or not the assertions above held
            db.session.delete(created)
            db.session.commit()
def test_task_creator_subscribed_automatically(app_context, get_user) -> None:
    """
    Check that submitting a task subscribes the submitting user to it.

    Subscription-only filtering depends on this invariant: the test
    submits a private task as gamma, then asserts that gamma is the single
    subscriber and that a DAO lookup performed as gamma returns the task.
    """
    creator = get_user("gamma")
    private_key = f"creator_subscribed_test_{uuid4().hex[:8]}"

    with app.test_request_context():
        # Submit a private task as gamma
        with override_user(creator):
            payload = {
                "task_type": "test_type",
                "task_key": private_key,
                "scope": TaskScope.PRIVATE.value,
            }
            created = SubmitTaskCommand(data=payload).run()

        try:
            db.session.refresh(created)

            # The creator is expected to be the only subscriber
            assert created.subscriber_count == 1
            assert created.has_subscriber(creator.id)

            # ...and must be able to look up the task they created
            with override_user(creator):
                looked_up = TaskDAO.find_one_or_none(uuid=created.uuid)
                assert looked_up is not None
                assert looked_up.uuid == created.uuid
        finally:
            # Cleanup: remove the task regardless of the test outcome
            db.session.delete(created)
            db.session.commit()
def test_aborted_task_still_visible_to_subscribers(app_context, get_user) -> None:
    """
    Check that aborting a task leaves its subscriptions in place.

    Gamma submits a shared task and admin joins it. Admin then cancels
    with ``force=True``; the test asserts the action is reported as
    "aborted", that both subscriptions remain, and that DAO lookups
    performed as either user still return the (now aborted) task.
    """
    gamma_user = get_user("gamma")
    admin_user = get_user("admin")
    shared_key = f"abort_visibility_test_{uuid4().hex[:8]}"

    def shared_submission():
        # Build a brand-new payload per command so the two submissions
        # never share a dict instance.
        return SubmitTaskCommand(
            data={
                "task_type": "test_type",
                "task_key": shared_key,
                "scope": TaskScope.SHARED.value,
            }
        )

    with app.test_request_context():
        # Gamma creates the shared task
        with override_user(gamma_user):
            created = shared_submission().run()
            created_uuid = created.uuid

        try:
            # Admin submits the same key to join the task
            with override_user(admin_user):
                shared_submission().run()

            # Both users are expected to be subscribed at this point
            db.session.refresh(created)
            assert created.subscriber_count == 2

            # Admin cancels with force=True
            with override_user(admin_user):
                forced_cancel = CancelTaskCommand(task_uuid=created_uuid, force=True)
                forced_cancel.run()

            # The task must be reported and stored as aborted
            assert forced_cancel.action_taken == "aborted"
            db.session.refresh(created)
            assert created.status == TaskStatus.ABORTED.value

            # Aborting must not have touched the subscriptions
            assert created.subscriber_count == 2
            assert created.has_subscriber(gamma_user.id)
            assert created.has_subscriber(admin_user.id)

            # The aborted task is still returned for each subscriber
            with override_user(gamma_user):
                seen_by_gamma = TaskDAO.find_one_or_none(uuid=created_uuid)
                assert seen_by_gamma is not None
                assert seen_by_gamma.status == TaskStatus.ABORTED.value

            with override_user(admin_user):
                seen_by_admin = TaskDAO.find_one_or_none(uuid=created_uuid)
                assert seen_by_admin is not None
                assert seen_by_admin.status == TaskStatus.ABORTED.value
        finally:
            # Cleanup: remove the task regardless of the test outcome
            db.session.delete(created)
            db.session.commit()