diff --git a/superset/mcp_service/auth.py b/superset/mcp_service/auth.py index 2ffc94ebe44..326254acb2e 100644 --- a/superset/mcp_service/auth.py +++ b/superset/mcp_service/auth.py @@ -127,14 +127,22 @@ def has_dataset_access(dataset: "SqlaTable") -> bool: return False # Deny access on error -def _setup_user_context() -> User: +def _setup_user_context() -> User | None: """ Set up user context for MCP tool execution. Returns: - User object with roles and groups loaded + User object with roles and groups loaded, or None if no Flask context """ - user = get_user_from_request() + try: + user = get_user_from_request() + except RuntimeError as e: + # No Flask application context (e.g., prompts before middleware runs) + # This is expected for some FastMCP operations - return None gracefully + if "application context" in str(e): + logger.debug("No Flask app context available for user setup") + return None + raise # Validate user has necessary relationships loaded # (Force access to ensure they're loaded if lazy) @@ -212,6 +220,15 @@ def mcp_auth_hook(tool_func: F) -> F: # noqa: C901 with _get_app_context_manager(): user = _setup_user_context() + # No Flask context - this is a FastMCP internal operation + # (e.g., tool discovery, prompt listing) that doesn't require auth + if user is None: + logger.debug( + "MCP internal call without Flask context: tool=%s", + tool_func.__name__, + ) + return await tool_func(*args, **kwargs) + try: logger.debug( "MCP tool call: user=%s, tool=%s", @@ -235,6 +252,15 @@ def mcp_auth_hook(tool_func: F) -> F: # noqa: C901 with _get_app_context_manager(): user = _setup_user_context() + # No Flask context - this is a FastMCP internal operation + # (e.g., tool discovery, prompt listing) that doesn't require auth + if user is None: + logger.debug( + "MCP internal call without Flask context: tool=%s", + tool_func.__name__, + ) + return tool_func(*args, **kwargs) + try: logger.debug( "MCP tool call: user=%s, tool=%s", diff --git a/superset/mcp_service/mcp_config.py b/superset/mcp_service/mcp_config.py index c8e9435eac7..678ce78941f 100644 --- a/superset/mcp_service/mcp_config.py +++ b/superset/mcp_service/mcp_config.py @@ -126,12 +126,22 @@ MCP_FACTORY_CONFIG = { # ============================================================================= # MCP Store Configuration - shared Redis infrastructure for all MCP storage needs -# (caching, auth, events, etc.). Only used when a consumer explicitly requests it. +# (caching, auth, events, session state, etc.). +# +# When CACHE_REDIS_URL is set: +# - Response caching uses Redis (if MCP_CACHE_CONFIG enabled) +# - EventStore uses Redis for multi-pod session management +# +# For multi-pod/Kubernetes deployments, setting CACHE_REDIS_URL automatically +# enables Redis-backed EventStore to share session state across pods. MCP_STORE_CONFIG: Dict[str, Any] = { "enabled": False, # Disabled by default - caching uses in-memory store "CACHE_REDIS_URL": None, # Redis URL, e.g., "redis://localhost:6379/0" # Wrapper class that prefixes all keys. Each consumer provides their own prefix. "WRAPPER_TYPE": "key_value.aio.wrappers.prefix_keys.PrefixKeysWrapper", + # EventStore settings (for multi-pod session management) + "event_store_max_events": 100, # Keep last 100 events per session + "event_store_ttl": 3600, # Events expire after 1 hour } # MCP Response Caching Configuration - controls caching behavior and TTLs diff --git a/superset/mcp_service/server.py b/superset/mcp_service/server.py index 8ca289446cb..c159c6cb3b2 100644 --- a/superset/mcp_service/server.py +++ b/superset/mcp_service/server.py @@ -17,13 +17,23 @@ """ MCP server for Apache Superset + +Supports both single-pod (in-memory) and multi-pod (Redis) deployments. +For multi-pod deployments, configure MCP_EVENT_STORE_CONFIG with Redis URL. """ import logging import os +from typing import Any + +import uvicorn from superset.mcp_service.app import create_mcp_app, init_fastmcp_server -from superset.mcp_service.mcp_config import get_mcp_factory_config +from superset.mcp_service.mcp_config import ( + get_mcp_factory_config, + MCP_STORE_CONFIG, +) +from superset.mcp_service.storage import _create_redis_store def configure_logging(debug: bool = False) -> None: @@ -55,21 +65,83 @@ def configure_logging(debug: bool = False) -> None: logging.info("🔍 SQL Debug logging enabled") +def create_event_store(config: dict[str, Any] | None = None) -> Any | None: + """ + Create an EventStore for MCP session management. + + For multi-pod deployments, uses Redis-backed storage to share session state + across pods. For single-pod deployments, returns None (uses in-memory). + + Args: + config: Optional config dict. If None, reads from MCP_STORE_CONFIG. + + Returns: + EventStore instance if Redis URL is configured, None otherwise. + """ + if config is None: + config = MCP_STORE_CONFIG + + redis_url = config.get("CACHE_REDIS_URL") + if not redis_url: + logging.info("EventStore: Using in-memory storage (single-pod mode)") + return None + + try: + from fastmcp.server.event_store import EventStore + + # Get prefix from config (allows Preset to customize for multi-tenancy) + # Default prefix prevents key collisions in shared Redis environments + prefix = config.get("event_store_prefix", "mcp_events_") + + # Create wrapped Redis store with prefix for key namespacing + redis_store = _create_redis_store(config, prefix=prefix, wrap=True) + if redis_store is None: + logging.warning("Failed to create Redis store, falling back to in-memory") + return None + + # Create EventStore with Redis backend + event_store = EventStore( + storage=redis_store, + max_events_per_stream=config.get("event_store_max_events", 100), + ttl=config.get("event_store_ttl", 3600), + ) + + logging.info("EventStore: Using Redis storage (multi-pod mode)") + return event_store + + except ImportError as e: + logging.error( + "Failed to import EventStore dependencies: %s. " + "Ensure fastmcp package is installed.", + e, + ) + return None + except Exception as e: + logging.error("Failed to create Redis EventStore: %s", e) + return None + + def run_server( host: str = "127.0.0.1", port: int = 5008, debug: bool = False, use_factory_config: bool = False, + event_store_config: dict[str, Any] | None = None, ) -> None: """ Run the MCP service server with FastMCP endpoints. Uses streamable-http transport for HTTP server mode. + For multi-pod deployments, configure MCP_EVENT_STORE_CONFIG with Redis URL + to share session state across pods. + Args: host: Host to bind to port: Port to bind to debug: Enable debug logging use_factory_config: Use configuration from get_mcp_factory_config() + event_store_config: Optional EventStore configuration dict. + If None, reads from MCP_EVENT_STORE_CONFIG. """ configure_logging(debug) @@ -113,14 +185,33 @@ def run_server( middleware=middleware_list or None, ) + # Create EventStore for session management (Redis for multi-pod, None for in-memory) + event_store = create_event_store(event_store_config) + env_key = f"FASTMCP_RUNNING_{port}" if not os.environ.get(env_key): os.environ[env_key] = "1" try: logging.info("Starting FastMCP on %s:%s", host, port) - mcp_instance.run( - transport="streamable-http", host=host, port=port, stateless_http=True - ) + + if event_store is not None: + # Multi-pod: Use http_app with Redis EventStore, run with uvicorn + logging.info("Running in multi-pod mode with Redis EventStore") + app = mcp_instance.http_app( + transport="streamable-http", + event_store=event_store, + stateless_http=True, + ) + uvicorn.run(app, host=host, port=port) + else: + # Single-pod mode: Use built-in run() with in-memory sessions + logging.info("Running in single-pod mode with in-memory sessions") + mcp_instance.run( + transport="streamable-http", + host=host, + port=port, + stateless_http=True, + ) except Exception as e: logging.error("FastMCP failed: %s", e) os.environ.pop(env_key, None) diff --git a/superset/mcp_service/storage.py b/superset/mcp_service/storage.py index 47d19b0eb7a..b4c06a2758a 100644 --- a/superset/mcp_service/storage.py +++ b/superset/mcp_service/storage.py @@ -27,6 +27,9 @@ Reusable across caching middleware, OAuth providers, EventStore, etc. import logging from importlib import import_module from typing import Any, Callable, Dict +from urllib.parse import urlparse + +from redis.asyncio import Redis logger = logging.getLogger(__name__) @@ -82,17 +85,21 @@ def get_mcp_store( def _create_redis_store( store_config: Dict[str, Any], - prefix: str | Callable[[], str], + prefix: str | Callable[[], str] | None = None, + wrap: bool = True, ) -> Any | None: """ - Create a RedisStore with the given prefix. + Create a RedisStore, optionally wrapped with a prefix. + + Handles SSL/TLS connections (rediss:// scheme) for cloud deployments. Args: store_config: MCP_STORE_CONFIG dict (Redis URL, wrapper type) - prefix: Feature-specific prefix + prefix: Feature-specific prefix (required if wrap=True) + wrap: If True, wrap store with prefix. If False, return raw RedisStore. Returns: - Wrapped RedisStore instance or None if not configured + RedisStore instance (wrapped or raw) or None if not configured """ redis_url = store_config.get("CACHE_REDIS_URL") if not redis_url: @@ -109,15 +116,67 @@ def _create_redis_store( return None try: + # Parse URL to handle SSL properly + parsed = urlparse(redis_url) + use_ssl = parsed.scheme == "rediss" + + # RedisStore doesn't handle SSL from URL - it parses URL manually + # and ignores the scheme. We must create the Redis client ourselves. + + db = 0 + if parsed.path and parsed.path != "/": + try: + db = int(parsed.path.strip("/")) + except ValueError: + db = 0 + + redis_client: Redis[str] + if use_ssl: + # For ElastiCache with self-signed certs, disable cert verification. + # NOTE: ssl_cert_reqs="none" disables certificate verification. + # Do not use Python None - that would default to CERT_REQUIRED. + redis_client = Redis( + host=parsed.hostname or "localhost", + port=parsed.port or 6379, + db=db, + username=parsed.username, # Support Redis 6+ ACLs + password=parsed.password, + decode_responses=True, + ssl=True, + ssl_cert_reqs="none", + ) + logger.info("Created async Redis client with SSL at %s", parsed.hostname) + else: + redis_client = Redis( + host=parsed.hostname or "localhost", + port=parsed.port or 6379, + db=db, + username=parsed.username, # Support Redis 6+ ACLs + password=parsed.password, + decode_responses=True, + ) + logger.info("Created async Redis client at %s", parsed.hostname) + + # Pass pre-configured client to RedisStore + redis_store = RedisStore(client=redis_client) + + # Return raw store if wrapping not requested + if not wrap: + return redis_store + + # Wrap with prefix + if prefix is None: + logger.error("prefix is required when wrap=True") + return None + wrapper_type = store_config.get("WRAPPER_TYPE") if not wrapper_type: logger.error("MCP store WRAPPER_TYPE not configured") return None wrapper_class = _import_wrapper_class(wrapper_type) - redis_store = RedisStore(url=redis_url) store = wrapper_class(key_value=redis_store, prefix=prefix) - logger.info("✅ MCP RedisStore created") + logger.info("Created wrapped MCP RedisStore") return store except Exception as e: logger.error("Failed to create MCP store: %s", e) diff --git a/tests/unit_tests/mcp_service/test_mcp_server.py b/tests/unit_tests/mcp_service/test_mcp_server.py new file mode 100644 index 00000000000..0d7922a296a --- /dev/null +++ b/tests/unit_tests/mcp_service/test_mcp_server.py @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for MCP server EventStore creation.""" + +from unittest.mock import MagicMock, patch + + +def test_create_event_store_returns_none_when_no_redis_url(): + """EventStore returns None when no Redis URL configured (single-pod mode).""" + config = {"CACHE_REDIS_URL": None} + + from superset.mcp_service.server import create_event_store + + result = create_event_store(config) + + assert result is None + + +def test_create_event_store_returns_none_when_empty_config(): + """EventStore returns None when config has no CACHE_REDIS_URL.""" + config = {} + + from superset.mcp_service.server import create_event_store + + result = create_event_store(config) + + assert result is None + + +def test_create_event_store_creates_event_store_with_redis(): + """EventStore is created with Redis backend when URL is configured.""" + config = { + "CACHE_REDIS_URL": "redis://localhost:6379/0", + "event_store_max_events": 50, + "event_store_ttl": 1800, + } + + mock_redis_store = MagicMock() + mock_event_store = MagicMock() + + with patch( + "superset.mcp_service.server._create_redis_store", + return_value=mock_redis_store, + ) as mock_create_store: + with patch( + "fastmcp.server.event_store.EventStore", + return_value=mock_event_store, + ) as mock_event_store_class: + from superset.mcp_service.server import create_event_store + + result = create_event_store(config) + + # Verify EventStore was created + assert result is mock_event_store + # Verify _create_redis_store was called with prefix wrapper + mock_create_store.assert_called_once_with( + config, prefix="mcp_events_", wrap=True + ) + # Verify EventStore was initialized with correct params + mock_event_store_class.assert_called_once_with( + storage=mock_redis_store, + max_events_per_stream=50, + ttl=1800, + ) + + +def test_create_event_store_uses_default_config_values(): + """EventStore uses default values when not specified in config.""" + config = { + "CACHE_REDIS_URL": "redis://localhost:6379/0", + } + + mock_redis_store = MagicMock() + mock_event_store = MagicMock() + + with patch( + "superset.mcp_service.server._create_redis_store", + return_value=mock_redis_store, + ): + with patch( + "fastmcp.server.event_store.EventStore", + return_value=mock_event_store, + ) as mock_event_store_class: + from superset.mcp_service.server import create_event_store + + result = create_event_store(config) + + assert result is mock_event_store + # Verify defaults are used + mock_event_store_class.assert_called_once_with( + storage=mock_redis_store, + max_events_per_stream=100, # default + ttl=3600, # default + ) + + +def test_create_event_store_returns_none_when_redis_store_fails(): + """EventStore returns None when Redis store creation fails.""" + config = { + "CACHE_REDIS_URL": "redis://localhost:6379/0", + } + + with patch( + "superset.mcp_service.server._create_redis_store", + return_value=None, # Simulates Redis store creation failure + ): + from superset.mcp_service.server import create_event_store + + result = create_event_store(config) + + assert result is None diff --git a/tests/unit_tests/mcp_service/test_mcp_storage.py b/tests/unit_tests/mcp_service/test_mcp_storage.py index b61e99e62bd..c7203619800 100644 --- a/tests/unit_tests/mcp_service/test_mcp_storage.py +++ b/tests/unit_tests/mcp_service/test_mcp_storage.py @@ -68,6 +68,7 @@ def test_get_mcp_store_creates_store_when_enabled(): } mock_redis_store = MagicMock() + mock_redis_client = MagicMock() mock_wrapper_instance = MagicMock() mock_wrapper_class = MagicMock(return_value=mock_wrapper_instance) @@ -84,13 +85,190 @@ def test_get_mcp_store_creates_store_when_enabled(): "key_value.aio.stores.redis.RedisStore", return_value=mock_redis_store, ): - from superset.mcp_service.storage import get_mcp_store + with patch( + "superset.mcp_service.storage.Redis", + return_value=mock_redis_client, + ): + from superset.mcp_service.storage import get_mcp_store - result = get_mcp_store(prefix="test_prefix_") + result = get_mcp_store(prefix="test_prefix_") - # Verify store was created - assert result is mock_wrapper_instance - # Verify wrapper was called with correct args - mock_wrapper_class.assert_called_once_with( - key_value=mock_redis_store, prefix="test_prefix_" - ) + # Verify store was created + assert result is mock_wrapper_instance + # Verify wrapper was called with correct args + mock_wrapper_class.assert_called_once_with( + key_value=mock_redis_store, prefix="test_prefix_" + ) + + +def test_create_redis_store_wrap_false_returns_raw_store(): + """_create_redis_store with wrap=False returns unwrapped RedisStore.""" + store_config = { + "CACHE_REDIS_URL": "redis://localhost:6379/0", + "WRAPPER_TYPE": "key_value.aio.wrappers.prefix_keys.PrefixKeysWrapper", + } + + mock_redis_store = MagicMock() + mock_redis_client = MagicMock() + + with patch( + "key_value.aio.stores.redis.RedisStore", + return_value=mock_redis_store, + ) as mock_redis_store_class: + with patch( + "superset.mcp_service.storage.Redis", + return_value=mock_redis_client, + ) as mock_redis_class: + from superset.mcp_service.storage import _create_redis_store + + result = _create_redis_store(store_config, wrap=False) + + # Verify raw store is returned (not wrapped) + assert result is mock_redis_store + # Verify Redis client was created with correct params + mock_redis_class.assert_called_once() + call_kwargs = mock_redis_class.call_args[1] + assert call_kwargs["host"] == "localhost" + assert call_kwargs["port"] == 6379 + # Verify RedisStore was called with the client + mock_redis_store_class.assert_called_once_with(client=mock_redis_client) + + +def test_create_redis_store_wrap_true_requires_prefix(): + """_create_redis_store with wrap=True requires prefix parameter.""" + store_config = { + "CACHE_REDIS_URL": "redis://localhost:6379/0", + "WRAPPER_TYPE": "key_value.aio.wrappers.prefix_keys.PrefixKeysWrapper", + } + + mock_redis_store = MagicMock() + + with patch( + "key_value.aio.stores.redis.RedisStore", + return_value=mock_redis_store, + ): + from superset.mcp_service.storage import _create_redis_store + + # wrap=True (default) with no prefix should return None + result = _create_redis_store(store_config, prefix=None, wrap=True) + + assert result is None + + +def test_create_redis_store_handles_ssl_url(): + """_create_redis_store handles rediss:// URLs with SSL configuration.""" + store_config = { + "CACHE_REDIS_URL": "rediss://:password@redis.example.com:6380/1", + "WRAPPER_TYPE": "key_value.aio.wrappers.prefix_keys.PrefixKeysWrapper", + } + + mock_redis_store = MagicMock() + mock_redis_client = MagicMock() + + with patch( + "key_value.aio.stores.redis.RedisStore", + return_value=mock_redis_store, + ): + with patch( + "superset.mcp_service.storage.Redis", + return_value=mock_redis_client, + ) as mock_redis_class: + from superset.mcp_service.storage import _create_redis_store + + result = _create_redis_store(store_config, wrap=False) + + # Verify store was created + assert result is mock_redis_store + # Verify Redis client was created with SSL params + call_kwargs = mock_redis_class.call_args[1] + assert call_kwargs["ssl"] is True + assert call_kwargs["ssl_cert_reqs"] == "none" + assert call_kwargs["host"] == "redis.example.com" + assert call_kwargs["port"] == 6380 + assert call_kwargs["db"] == 1 + + +def test_create_redis_store_non_ssl_url_no_ssl_param(): + """_create_redis_store with redis:// URL doesn't pass SSL params.""" + store_config = { + "CACHE_REDIS_URL": "redis://localhost:6379/0", + } + + mock_redis_store = MagicMock() + mock_redis_client = MagicMock() + + with patch( + "key_value.aio.stores.redis.RedisStore", + return_value=mock_redis_store, + ): + with patch( + "superset.mcp_service.storage.Redis", + return_value=mock_redis_client, + ) as mock_redis_class: + from superset.mcp_service.storage import _create_redis_store + + result = _create_redis_store(store_config, wrap=False) + + assert result is mock_redis_store + # Verify SSL params were NOT passed for non-SSL URL + call_kwargs = mock_redis_class.call_args[1] + assert "ssl" not in call_kwargs + assert "ssl_cert_reqs" not in call_kwargs + + +def test_create_redis_store_handles_url_with_username_and_password(): + """_create_redis_store properly handles URL with username and password.""" + test_password = "mypassword" # noqa: S105 + store_config = { + "CACHE_REDIS_URL": f"redis://myuser:{test_password}@redis.example.com:6379/0", + } + + mock_redis_store = MagicMock() + mock_redis_client = MagicMock() + + with patch( + "key_value.aio.stores.redis.RedisStore", + return_value=mock_redis_store, + ): + with patch( + "superset.mcp_service.storage.Redis", + return_value=mock_redis_client, + ) as mock_redis_class: + from superset.mcp_service.storage import _create_redis_store + + result = _create_redis_store(store_config, wrap=False) + + assert result is mock_redis_store + # Verify Redis client was created with password from URL + call_kwargs = mock_redis_class.call_args[1] + assert call_kwargs["host"] == "redis.example.com" + assert call_kwargs["password"] == test_password + + +def test_create_redis_store_handles_url_with_only_username(): + """_create_redis_store handles URL with username but no password.""" + store_config = { + "CACHE_REDIS_URL": "redis://myuser@redis.example.com:6379/0", + } + + mock_redis_store = MagicMock() + mock_redis_client = MagicMock() + + with patch( + "key_value.aio.stores.redis.RedisStore", + return_value=mock_redis_store, + ): + with patch( + "superset.mcp_service.storage.Redis", + return_value=mock_redis_client, + ) as mock_redis_class: + from superset.mcp_service.storage import _create_redis_store + + result = _create_redis_store(store_config, wrap=False) + + assert result is mock_redis_store + # Verify Redis client was created with correct params + call_kwargs = mock_redis_class.call_args[1] + assert call_kwargs["host"] == "redis.example.com" + # No password in URL means password should be None + assert call_kwargs["password"] is None