fix(mcp): hide write tools from users without write permissions

Phase 1: MCPPermissionDeniedError falls through to GlobalErrorHandlerMiddleware's
generic "Internal error" branch (500-style response) because it doesn't subclass
PermissionError. Fixed by adding it to _USER_ERROR_TYPES and an explicit elif
branch in _handle_error() that converts it to a clean ToolError.

Phase 2: Add RBACToolVisibilityMiddleware that intercepts tools/list and removes
tools the calling user lacks permission to execute. Add
is_tool_visible_to_current_user() to auth.py as the single source of truth for
tool visibility, shared by both the new middleware and the existing tool-search
transform. Register the middleware inside StructuredContentStripperMiddleware so
it filters full tool objects before outputSchema stripping. Fail open: if user
resolution fails, all tools are returned (call-time RBAC still enforces).

Also update server instructions to note write tools require write permissions.
This commit is contained in:
Amin Ghadersohi
2026-05-13 20:39:56 +00:00
parent ac5e8f1308
commit 2ccb099450
6 changed files with 380 additions and 28 deletions

View File

@@ -355,6 +355,10 @@ Input format:
{_feature_availability}Permission Awareness:
{_instance_info_role_bullet}- ALWAYS check the user's roles BEFORE suggesting write operations (creating datasets,
charts, dashboards, or running SQL).
- Write tools (generate_chart, generate_dashboard, update_chart, create_virtual_dataset,
save_sql_query, add_chart_to_existing_dashboard, update_chart_preview) require write
permissions. These tools are only listed for users who have the necessary access.
If a write tool does not appear in the tool list, the current user lacks write access.
- Do NOT disclose dashboard access lists, dashboard owners, chart owners, dataset
owners, workspace admins, or other users' names, usernames, email addresses,
contact details, roles, admin status, ownership, or access-list information.

View File

@@ -145,6 +145,56 @@ def check_tool_permission(func: Callable[..., Any]) -> bool:
return False
def is_tool_visible_to_current_user(tool: Any) -> bool:
"""Return whether the current user can see a tool in tools/list.
Checks both RBAC permissions and data-model metadata privacy. The caller
must set ``g.user`` before calling this function.
This is the single source of truth for tool visibility — called from both
``RBACToolVisibilityMiddleware`` (``tools/list``) and
``_tool_allowed_for_current_user()`` (tool search).
Args:
tool: A FastMCP Tool object.
Returns:
True if the tool is visible to the current user, False otherwise.
"""
try:
from flask import current_app
if not current_app.config.get("MCP_RBAC_ENABLED", True):
return True
tool_func = getattr(tool, "fn", None)
if tool_func is None:
return True
from superset.mcp_service.privacy import (
tool_requires_data_model_metadata_access,
user_can_view_data_model_metadata,
)
if (
tool_requires_data_model_metadata_access(tool_func)
and not user_can_view_data_model_metadata()
):
return False
class_permission_name = getattr(tool_func, CLASS_PERMISSION_ATTR, None)
if not class_permission_name:
return True
return check_tool_permission(tool_func)
except (AttributeError, RuntimeError, ValueError):
logger.debug(
"Could not evaluate tool visibility for current user", exc_info=True
)
return False
def load_user_with_relationships(
username: str | None = None, email: str | None = None
) -> User | None:

View File

@@ -37,6 +37,7 @@ from superset.commands.exceptions import (
)
from superset.exceptions import SupersetException, SupersetSecurityException
from superset.extensions import event_logger
from superset.mcp_service.auth import MCPPermissionDeniedError
from superset.mcp_service.constants import (
DEFAULT_TOKEN_LIMIT,
DEFAULT_WARN_THRESHOLD_PCT,
@@ -129,6 +130,7 @@ _USER_ERROR_TYPES = (
ToolError,
ValidationError,
PermissionError,
MCPPermissionDeniedError,
ValueError,
FileNotFoundError,
CommandInvalidError,
@@ -399,6 +401,46 @@ class StructuredContentStripperMiddleware(Middleware):
return result
class RBACToolVisibilityMiddleware(Middleware):
"""Filter tools/list response based on current user's RBAC permissions.
Intercepts every ``tools/list`` request and removes tools the calling user
is not permitted to execute. Public tools (no ``class_permission_name``) and
tools whose permission check passes are included; all others are hidden.
Fails open: if user resolution fails (no auth header, bad credentials) all
tools are returned. Call-time RBAC still enforces permissions — this
middleware only improves the UX by hiding inaccessible tools from the LLM.
"""
async def on_list_tools(
self,
context: MiddlewareContext[mt.ListToolsRequest],
call_next: CallNext[mt.ListToolsRequest, list[Tool]],
) -> list[Tool]:
tools = await call_next(context)
try:
from flask import g
from superset.mcp_service.auth import (
_setup_user_context,
is_tool_visible_to_current_user,
)
from superset.mcp_service.flask_singleton import get_flask_app
with get_flask_app().app_context():
user = _setup_user_context()
if user is None:
return tools # no auth context — fail open
g.user = user
return [t for t in tools if is_tool_visible_to_current_user(t)]
except Exception: # noqa: BLE001
# Invalid credentials / setup failure — fail open
# (call-time RBAC still enforces)
return tools
class GlobalErrorHandlerMiddleware(Middleware):
"""
Global error handler middleware that provides consistent error responses
@@ -507,6 +549,9 @@ class GlobalErrorHandlerMiddleware(Middleware):
raise ToolError(
f"Invalid request for {tool_name}: {_sanitize_error_for_logging(error)}"
) from error
elif isinstance(error, MCPPermissionDeniedError):
# MCP RBAC permission denied — convert to structured ToolError
raise ToolError(str(error)) from error
elif isinstance(error, (ForbiddenError, SupersetSecurityException)):
# Superset access denied — agent tried a tool it can't use
raise ToolError(

View File

@@ -41,12 +41,9 @@ from superset.mcp_service.middleware import (
create_response_size_guard_middleware,
GlobalErrorHandlerMiddleware,
LoggingMiddleware,
RBACToolVisibilityMiddleware,
StructuredContentStripperMiddleware,
)
from superset.mcp_service.privacy import (
tool_requires_data_model_metadata_access,
user_can_view_data_model_metadata,
)
from superset.mcp_service.storage import _create_redis_store
from superset.utils import json
@@ -403,38 +400,20 @@ def _build_summary_serializer(max_desc: int) -> Any:
def _tool_allowed_for_current_user(tool: Any) -> bool:
"""Return whether the current Flask user can see this tool in search results."""
try:
from flask import current_app, g
from flask import g
if not current_app.config.get("MCP_RBAC_ENABLED", True):
return True
from superset import security_manager
from superset.mcp_service.auth import (
CLASS_PERMISSION_ATTR,
get_user_from_request,
METHOD_PERMISSION_ATTR,
PERMISSION_PREFIX,
is_tool_visible_to_current_user,
)
tool_func = getattr(tool, "fn", None)
if tool_requires_data_model_metadata_access(tool_func) and not (
user_can_view_data_model_metadata()
):
return False
class_permission_name = getattr(tool_func, CLASS_PERMISSION_ATTR, None)
if not class_permission_name:
return True
if not getattr(g, "user", None):
try:
g.user = get_user_from_request()
except ValueError:
return False
method_permission_name = getattr(tool_func, METHOD_PERMISSION_ATTR, "read")
permission_name = f"{PERMISSION_PREFIX}{method_permission_name}"
return security_manager.can_access(permission_name, class_permission_name)
return is_tool_visible_to_current_user(tool)
except (AttributeError, RuntimeError, ValueError):
logger.debug("Could not evaluate tool search permission", exc_info=True)
return False
@@ -711,11 +690,15 @@ def build_middleware_list() -> list[Middleware]:
1. StructuredContentStripper — safety net, converts exceptions
to safe ToolResult text for transports that can't encode errors
2. LoggingMiddleware — logs tool calls with success/failure status
3. GlobalErrorHandler — catches tool exceptions, raises ToolError
2. RBACToolVisibilityMiddleware — filters tools/list by RBAC;
positioned inside the Stripper so it sees full tool objects
(with outputSchema) before stripping occurs
3. LoggingMiddleware — logs tool calls with success/failure status
4. GlobalErrorHandler — catches tool exceptions, raises ToolError
"""
return [
StructuredContentStripperMiddleware(),
RBACToolVisibilityMiddleware(),
LoggingMiddleware(),
GlobalErrorHandlerMiddleware(),
]

View File

@@ -25,6 +25,7 @@ from flask import g
from superset.mcp_service.auth import (
check_tool_permission,
CLASS_PERMISSION_ATTR,
is_tool_visible_to_current_user,
MCPPermissionDeniedError,
METHOD_PERMISSION_ATTR,
PERMISSION_PREFIX,
@@ -223,3 +224,122 @@ def app_context(app):
"""Provide Flask app context for tests needing g.user."""
with app.app_context():
yield
# -- is_tool_visible_to_current_user --
def _make_mock_tool(
class_perm: str | None = None,
method_perm: str | None = None,
fn: object | None = None,
) -> MagicMock:
"""Create a mock FastMCP Tool object for visibility tests."""
tool = MagicMock()
if fn is not None:
tool.fn = fn
elif class_perm is not None:
func = _make_tool_func(class_perm, method_perm)
tool.fn = func
else:
tool.fn = None
return tool
def test_visibility_returns_true_when_rbac_disabled(app_context, app) -> None:
"""is_tool_visible_to_current_user returns True when RBAC is disabled."""
app.config["MCP_RBAC_ENABLED"] = False
tool = _make_mock_tool(class_perm="Chart", method_perm="write")
try:
assert is_tool_visible_to_current_user(tool) is True
finally:
app.config["MCP_RBAC_ENABLED"] = True
def test_visibility_returns_true_when_fn_is_none(app_context) -> None:
"""Tools with fn=None (public/synthetic) are always visible."""
tool = _make_mock_tool()
assert is_tool_visible_to_current_user(tool) is True
def test_visibility_public_tool_no_class_permission(app_context) -> None:
"""Tools without class_permission_name are visible to all users."""
g.user = MagicMock(username="viewer")
func = _make_tool_func() # no class permission
tool = MagicMock()
tool.fn = func
assert is_tool_visible_to_current_user(tool) is True
def test_visibility_allowed_tool(app_context) -> None:
"""Tools where security_manager grants access are visible."""
g.user = MagicMock(username="admin")
func = _make_tool_func(class_perm="Chart", method_perm="read")
tool = MagicMock()
tool.fn = func
mock_sm = MagicMock()
mock_sm.can_access = MagicMock(return_value=True)
with patch("superset.security_manager", mock_sm):
result = is_tool_visible_to_current_user(tool)
assert result is True
def test_visibility_denied_tool(app_context) -> None:
"""Tools where security_manager denies access are hidden."""
g.user = MagicMock(username="viewer")
func = _make_tool_func(class_perm="Dashboard", method_perm="write")
tool = MagicMock()
tool.fn = func
mock_sm = MagicMock()
mock_sm.can_access = MagicMock(return_value=False)
with patch("superset.security_manager", mock_sm):
result = is_tool_visible_to_current_user(tool)
assert result is False
def test_visibility_data_model_metadata_denied(app_context) -> None:
"""Tools requiring data-model metadata access are hidden when user lacks it."""
g.user = MagicMock(username="viewer")
func = _make_tool_func(class_perm="Dataset", method_perm="read")
func._requires_data_model_metadata_access = True # type: ignore[attr-defined]
tool = MagicMock()
tool.fn = func
mock_sm = MagicMock()
mock_sm.can_access = MagicMock(return_value=True)
with (
patch("superset.security_manager", mock_sm),
patch(
"superset.mcp_service.privacy.user_can_view_data_model_metadata",
return_value=False,
),
):
result = is_tool_visible_to_current_user(tool)
assert result is False
def test_visibility_data_model_metadata_allowed(app_context) -> None:
"""Tools requiring data-model metadata access are visible when user has it."""
g.user = MagicMock(username="alpha")
func = _make_tool_func(class_perm="Dataset", method_perm="read")
func._requires_data_model_metadata_access = True # type: ignore[attr-defined]
tool = MagicMock()
tool.fn = func
mock_sm = MagicMock()
mock_sm.can_access = MagicMock(return_value=True)
with (
patch("superset.security_manager", mock_sm),
patch(
"superset.mcp_service.privacy.user_can_view_data_model_metadata",
return_value=True,
),
):
result = is_tool_visible_to_current_user(tool)
assert result is True

View File

@@ -1030,12 +1030,162 @@ class TestGlobalErrorHandlerLogLevels:
error.status = 500
call_next = AsyncMock(side_effect=error)
mock_logger = MagicMock()
with (
patch("superset.mcp_service.middleware.get_user_id", return_value=1),
patch("superset.mcp_service.middleware.event_logger"),
patch("superset.mcp_service.middleware.logger") as mock_logger,
patch("superset.mcp_service.middleware.logger", mock_logger),
pytest.raises(ToolError, match="Internal error"),
):
await middleware.on_message(context, call_next)
mock_logger.error.assert_called()
@pytest.mark.asyncio
async def test_mcp_permission_denied_error_becomes_tool_error(self) -> None:
"""MCPPermissionDeniedError must convert to ToolError, not a generic error."""
from superset.mcp_service.auth import MCPPermissionDeniedError
middleware = GlobalErrorHandlerMiddleware()
context = MagicMock()
context.message.name = "generate_dashboard"
context.method = "tools/call"
error = MCPPermissionDeniedError(
permission_name="can_write",
view_name="Dashboard",
user="viewer",
tool_name="generate_dashboard",
)
call_next = AsyncMock(side_effect=error)
with (
patch("superset.mcp_service.middleware.get_user_id", return_value=42),
patch("superset.mcp_service.middleware.event_logger"),
pytest.raises(ToolError) as exc_info,
):
await middleware.on_message(context, call_next)
assert "can_write" in str(exc_info.value)
assert "Dashboard" in str(exc_info.value)
@pytest.mark.asyncio
async def test_mcp_permission_denied_error_is_user_error(self) -> None:
"""MCPPermissionDeniedError must be classified as a user error (WARNING)."""
from superset.mcp_service.auth import MCPPermissionDeniedError
error = MCPPermissionDeniedError(
permission_name="can_write",
view_name="Chart",
)
assert _is_user_error(error) is True
@pytest.mark.asyncio
async def test_mcp_permission_denied_error_logs_at_warning(self) -> None:
"""MCPPermissionDeniedError should log at WARNING, not ERROR."""
from superset.mcp_service.auth import MCPPermissionDeniedError
middleware = GlobalErrorHandlerMiddleware()
context = MagicMock()
context.message.name = "generate_chart"
context.method = "tools/call"
error = MCPPermissionDeniedError(
permission_name="can_write",
view_name="Chart",
user="reader",
)
call_next = AsyncMock(side_effect=error)
mock_logger = MagicMock()
with (
patch("superset.mcp_service.middleware.get_user_id", return_value=5),
patch("superset.mcp_service.middleware.event_logger"),
patch("superset.mcp_service.middleware.logger", mock_logger),
pytest.raises(ToolError),
):
await middleware.on_message(context, call_next)
mock_logger.warning.assert_called()
mock_logger.error.assert_not_called()
class TestRBACToolVisibilityMiddleware:
"""Tests for RBACToolVisibilityMiddleware.on_list_tools."""
def _make_tool(self, name: str = "test_tool") -> Any:
"""Create a minimal mock tool object."""
tool = MagicMock()
tool.name = name
return tool
@pytest.mark.asyncio
async def test_fails_open_on_exception(self) -> None:
"""Returns all tools when get_flask_app raises (fail open)."""
from superset.mcp_service.middleware import RBACToolVisibilityMiddleware
tools = [self._make_tool("list_charts"), self._make_tool("generate_chart")]
call_next = AsyncMock(return_value=tools)
middleware = RBACToolVisibilityMiddleware()
with patch(
"superset.mcp_service.flask_singleton.get_flask_app",
side_effect=RuntimeError("no app"),
):
result = await middleware.on_list_tools(MagicMock(), call_next)
assert result == tools
@pytest.mark.asyncio
async def test_fails_open_when_user_is_none(self, app) -> None:
"""Returns all tools when _setup_user_context returns None."""
from superset.mcp_service.middleware import RBACToolVisibilityMiddleware
tools = [self._make_tool("list_charts"), self._make_tool("generate_chart")]
call_next = AsyncMock(return_value=tools)
middleware = RBACToolVisibilityMiddleware()
with (
patch(
"superset.mcp_service.flask_singleton.get_flask_app", return_value=app
),
patch("superset.mcp_service.auth._setup_user_context", return_value=None),
):
result = await middleware.on_list_tools(MagicMock(), call_next)
assert result == tools
@pytest.mark.asyncio
async def test_filters_tools_by_rbac(self, app) -> None:
"""Tools denied by is_tool_visible_to_current_user are removed."""
from superset.mcp_service.middleware import RBACToolVisibilityMiddleware
read_tool = self._make_tool("list_charts")
write_tool = self._make_tool("generate_chart")
tools = [read_tool, write_tool]
call_next = AsyncMock(return_value=tools)
middleware = RBACToolVisibilityMiddleware()
mock_user = MagicMock()
def _visible(tool: Any) -> bool:
return tool.name == "list_charts"
with (
patch(
"superset.mcp_service.flask_singleton.get_flask_app", return_value=app
),
patch(
"superset.mcp_service.auth._setup_user_context", return_value=mock_user
),
patch(
"superset.mcp_service.auth.is_tool_visible_to_current_user",
side_effect=_visible,
),
):
result = await middleware.on_list_tools(MagicMock(), call_next)
assert read_tool in result
assert write_tool not in result