fix(mcp): protect data-model metadata from dashboard viewers (#39599)

Co-authored-by: Elizabeth Thompson <eschutho@gmail.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Richard Fogaca Nienkotter
2026-04-24 09:40:39 -03:00
committed by GitHub
parent 970b5bcf75
commit d79eb5842a
22 changed files with 1207 additions and 67 deletions

View File

@@ -17,6 +17,7 @@
"""Tests for current_user in get_instance_info and user-directory filtering."""
import importlib
from unittest.mock import Mock, patch
import pytest
@@ -25,10 +26,25 @@ from pydantic import ValidationError
from superset.mcp_service.app import mcp
from superset.mcp_service.chart.schemas import ChartFilter
from superset.mcp_service.common.schema_discovery import (
ColumnMetadata,
ModelSchemaInfo,
)
from superset.mcp_service.dashboard.schemas import DashboardFilter
from superset.mcp_service.privacy import (
CHART_DATA_MODEL_COLUMNS,
DATA_MODEL_METADATA_ERROR_TYPE,
tool_requires_data_model_metadata_access,
user_can_view_data_model_metadata,
)
from superset.mcp_service.system.schemas import InstanceInfo, UserInfo
from superset.mcp_service.system.tool.get_schema import get_schema
from superset.utils import json
get_schema_module = importlib.import_module(
"superset.mcp_service.system.tool.get_schema"
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@@ -101,6 +117,142 @@ def _make_instance_info(**kwargs):
return InstanceInfo(**defaults)
def test_get_schema_is_not_globally_hidden_from_tool_search() -> None:
"""Per-model privacy is enforced inside get_schema."""
assert tool_requires_data_model_metadata_access(get_schema) is False
def test_redact_data_model_metadata_removes_dataset_and_database_summary():
from superset.mcp_service.system.schemas import (
DatabaseBreakdown,
InstanceSummary,
RecentActivity,
)
from superset.mcp_service.system.tool.get_instance_info import (
_redact_data_model_metadata,
)
instance_info = _make_instance_info(
instance_summary=InstanceSummary(
total_dashboards=2,
total_charts=4,
total_datasets=7,
total_databases=3,
total_users=5,
total_roles=6,
total_tags=8,
avg_charts_per_dashboard=2.0,
),
recent_activity=RecentActivity(
dashboards_created_last_30_days=1,
charts_created_last_30_days=2,
datasets_created_last_30_days=3,
dashboards_modified_last_7_days=4,
charts_modified_last_7_days=5,
datasets_modified_last_7_days=6,
),
database_breakdown=DatabaseBreakdown(by_type={"postgresql": 2}),
)
redacted = _redact_data_model_metadata(instance_info)
assert redacted.instance_summary.total_dashboards == 2
assert redacted.instance_summary.total_charts == 4
assert redacted.instance_summary.total_datasets == 0
assert redacted.instance_summary.total_databases == 0
assert redacted.recent_activity.dashboards_created_last_30_days == 1
assert redacted.recent_activity.charts_created_last_30_days == 2
assert redacted.recent_activity.datasets_created_last_30_days == 0
assert redacted.recent_activity.datasets_modified_last_7_days == 0
assert redacted.database_breakdown.by_type == {}
assert redacted.data_model_metadata_redacted is True
def test_user_can_view_data_model_metadata_requires_stronger_dataset_permission(
app_context,
):
with patch("superset.security_manager", new_callable=Mock) as mock_security_manager:
mock_security_manager.can_access.side_effect = (
lambda permission_name, view_name: permission_name == "can_read"
)
assert user_can_view_data_model_metadata() is False
mock_security_manager.can_access.side_effect = (
lambda permission_name, view_name: (
view_name == "Dataset" and permission_name == "can_get_drill_info"
)
)
assert user_can_view_data_model_metadata() is True
@pytest.mark.asyncio
async def test_get_schema_returns_structured_privacy_error_for_dataset(mcp_server):
with patch.object(
get_schema_module,
"user_can_view_data_model_metadata",
return_value=False,
):
async with Client(mcp_server) as client:
result = await client.call_tool(
"get_schema",
{"request": {"model_type": "dataset"}},
)
data = json.loads(result.content[0].text)
assert data["error_type"] == DATA_MODEL_METADATA_ERROR_TYPE
assert data["privacy_scope"] == "data_model"
@pytest.mark.asyncio
async def test_get_schema_redacts_chart_data_model_fields(mcp_server):
mock_schema = ModelSchemaInfo(
model_type="chart",
select_columns=[
ColumnMetadata(name="id"),
ColumnMetadata(name="datasource_name"),
ColumnMetadata(name="url"),
],
filter_columns={"slice_name": ["eq"], "datasource_name": ["like"]},
sortable_columns=["slice_name", "datasource_name"],
default_select=["id", "datasource_name", "slice_name"],
default_sort="changed_on",
default_sort_direction="desc",
search_columns=["slice_name", "description", "datasource_name"],
)
mock_core = Mock()
mock_core.run_tool.return_value = mock_schema
with (
patch.object(
get_schema_module,
"user_can_view_data_model_metadata",
return_value=False,
),
patch.dict(
get_schema_module._SCHEMA_CORE_FACTORIES,
{"chart": lambda: mock_core},
clear=False,
),
):
async with Client(mcp_server) as client:
result = await client.call_tool(
"get_schema",
{"request": {"model_type": "chart"}},
)
data = json.loads(result.content[0].text)
schema_info = data["schema_info"]
assert all(
column["name"] not in CHART_DATA_MODEL_COLUMNS
for column in schema_info["select_columns"]
)
assert "datasource_name" not in schema_info["filter_columns"]
assert "datasource_name" not in schema_info["sortable_columns"]
assert "datasource_name" not in schema_info["default_select"]
assert "datasource_name" not in schema_info["search_columns"]
# ---------------------------------------------------------------------------
# Schema-level tests: UserInfo
# ---------------------------------------------------------------------------

View File

@@ -19,6 +19,7 @@
Tests for the get_schema unified schema discovery tool.
"""
import importlib
from unittest.mock import patch
import pytest
@@ -40,6 +41,10 @@ from superset.mcp_service.common.schema_discovery import (
)
from superset.utils import json
get_schema_module = importlib.import_module(
"superset.mcp_service.system.tool.get_schema"
)
@pytest.fixture
def mcp_server():
@@ -59,6 +64,17 @@ def mock_auth():
yield mock_get_user
@pytest.fixture(autouse=True)
def allow_data_model_metadata():
"""Keep the standalone get_schema suite in the unrestricted default path."""
with patch.object(
get_schema_module,
"user_can_view_data_model_metadata",
return_value=True,
):
yield
class TestGetSchemaRequest:
"""Test the GetSchemaRequest schema validation."""