diff --git a/superset/commands/tasks/submit.py b/superset/commands/tasks/submit.py index 247f209b7d0..a4caeade09f 100644 --- a/superset/commands/tasks/submit.py +++ b/superset/commands/tasks/submit.py @@ -34,6 +34,7 @@ from superset.daos.exceptions import DAOCreateFailedError from superset.stats_logger import BaseStatsLogger from superset.tasks.locks import task_lock from superset.tasks.utils import get_active_dedup_key +from superset.utils.core import get_user_id from superset.utils.decorators import on_error, transaction if TYPE_CHECKING: @@ -87,7 +88,7 @@ class SubmitTaskCommand(BaseCommand): task_type = self._properties["task_type"] task_key = self._properties.get("task_key") or str(uuid.uuid4()) scope = self._properties.get("scope", TaskScope.PRIVATE.value) - user_id = self._properties.get("user_id") + user_id = get_user_id() # Build dedup_key for lock dedup_key = get_active_dedup_key( diff --git a/superset/tasks/decorators.py b/superset/tasks/decorators.py index 1c69280f80f..e2f753a94ad 100644 --- a/superset/tasks/decorators.py +++ b/superset/tasks/decorators.py @@ -32,7 +32,6 @@ from superset.tasks.context import TaskContext from superset.tasks.manager import TaskManager from superset.tasks.registry import TaskRegistry from superset.tasks.utils import generate_random_task_key -from superset.utils.core import get_user_id if TYPE_CHECKING: from superset.models.tasks import Task @@ -295,7 +294,6 @@ class TaskWrapper(Generic[P]): "task_name": task_name, "scope": scope.value, "properties": properties, - "user_id": get_user_id(), } ).run_with_info() diff --git a/tests/integration_tests/tasks/commands/test_submit.py b/tests/integration_tests/tasks/commands/test_submit.py index a6a7f6f3171..35290e12c2f 100644 --- a/tests/integration_tests/tasks/commands/test_submit.py +++ b/tests/integration_tests/tasks/commands/test_submit.py @@ -36,7 +36,6 @@ def test_submit_task_success(app_context, login_as, get_user) -> None: "task_type": "test-type", "task_key": "test-key", "task_name": "Test Task", - "user_id": admin.id, } ) @@ -49,6 +48,7 @@ def test_submit_task_success(app_context, login_as, get_user) -> None: assert result.task_name == "Test Task" assert result.status == TaskStatus.PENDING.value assert result.payload == "{}" + assert result.user_id == admin.id # Verify in database db.session.refresh(result) @@ -70,7 +70,6 @@ def test_submit_task_with_all_fields(app_context, login_as, get_user) -> None: "task_type": "test-type", "task_key": "test-key-full", "task_name": "Test Task Full", - "user_id": admin.id, "payload": {"key": "value"}, "properties": {"execution_mode": "async", "timeout": 300}, } @@ -117,10 +116,10 @@ def test_submit_task_joins_existing(app_context, login_as, get_user) -> None: "task_type": "test-type", "task_key": "shared-key", "task_name": "First Task", - "user_id": admin.id, } ) task1 = command1.run() + assert task1.user_id == admin.id try: # Submit second task with same task_key and type @@ -129,7 +128,6 @@ def test_submit_task_joins_existing(app_context, login_as, get_user) -> None: "task_type": "test-type", "task_key": "shared-key", "task_name": "Second Task", - "user_id": admin.id, } ) @@ -152,7 +150,6 @@ def test_submit_task_without_task_key(app_context, login_as, get_user) -> None: data={ "task_type": "test-type", "task_name": "Test Task No ID", - "user_id": admin.id, } ) @@ -164,6 +161,7 @@ def test_submit_task_without_task_key(app_context, login_as, get_user) -> None: assert result.task_name == "Test Task No ID" assert result.task_key is not None # Command generated UUID assert result.uuid is not None + assert result.user_id == admin.id finally: # Cleanup db.session.delete(result) @@ -182,7 +180,6 @@ def test_submit_task_run_with_info_returns_is_new_true( "task_type": "test-type", "task_key": "unique-key-is-new", "task_name": "Test Task", - "user_id": admin.id, } ) @@ -191,6 +188,7 @@ def test_submit_task_run_with_info_returns_is_new_true( assert is_new is True assert task.task_key == "unique-key-is-new" + assert task.user_id == admin.id finally: # Cleanup db.session.delete(task) @@ -210,11 +208,11 @@ def test_submit_task_run_with_info_returns_is_new_false( "task_type": "test-type", "task_key": "shared-key-is-new", "task_name": "First Task", - "user_id": admin.id, } ) task1, is_new1 = command1.run_with_info() assert is_new1 is True + assert task1.user_id == admin.id try: # Submit second task with same key @@ -223,7 +221,6 @@ def test_submit_task_run_with_info_returns_is_new_false( "task_type": "test-type", "task_key": "shared-key-is-new", "task_name": "Second Task", - "user_id": admin.id, } ) task2, is_new2 = command2.run_with_info() diff --git a/tests/integration_tests/tasks/test_sync_join_wait.py b/tests/integration_tests/tasks/test_sync_join_wait.py index 9a611cd6b29..88702b6fce9 100644 --- a/tests/integration_tests/tasks/test_sync_join_wait.py +++ b/tests/integration_tests/tasks/test_sync_join_wait.py @@ -42,11 +42,11 @@ def test_submit_task_distinguishes_new_vs_existing( "task_type": "test-type", "task_key": "distinguish-key", "task_name": "First Task", - "user_id": admin.id, } ).run_with_info() assert is_new1 is True + assert task1.user_id == admin.id try: # Second submission with same key - should join existing @@ -55,7 +55,6 @@ def test_submit_task_distinguishes_new_vs_existing( "task_type": "test-type", "task_key": "distinguish-key", "task_name": "Second Task", - "user_id": admin.id, } ).run_with_info() @@ -85,9 +84,9 @@ def test_wait_for_completion_timeout(app_context, login_as, get_user) -> None: "task_type": "test-timeout", "task_key": "timeout-key", "task_name": "Timeout Task", - "user_id": admin.id, } ).run_with_info() + assert task.user_id == admin.id try: # Force polling mode by mocking signal_cache as None @@ -119,9 +118,9 @@ def test_wait_returns_immediately_for_terminal_task( "task_type": "test-immediate", "task_key": "immediate-key", "task_name": "Immediate Task", - "user_id": admin.id, } ).run_with_info() + assert task.user_id == admin.id TaskDAO.update(task, {"status": TaskStatus.SUCCESS.value}) db.session.commit()