diff --git a/UPDATING.md b/UPDATING.md index 7b887b1aa36..455e9a10ba4 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -64,19 +64,19 @@ ORDER BY total_calls DESC; **Security note:** Sensitive parameters (passwords, API keys, tokens) are automatically redacted in logs as `[REDACTED]`. -### Signal Cache Backend +### Distributed Coordination Backend -A new `SIGNAL_CACHE_CONFIG` configuration provides a unified Redis-based backend for real-time coordination features in Superset. This backend enables: +A new `DISTRIBUTED_COORDINATION_CONFIG` configuration provides a unified Redis-based backend for real-time coordination features in Superset. This backend enables: - **Pub/sub messaging** for real-time event notifications between workers - **Atomic distributed locking** using Redis SET NX EX (more performant than database-backed locks) - **Event-based coordination** for background task management -The signal cache is used by the Global Task Framework (GTF) for abort notifications and task completion signaling, and will eventually replace `GLOBAL_ASYNC_QUERIES_CACHE_BACKEND` as the standard signaling backend. Configuring this is recommended for Redis enabled production deployments. +The distributed coordination is used by the Global Task Framework (GTF) for abort notifications and task completion signaling, and will eventually replace `GLOBAL_ASYNC_QUERIES_CACHE_BACKEND` as the standard signaling backend. Configuring this is recommended for Redis enabled production deployments. Example configuration in `superset_config.py`: ```python -SIGNAL_CACHE_CONFIG = { +DISTRIBUTED_COORDINATION_CONFIG = { "CACHE_TYPE": "RedisCache", "CACHE_KEY_PREFIX": "signal_", "CACHE_REDIS_URL": "redis://localhost:6379/1", diff --git a/docs/admin_docs/configuration/cache.mdx b/docs/admin_docs/configuration/cache.mdx index 4bb37483181..be1459f09f1 100644 --- a/docs/admin_docs/configuration/cache.mdx +++ b/docs/admin_docs/configuration/cache.mdx @@ -159,9 +159,9 @@ Then on configuration: WEBDRIVER_AUTH_FUNC = auth_driver ``` -## Signal Cache Backend +## Distributed Coordination Backend -Superset supports an optional signal cache (`SIGNAL_CACHE_CONFIG`) for +Superset supports an optional distributed coordination (`DISTRIBUTED_COORDINATION_CONFIG`) for high-performance distributed operations. This configuration enables: - **Distributed locking**: Moves lock operations from the metadata database to Redis, improving @@ -176,11 +176,11 @@ that are not available in general Flask-Caching backends. ### Configuration -The signal cache uses Flask-Caching style configuration for consistency with other cache -backends. Configure `SIGNAL_CACHE_CONFIG` in `superset_config.py`: +The distributed coordination uses Flask-Caching style configuration for consistency with other cache +backends. Configure `DISTRIBUTED_COORDINATION_CONFIG` in `superset_config.py`: ```python -SIGNAL_CACHE_CONFIG = { +DISTRIBUTED_COORDINATION_CONFIG = { "CACHE_TYPE": "RedisCache", "CACHE_REDIS_HOST": "localhost", "CACHE_REDIS_PORT": 6379, @@ -192,7 +192,7 @@ SIGNAL_CACHE_CONFIG = { For Redis Sentinel deployments: ```python -SIGNAL_CACHE_CONFIG = { +DISTRIBUTED_COORDINATION_CONFIG = { "CACHE_TYPE": "RedisSentinelCache", "CACHE_REDIS_SENTINELS": [("sentinel1", 26379), ("sentinel2", 26379)], "CACHE_REDIS_SENTINEL_MASTER": "mymaster", @@ -205,7 +205,7 @@ SIGNAL_CACHE_CONFIG = { For SSL/TLS connections: ```python -SIGNAL_CACHE_CONFIG = { +DISTRIBUTED_COORDINATION_CONFIG = { "CACHE_TYPE": "RedisCache", "CACHE_REDIS_HOST": "redis.example.com", "CACHE_REDIS_PORT": 6380, @@ -229,7 +229,7 @@ Individual lock acquisitions can override this value when needed. ### Database-Only Mode -When `SIGNAL_CACHE_CONFIG` is not configured, Superset uses database-backed operations: +When `DISTRIBUTED_COORDINATION_CONFIG` is not configured, Superset uses database-backed operations: - **Locking**: Uses the KeyValue table with periodic cleanup of expired entries - **Event notifications**: Uses database polling instead of pub/sub diff --git a/docs/developer_docs/extensions/tasks.md b/docs/developer_docs/extensions/tasks.md index d1f33aa1475..47e0e7cf562 100644 --- a/docs/developer_docs/extensions/tasks.md +++ b/docs/developer_docs/extensions/tasks.md @@ -369,8 +369,8 @@ The prune job only removes tasks in terminal states (`SUCCESS`, `FAILURE`, `ABOR See `superset/config.py` for a complete example configuration. -:::tip Signal Cache for Faster Notifications -By default, abort detection and sync join-and-wait use database polling. Configure `SIGNAL_CACHE_CONFIG` to enable Redis pub/sub for real-time notifications. See [Signal Cache Backend](/admin-docs/configuration/cache#signal-cache-backend) for configuration details. +:::tip Distributed Coordination for Faster Notifications +By default, abort detection and sync join-and-wait use database polling. Configure `DISTRIBUTED_COORDINATION_CONFIG` to enable Redis pub/sub for real-time notifications. See [Distributed Coordination Backend](/admin-docs/configuration/cache#signal-cache-backend) for configuration details. ::: ## API Reference diff --git a/superset/commands/distributed_lock/acquire.py b/superset/commands/distributed_lock/acquire.py index e06439a49b4..c25501f7927 100644 --- a/superset/commands/distributed_lock/acquire.py +++ b/superset/commands/distributed_lock/acquire.py @@ -46,7 +46,7 @@ class AcquireDistributedLock(BaseDistributedLockCommand): """ Acquire a distributed lock with automatic backend selection. - Uses Redis SET NX EX when SIGNAL_CACHE_CONFIG is configured, + Uses Redis SET NX EX when DISTRIBUTED_COORDINATION_CONFIG is configured, otherwise falls back to KeyValue table. Raises AcquireDistributedLockFailedException if: diff --git a/superset/commands/distributed_lock/base.py b/superset/commands/distributed_lock/base.py index 3317887d8c8..55da69f7ebd 100644 --- a/superset/commands/distributed_lock/base.py +++ b/superset/commands/distributed_lock/base.py @@ -41,12 +41,12 @@ def get_default_lock_ttl() -> int: def get_redis_client() -> "redis.Redis[Any] | None": """ - Get Redis client from signal cache if available. + Get Redis client from distributed coordination if available. - Returns None if SIGNAL_CACHE_CONFIG is not configured, + Returns None if DISTRIBUTED_COORDINATION_CONFIG is not configured, allowing fallback to database-backed locking. """ - backend = cache_manager.signal_cache + backend = cache_manager.distributed_coordination return backend._cache if backend else None diff --git a/superset/commands/distributed_lock/release.py b/superset/commands/distributed_lock/release.py index 6d98f82aa4b..14f9deb4df5 100644 --- a/superset/commands/distributed_lock/release.py +++ b/superset/commands/distributed_lock/release.py @@ -40,7 +40,7 @@ class ReleaseDistributedLock(BaseDistributedLockCommand): """ Release a distributed lock with automatic backend selection. - Uses Redis DELETE when SIGNAL_CACHE_CONFIG is configured, + Uses Redis DELETE when DISTRIBUTED_COORDINATION_CONFIG is configured, otherwise deletes from KeyValue table. """ diff --git a/superset/config.py b/superset/config.py index dfeafb3b261..30f0f801227 100644 --- a/superset/config.py +++ b/superset/config.py @@ -2485,28 +2485,30 @@ TASK_ABORT_POLLING_DEFAULT_INTERVAL = 10 TASK_PROGRESS_UPDATE_THROTTLE_INTERVAL = 2 # seconds # --------------------------------------------------- -# Signal Cache Configuration +# Distributed Coordination Configuration # --------------------------------------------------- -# Shared Redis/Valkey configuration for signaling features that require -# Redis-specific primitives (pub/sub messaging, distributed locks). +# Shared Redis/Valkey backend for distributed coordination primitives. # # Uses Flask-Caching style configuration for consistency with other cache backends. # Set CACHE_TYPE to 'RedisCache' for standard Redis or 'RedisSentinelCache' for # Sentinel. # -# These features cannot use generic cache backends because they rely on: +# These features require Redis primitives unavailable in generic cache backends: # - Pub/Sub: Real-time message broadcasting between workers # - SET NX EX: Atomic lock acquisition with automatic expiration +# - Streams: Persistent ordered event logs (future) # # When configured, enables: # - Real-time abort/completion notifications for GTF tasks (vs database polling) # - Redis-based distributed locking (vs KeyValueDAO-backed DistributedLock) # -# Future: This cache will also be used by Global Async Queries, consolidating -# GLOBAL_ASYNC_QUERIES_CACHE_BACKEND into this unified configuration. +# Future: This backend will power a higher-level coordination service exposing +# standardized interfaces for distributed locks, pub/sub, and streams — consolidating +# all advanced Redis primitives under a single connection. Global Async Queries +# (GLOBAL_ASYNC_QUERIES_CACHE_BACKEND) will also be migrated to this configuration. # # Example with standard Redis: -# SIGNAL_CACHE_CONFIG: CacheConfig = { +# DISTRIBUTED_COORDINATION_CONFIG: CacheConfig = { # "CACHE_TYPE": "RedisCache", # "CACHE_REDIS_HOST": "localhost", # "CACHE_REDIS_PORT": 6379, @@ -2515,7 +2517,7 @@ TASK_PROGRESS_UPDATE_THROTTLE_INTERVAL = 2 # seconds # } # # Example with Redis Sentinel: -# SIGNAL_CACHE_CONFIG: CacheConfig = { +# DISTRIBUTED_COORDINATION_CONFIG: CacheConfig = { # "CACHE_TYPE": "RedisSentinelCache", # "CACHE_REDIS_SENTINELS": [("sentinel1", 26379), ("sentinel2", 26379)], # "CACHE_REDIS_SENTINEL_MASTER": "mymaster", @@ -2523,7 +2525,7 @@ TASK_PROGRESS_UPDATE_THROTTLE_INTERVAL = 2 # seconds # "CACHE_REDIS_DB": 0, # "CACHE_REDIS_PASSWORD": "", # } -SIGNAL_CACHE_CONFIG: CacheConfig | None = None +DISTRIBUTED_COORDINATION_CONFIG: CacheConfig | None = None # Default lock TTL (time-to-live) in seconds for distributed locks. # Can be overridden per-call via the `ttl_seconds` parameter. diff --git a/superset/distributed_lock/__init__.py b/superset/distributed_lock/__init__.py index 69ec3d82538..ab00143ba37 100644 --- a/superset/distributed_lock/__init__.py +++ b/superset/distributed_lock/__init__.py @@ -34,7 +34,7 @@ def DistributedLock( # noqa: N802 """ Distributed lock for coordinating operations across workers. - Automatically uses Redis-based locking when SIGNAL_CACHE_CONFIG is + Automatically uses Redis-based locking when DISTRIBUTED_COORDINATION_CONFIG is configured, falling back to database-backed locking otherwise. Redis locking uses SET NX EX for atomic acquisition with automatic expiration. diff --git a/superset/tasks/locks.py b/superset/tasks/locks.py index f6af3df13c7..73875925e3f 100644 --- a/superset/tasks/locks.py +++ b/superset/tasks/locks.py @@ -22,7 +22,7 @@ conditions during concurrent task creation, subscription, and cancellation. The lock key uses the task's dedup_key, ensuring all operations on the same logical task serialize correctly. -When SIGNAL_CACHE_CONFIG is configured, uses Redis SET NX EX for +When DISTRIBUTED_COORDINATION_CONFIG is configured, uses Redis SET NX EX for efficient single-command locking. Otherwise falls back to database-backed locking via DistributedLock. """ @@ -55,7 +55,7 @@ def task_lock(dedup_key: str) -> Iterator[None]: - Subscribe racing with cancel - Multiple concurrent cancel requests - When SIGNAL_CACHE_CONFIG is configured, uses Redis SET NX EX + When DISTRIBUTED_COORDINATION_CONFIG is configured, uses Redis SET NX EX for efficient single-command locking. Otherwise falls back to database-backed DistributedLock. diff --git a/superset/tasks/manager.py b/superset/tasks/manager.py index 21b28c7d42b..def7bf1e0c1 100644 --- a/superset/tasks/manager.py +++ b/superset/tasks/manager.py @@ -103,7 +103,7 @@ class TaskManager: 3. Handling deduplication (returning existing active task if duplicate) 4. Managing real-time abort notifications (optional) - Redis pub/sub is opt-in via SIGNAL_CACHE_CONFIG configuration. When not + Redis pub/sub is opt-in via DISTRIBUTED_COORDINATION_CONFIG configuration. When not configured, tasks use database polling for abort detection. """ @@ -134,11 +134,11 @@ class TaskManager: @classmethod def _get_cache(cls) -> RedisCacheBackend | RedisSentinelCacheBackend | None: """ - Get the signal cache backend. + Get the distributed coordination backend. - :returns: The signal cache backend, or None if not configured + :returns: The distributed coordination backend, or None if not configured """ - return cache_manager.signal_cache + return cache_manager.distributed_coordination @classmethod def is_pubsub_available(cls) -> bool: diff --git a/superset/utils/cache_manager.py b/superset/utils/cache_manager.py index 48ff0e11cd0..7ab54dead15 100644 --- a/superset/utils/cache_manager.py +++ b/superset/utils/cache_manager.py @@ -193,7 +193,9 @@ class CacheManager: self._thumbnail_cache = SupersetCache() self._filter_state_cache = SupersetCache() self._explore_form_data_cache = ExploreFormDataCache() - self._signal_cache: RedisCacheBackend | RedisSentinelCacheBackend | None = None + self._distributed_coordination: ( + RedisCacheBackend | RedisSentinelCacheBackend | None + ) = None @staticmethod def _init_cache( @@ -235,27 +237,29 @@ class CacheManager: "EXPLORE_FORM_DATA_CACHE_CONFIG", required=True, ) - self._init_signal_cache(app) + self._init_distributed_coordination(app) - def _init_signal_cache(self, app: Flask) -> None: - """Initialize the signal cache for pub/sub and distributed locks.""" + def _init_distributed_coordination(self, app: Flask) -> None: + """Initialize the distributed coordination backend (pub/sub, locks, streams).""" from superset.async_events.cache_backend import ( RedisCacheBackend, RedisSentinelCacheBackend, ) - config = app.config.get("SIGNAL_CACHE_CONFIG") + config = app.config.get("DISTRIBUTED_COORDINATION_CONFIG") if not config: return cache_type = config.get("CACHE_TYPE") if cache_type == "RedisCache": - self._signal_cache = RedisCacheBackend.from_config(config) + self._distributed_coordination = RedisCacheBackend.from_config(config) elif cache_type == "RedisSentinelCache": - self._signal_cache = RedisSentinelCacheBackend.from_config(config) + self._distributed_coordination = RedisSentinelCacheBackend.from_config( + config + ) else: logger.warning( - "Unsupported CACHE_TYPE for SIGNAL_CACHE_CONFIG: %s. " + "Unsupported CACHE_TYPE for DISTRIBUTED_COORDINATION_CONFIG: %s. " "Use 'RedisCache' or 'RedisSentinelCache'.", cache_type, ) @@ -281,13 +285,17 @@ class CacheManager: return self._explore_form_data_cache @property - def signal_cache( + def distributed_coordination( self, ) -> RedisCacheBackend | RedisSentinelCacheBackend | None: """ - Return the signal cache backend. + Return the distributed coordination backend for Redis-specific primitives. - Used for signaling features that require Redis-specific primitives: + This backend is the foundation for distributed coordination features including + pub/sub messaging, atomic distributed locking, and streams. A higher-level + service will eventually expose standardized interfaces on top of this backend. + + Coordination primitives currently backed by this: - Pub/Sub messaging for real-time abort/completion notifications - SET NX EX for atomic distributed lock acquisition @@ -296,6 +304,6 @@ class CacheManager: - `.key_prefix`: Configured key prefix (from CACHE_KEY_PREFIX) - `.default_timeout`: Default timeout in seconds (from CACHE_DEFAULT_TIMEOUT) - Returns None if SIGNAL_CACHE_CONFIG is not configured. + Returns None if DISTRIBUTED_COORDINATION_CONFIG is not configured. """ - return self._signal_cache + return self._distributed_coordination diff --git a/tests/integration_tests/tasks/test_sync_join_wait.py b/tests/integration_tests/tasks/test_sync_join_wait.py index 88702b6fce9..ff00958cf5e 100644 --- a/tests/integration_tests/tasks/test_sync_join_wait.py +++ b/tests/integration_tests/tasks/test_sync_join_wait.py @@ -89,9 +89,9 @@ def test_wait_for_completion_timeout(app_context, login_as, get_user) -> None: assert task.user_id == admin.id try: - # Force polling mode by mocking signal_cache as None + # Force polling mode by mocking distributed_coordination as None with patch("superset.tasks.manager.cache_manager") as mock_cache_manager: - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None with pytest.raises(TimeoutError): TaskManager.wait_for_completion( task.uuid, diff --git a/tests/unit_tests/tasks/test_handlers.py b/tests/unit_tests/tasks/test_handlers.py index 1da4b4da93a..7c29a961a89 100644 --- a/tests/unit_tests/tasks/test_handlers.py +++ b/tests/unit_tests/tasks/test_handlers.py @@ -82,8 +82,8 @@ def task_context(mock_task, mock_task_dao, mock_update_command, mock_flask_app): patch("superset.tasks.context.current_app") as mock_current_app, patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None # Configure current_app mock mock_current_app.config = mock_flask_app.config diff --git a/tests/unit_tests/tasks/test_manager.py b/tests/unit_tests/tasks/test_manager.py index 4fc77c2e080..9f10c4da59b 100644 --- a/tests/unit_tests/tasks/test_manager.py +++ b/tests/unit_tests/tasks/test_manager.py @@ -123,13 +123,13 @@ class TestTaskManagerPubSub: @patch("superset.tasks.manager.cache_manager") def test_is_pubsub_available_no_redis(self, mock_cache_manager): """Test is_pubsub_available returns False when Redis not configured""" - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None assert TaskManager.is_pubsub_available() is False @patch("superset.tasks.manager.cache_manager") def test_is_pubsub_available_with_redis(self, mock_cache_manager): """Test is_pubsub_available returns True when Redis is configured""" - mock_cache_manager.signal_cache = MagicMock() + mock_cache_manager.distributed_coordination = MagicMock() assert TaskManager.is_pubsub_available() is True def test_get_abort_channel(self): @@ -148,7 +148,7 @@ class TestTaskManagerPubSub: @patch("superset.tasks.manager.cache_manager") def test_publish_abort_no_redis(self, mock_cache_manager): """Test publish_abort returns False when Redis not available""" - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None result = TaskManager.publish_abort("test-uuid") assert result is False @@ -157,7 +157,7 @@ class TestTaskManagerPubSub: """Test publish_abort publishes message successfully""" mock_redis = MagicMock() mock_redis.publish.return_value = 1 # One subscriber - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis result = TaskManager.publish_abort("test-uuid") @@ -169,7 +169,7 @@ class TestTaskManagerPubSub: """Test publish_abort handles Redis errors gracefully""" mock_redis = MagicMock() mock_redis.publish.side_effect = redis.RedisError("Connection lost") - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis result = TaskManager.publish_abort("test-uuid") @@ -194,7 +194,7 @@ class TestTaskManagerListenForAbort: @patch("superset.tasks.manager.cache_manager") def test_listen_for_abort_no_redis_uses_polling(self, mock_cache_manager): """Test listen_for_abort falls back to polling when Redis unavailable""" - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None callback = MagicMock() with patch.object(TaskManager, "_poll_for_abort", return_value=None): @@ -218,7 +218,7 @@ class TestTaskManagerListenForAbort: mock_redis = MagicMock() mock_pubsub = MagicMock() mock_redis.pubsub.return_value = mock_pubsub - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis callback = MagicMock() @@ -245,7 +245,7 @@ class TestTaskManagerListenForAbort: mock_redis = MagicMock() mock_redis.pubsub.side_effect = redis.RedisError("Connection failed") - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis callback = MagicMock() @@ -290,7 +290,7 @@ class TestTaskManagerCompletion: @patch("superset.tasks.manager.cache_manager") def test_publish_completion_no_redis(self, mock_cache_manager): """Test publish_completion returns False when Redis not available""" - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None result = TaskManager.publish_completion("test-uuid", "success") assert result is False @@ -299,7 +299,7 @@ class TestTaskManagerCompletion: """Test publish_completion publishes message successfully""" mock_redis = MagicMock() mock_redis.publish.return_value = 1 # One subscriber - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis result = TaskManager.publish_completion("test-uuid", "success") @@ -311,7 +311,7 @@ class TestTaskManagerCompletion: """Test publish_completion handles Redis errors gracefully""" mock_redis = MagicMock() mock_redis.publish.side_effect = redis.RedisError("Connection lost") - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis result = TaskManager.publish_completion("test-uuid", "success") @@ -323,7 +323,7 @@ class TestTaskManagerCompletion: """Test wait_for_completion raises ValueError for missing task""" import pytest - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None mock_dao.find_one_or_none.return_value = None with pytest.raises(ValueError, match="not found"): @@ -333,7 +333,7 @@ class TestTaskManagerCompletion: @patch("superset.daos.tasks.TaskDAO") def test_wait_for_completion_already_complete(self, mock_dao, mock_cache_manager): """Test wait_for_completion returns immediately for terminal state""" - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None mock_task = MagicMock() mock_task.uuid = "test-uuid" mock_task.status = "success" @@ -351,7 +351,7 @@ class TestTaskManagerCompletion: """Test wait_for_completion raises TimeoutError when timeout expires""" import pytest - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None mock_task = MagicMock() mock_task.uuid = "test-uuid" mock_task.status = "in_progress" # Never completes @@ -364,7 +364,7 @@ class TestTaskManagerCompletion: @patch("superset.daos.tasks.TaskDAO") def test_wait_for_completion_polling_success(self, mock_dao, mock_cache_manager): """Test wait_for_completion returns when task completes via polling""" - mock_cache_manager.signal_cache = None + mock_cache_manager.distributed_coordination = None mock_task_pending = MagicMock() mock_task_pending.uuid = "test-uuid" mock_task_pending.status = "pending" @@ -414,7 +414,7 @@ class TestTaskManagerCompletion: "data": "success", } mock_redis.pubsub.return_value = mock_pubsub - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis result = TaskManager.wait_for_completion( "test-uuid", @@ -446,7 +446,7 @@ class TestTaskManagerCompletion: # Set up mock Redis that fails mock_redis = MagicMock() mock_redis.pubsub.side_effect = redis.RedisError("Connection failed") - mock_cache_manager.signal_cache = mock_redis + mock_cache_manager.distributed_coordination = mock_redis # With fail-fast behavior, Redis error is raised instead of falling back with pytest.raises(redis.RedisError, match="Connection failed"): diff --git a/tests/unit_tests/tasks/test_timeout.py b/tests/unit_tests/tasks/test_timeout.py index ef8d5f9d761..002166fc370 100644 --- a/tests/unit_tests/tasks/test_timeout.py +++ b/tests/unit_tests/tasks/test_timeout.py @@ -89,8 +89,8 @@ def task_context_for_timeout(mock_flask_app, mock_task_abortable): patch("superset.daos.tasks.TaskDAO") as mock_dao, patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None # Configure current_app mock mock_current_app.config = mock_flask_app.config @@ -277,8 +277,8 @@ class TestTimeoutTrigger: ) as mock_update_cmd, patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None mock_current_app.config = mock_flask_app.config mock_current_app._get_current_object.return_value = mock_flask_app @@ -323,8 +323,8 @@ class TestTimeoutTrigger: patch("superset.tasks.context.logger") as mock_logger, patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None mock_current_app.config = mock_flask_app.config mock_current_app._get_current_object.return_value = mock_flask_app @@ -363,8 +363,8 @@ class TestTimeoutTrigger: patch("superset.commands.tasks.update.UpdateTaskCommand"), patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None mock_current_app.config = mock_flask_app.config mock_current_app._get_current_object.return_value = mock_flask_app @@ -469,8 +469,8 @@ class TestTimeoutTerminalState: patch("superset.commands.tasks.update.UpdateTaskCommand"), patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None mock_current_app.config = mock_flask_app.config mock_current_app._get_current_object.return_value = mock_flask_app @@ -510,8 +510,8 @@ class TestTimeoutTerminalState: patch("superset.commands.tasks.update.UpdateTaskCommand"), patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None mock_current_app.config = mock_flask_app.config mock_current_app._get_current_object.return_value = mock_flask_app @@ -547,8 +547,8 @@ class TestTimeoutTerminalState: patch("superset.commands.tasks.update.UpdateTaskCommand"), patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None mock_current_app.config = mock_flask_app.config mock_current_app._get_current_object.return_value = mock_flask_app @@ -584,8 +584,8 @@ class TestTimeoutTerminalState: patch("superset.commands.tasks.update.UpdateTaskCommand"), patch("superset.tasks.manager.cache_manager") as mock_cache_manager, ): - # Disable Redis by making signal_cache return None - mock_cache_manager.signal_cache = None + # Disable Redis by making distributed_coordination return None + mock_cache_manager.distributed_coordination = None mock_current_app.config = mock_flask_app.config mock_current_app._get_current_object.return_value = mock_flask_app