refactor(config): SIGNAL_CACHE_CONFIG → DISTRIBUTED_COORDINATION_CONFIG (#38395)

This commit is contained in:
Michael S. Molina
2026-03-04 14:40:21 -03:00
committed by GitHub
parent 832fee3ff8
commit 19f949276c
15 changed files with 95 additions and 85 deletions

View File

@@ -82,8 +82,8 @@ def task_context(mock_task, mock_task_dao, mock_update_command, mock_flask_app):
patch("superset.tasks.context.current_app") as mock_current_app,
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
# Configure current_app mock
mock_current_app.config = mock_flask_app.config

View File

@@ -123,13 +123,13 @@ class TestTaskManagerPubSub:
@patch("superset.tasks.manager.cache_manager")
def test_is_pubsub_available_no_redis(self, mock_cache_manager):
"""Test is_pubsub_available returns False when Redis not configured"""
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
assert TaskManager.is_pubsub_available() is False
@patch("superset.tasks.manager.cache_manager")
def test_is_pubsub_available_with_redis(self, mock_cache_manager):
"""Test is_pubsub_available returns True when Redis is configured"""
mock_cache_manager.signal_cache = MagicMock()
mock_cache_manager.distributed_coordination = MagicMock()
assert TaskManager.is_pubsub_available() is True
def test_get_abort_channel(self):
@@ -148,7 +148,7 @@ class TestTaskManagerPubSub:
@patch("superset.tasks.manager.cache_manager")
def test_publish_abort_no_redis(self, mock_cache_manager):
"""Test publish_abort returns False when Redis not available"""
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
result = TaskManager.publish_abort("test-uuid")
assert result is False
@@ -157,7 +157,7 @@ class TestTaskManagerPubSub:
"""Test publish_abort publishes message successfully"""
mock_redis = MagicMock()
mock_redis.publish.return_value = 1 # One subscriber
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_abort("test-uuid")
@@ -169,7 +169,7 @@ class TestTaskManagerPubSub:
"""Test publish_abort handles Redis errors gracefully"""
mock_redis = MagicMock()
mock_redis.publish.side_effect = redis.RedisError("Connection lost")
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_abort("test-uuid")
@@ -194,7 +194,7 @@ class TestTaskManagerListenForAbort:
@patch("superset.tasks.manager.cache_manager")
def test_listen_for_abort_no_redis_uses_polling(self, mock_cache_manager):
"""Test listen_for_abort falls back to polling when Redis unavailable"""
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
callback = MagicMock()
with patch.object(TaskManager, "_poll_for_abort", return_value=None):
@@ -218,7 +218,7 @@ class TestTaskManagerListenForAbort:
mock_redis = MagicMock()
mock_pubsub = MagicMock()
mock_redis.pubsub.return_value = mock_pubsub
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
callback = MagicMock()
@@ -245,7 +245,7 @@ class TestTaskManagerListenForAbort:
mock_redis = MagicMock()
mock_redis.pubsub.side_effect = redis.RedisError("Connection failed")
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
callback = MagicMock()
@@ -290,7 +290,7 @@ class TestTaskManagerCompletion:
@patch("superset.tasks.manager.cache_manager")
def test_publish_completion_no_redis(self, mock_cache_manager):
"""Test publish_completion returns False when Redis not available"""
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
result = TaskManager.publish_completion("test-uuid", "success")
assert result is False
@@ -299,7 +299,7 @@ class TestTaskManagerCompletion:
"""Test publish_completion publishes message successfully"""
mock_redis = MagicMock()
mock_redis.publish.return_value = 1 # One subscriber
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_completion("test-uuid", "success")
@@ -311,7 +311,7 @@ class TestTaskManagerCompletion:
"""Test publish_completion handles Redis errors gracefully"""
mock_redis = MagicMock()
mock_redis.publish.side_effect = redis.RedisError("Connection lost")
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.publish_completion("test-uuid", "success")
@@ -323,7 +323,7 @@ class TestTaskManagerCompletion:
"""Test wait_for_completion raises ValueError for missing task"""
import pytest
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
mock_dao.find_one_or_none.return_value = None
with pytest.raises(ValueError, match="not found"):
@@ -333,7 +333,7 @@ class TestTaskManagerCompletion:
@patch("superset.daos.tasks.TaskDAO")
def test_wait_for_completion_already_complete(self, mock_dao, mock_cache_manager):
"""Test wait_for_completion returns immediately for terminal state"""
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
mock_task = MagicMock()
mock_task.uuid = "test-uuid"
mock_task.status = "success"
@@ -351,7 +351,7 @@ class TestTaskManagerCompletion:
"""Test wait_for_completion raises TimeoutError when timeout expires"""
import pytest
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
mock_task = MagicMock()
mock_task.uuid = "test-uuid"
mock_task.status = "in_progress" # Never completes
@@ -364,7 +364,7 @@ class TestTaskManagerCompletion:
@patch("superset.daos.tasks.TaskDAO")
def test_wait_for_completion_polling_success(self, mock_dao, mock_cache_manager):
"""Test wait_for_completion returns when task completes via polling"""
mock_cache_manager.signal_cache = None
mock_cache_manager.distributed_coordination = None
mock_task_pending = MagicMock()
mock_task_pending.uuid = "test-uuid"
mock_task_pending.status = "pending"
@@ -414,7 +414,7 @@ class TestTaskManagerCompletion:
"data": "success",
}
mock_redis.pubsub.return_value = mock_pubsub
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
result = TaskManager.wait_for_completion(
"test-uuid",
@@ -446,7 +446,7 @@ class TestTaskManagerCompletion:
# Set up mock Redis that fails
mock_redis = MagicMock()
mock_redis.pubsub.side_effect = redis.RedisError("Connection failed")
mock_cache_manager.signal_cache = mock_redis
mock_cache_manager.distributed_coordination = mock_redis
# With fail-fast behavior, Redis error is raised instead of falling back
with pytest.raises(redis.RedisError, match="Connection failed"):

View File

@@ -89,8 +89,8 @@ def task_context_for_timeout(mock_flask_app, mock_task_abortable):
patch("superset.daos.tasks.TaskDAO") as mock_dao,
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
# Configure current_app mock
mock_current_app.config = mock_flask_app.config
@@ -277,8 +277,8 @@ class TestTimeoutTrigger:
) as mock_update_cmd,
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -323,8 +323,8 @@ class TestTimeoutTrigger:
patch("superset.tasks.context.logger") as mock_logger,
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -363,8 +363,8 @@ class TestTimeoutTrigger:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -469,8 +469,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -510,8 +510,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -547,8 +547,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app
@@ -584,8 +584,8 @@ class TestTimeoutTerminalState:
patch("superset.commands.tasks.update.UpdateTaskCommand"),
patch("superset.tasks.manager.cache_manager") as mock_cache_manager,
):
# Disable Redis by making signal_cache return None
mock_cache_manager.signal_cache = None
# Disable Redis by making distributed_coordination return None
mock_cache_manager.distributed_coordination = None
mock_current_app.config = mock_flask_app.config
mock_current_app._get_current_object.return_value = mock_flask_app