fix(mcp): add permission checks to generate_dashboard and update_chart tools (#38845)

This commit is contained in:
Amin Ghadersohi
2026-03-25 16:37:48 -04:00
committed by GitHub
parent 16f5a2a41a
commit 23a5e95884
6 changed files with 303 additions and 2 deletions

View File

@@ -53,6 +53,7 @@ from flask_appbuilder.security.sqla.models import Group, User
if TYPE_CHECKING:
from superset.connectors.sqla.models import SqlaTable
from superset.mcp_service.chart.chart_utils import DatasetValidationResult
# Type variable for decorated functions
F = TypeVar("F", bound=Callable[..., Any])
@@ -329,6 +330,24 @@ def has_dataset_access(dataset: "SqlaTable") -> bool:
return False # Deny access on error
def check_chart_data_access(chart: Any) -> "DatasetValidationResult":
"""Validate that the current user can access a chart's underlying dataset.
This extends the RBAC system: ``mcp_auth_hook`` enforces class-level
permissions before tool execution; this function enforces data-level
permissions inside tools after retrieving specific objects.
Args:
chart: A Slice ORM object with datasource_id attribute
Returns:
DatasetValidationResult with is_valid, error, etc.
"""
from superset.mcp_service.chart.chart_utils import validate_chart_dataset
return validate_chart_dataset(chart, check_access=True)
def _setup_user_context() -> User | None:
"""
Set up user context for MCP tool execution.

View File

@@ -25,6 +25,7 @@ import time
from fastmcp import Context
from superset_core.mcp.decorators import tool, ToolAnnotations
from superset.commands.exceptions import CommandException
from superset.extensions import event_logger
from superset.mcp_service.chart.chart_utils import (
analyze_chart_capabilities,
@@ -134,6 +135,29 @@ async def update_chart(
}
)
# Validate dataset access before allowing update.
# check_chart_data_access is the centralized data-level
# permission check that complements the class-level RBAC
# enforced by mcp_auth_hook.
from superset.mcp_service.auth import check_chart_data_access
validation_result = check_chart_data_access(chart)
if not validation_result.is_valid:
error_msg = validation_result.error or "Chart's dataset is not accessible"
return GenerateChartResponse.model_validate(
{
"chart": None,
"error": {
"error_type": "DatasetNotAccessible",
"message": error_msg,
"details": error_msg,
},
"success": False,
"schema_version": "2.0",
"api_version": "v1",
}
)
# Map the new config to form_data format
# Get dataset_id from existing chart for column type checking
dataset_id = chart.datasource_id if chart.datasource_id else None
@@ -246,7 +270,7 @@ async def update_chart(
}
return GenerateChartResponse.model_validate(result)
except Exception as e:
except (CommandException, ValueError, KeyError, AttributeError) as e:
execution_time = int((time.time() - start_time) * 1000)
return GenerateChartResponse.model_validate(
{

View File

@@ -350,6 +350,24 @@ def add_chart_to_existing_dashboard(
error=f"Chart with ID {request.chart_id} not found",
)
# Validate dataset access for the chart.
# check_chart_data_access is the centralized data-level
# permission check that complements the class-level RBAC
# enforced by mcp_auth_hook.
from superset.mcp_service.auth import check_chart_data_access
validation = check_chart_data_access(new_chart)
if not validation.is_valid:
return AddChartToDashboardResponse(
dashboard=None,
dashboard_url=None,
position=None,
error=(
f"Chart {request.chart_id} is not accessible: "
f"{validation.error}"
),
)
# Check if chart is already in dashboard
current_chart_ids = [slice.id for slice in dashboard.slices]
if request.chart_id in current_chart_ids:

View File

@@ -227,6 +227,23 @@ def generate_dashboard(
error=f"Charts not found: {list(missing_chart_ids)}",
)
# Validate dataset access for each chart.
# check_chart_data_access is the centralized data-level
# permission check that complements the class-level RBAC
# enforced by mcp_auth_hook.
from superset.mcp_service.auth import check_chart_data_access
for chart in chart_objects:
validation = check_chart_data_access(chart)
if not validation.is_valid:
return GenerateDashboardResponse(
dashboard=None,
dashboard_url=None,
error=(
f"Chart {chart.id} is not accessible: {validation.error}"
),
)
# Create dashboard layout with chart objects
with event_logger.log_context(action="mcp.generate_dashboard.layout"):
layout = _create_dashboard_layout(chart_objects)

View File

@@ -19,8 +19,13 @@
Unit tests for update_chart MCP tool
"""
import pytest
from unittest.mock import Mock, patch
import pytest
from fastmcp import Client
from superset.mcp_service.app import mcp
from superset.mcp_service.chart.chart_utils import DatasetValidationResult
from superset.mcp_service.chart.schemas import (
AxisConfig,
ColumnRef,
@@ -383,3 +388,106 @@ class TestUpdateChart:
assert request2.use_cache is False
assert request2.force_refresh is True
assert request2.cache_timeout == 300
@pytest.fixture
def mcp_server():
return mcp
@pytest.fixture(autouse=True)
def mock_auth():
"""Mock authentication for all tests."""
with patch("superset.mcp_service.auth.get_user_from_request") as mock_get_user:
mock_user = Mock()
mock_user.id = 1
mock_user.username = "admin"
mock_get_user.return_value = mock_user
yield mock_get_user
class TestUpdateChartDatasetAccess:
"""Tests for dataset access validation in update_chart."""
@patch(
"superset.mcp_service.auth.check_chart_data_access",
new_callable=Mock,
)
@patch("superset.daos.chart.ChartDAO.find_by_id", new_callable=Mock)
@patch("superset.db.session")
@pytest.mark.asyncio
async def test_update_chart_dataset_access_denied(
self, mock_db_session, mock_find_by_id, mock_validate, mcp_server
):
"""Test that update_chart returns error when dataset is inaccessible."""
mock_chart = Mock()
mock_chart.id = 1
mock_chart.datasource_id = 10
mock_find_by_id.return_value = mock_chart
mock_validate.return_value = DatasetValidationResult(
is_valid=False,
dataset_id=10,
dataset_name="restricted_dataset",
warnings=[],
error="Access denied to dataset 'restricted_dataset' (ID: 10). "
"You do not have permission to view this dataset.",
)
request = {
"identifier": 1,
"config": {
"chart_type": "table",
"columns": [{"name": "col1"}],
},
}
async with Client(mcp_server) as client:
result = await client.call_tool("update_chart", {"request": request})
assert result.structured_content["success"] is False
assert result.structured_content["chart"] is None
error = result.structured_content["error"]
assert error["error_type"] == "DatasetNotAccessible"
assert "Access denied" in error["message"]
@patch(
"superset.mcp_service.auth.check_chart_data_access",
new_callable=Mock,
)
@patch("superset.daos.chart.ChartDAO.find_by_id", new_callable=Mock)
@patch("superset.db.session")
@pytest.mark.asyncio
async def test_update_chart_dataset_not_found(
self, mock_db_session, mock_find_by_id, mock_validate, mcp_server
):
"""Test that update_chart returns error when dataset is deleted."""
mock_chart = Mock()
mock_chart.id = 1
mock_chart.datasource_id = 99
mock_find_by_id.return_value = mock_chart
mock_validate.return_value = DatasetValidationResult(
is_valid=False,
dataset_id=99,
dataset_name=None,
warnings=[],
error="Dataset (ID: 99) has been deleted or does not exist.",
)
request = {
"identifier": 1,
"config": {
"chart_type": "table",
"columns": [{"name": "col1"}],
},
}
async with Client(mcp_server) as client:
result = await client.call_tool("update_chart", {"request": request})
assert result.structured_content["success"] is False
assert result.structured_content["chart"] is None
error = result.structured_content["error"]
assert error["error_type"] == "DatasetNotAccessible"
assert "deleted" in error["message"]

View File

@@ -26,6 +26,7 @@ import pytest
from fastmcp import Client
from superset.mcp_service.app import mcp
from superset.mcp_service.chart.chart_utils import DatasetValidationResult
from superset.mcp_service.dashboard.constants import generate_id
from superset.mcp_service.dashboard.tool.add_chart_to_existing_dashboard import (
_add_chart_to_layout,
@@ -58,6 +59,22 @@ def mock_auth():
yield mock_get_user
@pytest.fixture(autouse=True)
def mock_chart_access():
"""Mock chart dataset validation so tests don't hit real security manager."""
with patch(
"superset.mcp_service.auth.check_chart_data_access",
return_value=DatasetValidationResult(
is_valid=True,
dataset_id=1,
dataset_name="test_dataset",
warnings=[],
error=None,
),
):
yield
def _mock_chart(id: int = 1, slice_name: str = "Test Chart") -> Mock:
"""Create a mock chart object."""
chart = Mock()
@@ -202,6 +219,67 @@ class TestGenerateDashboard:
assert result.structured_content["dashboard"] is None
assert result.structured_content["dashboard_url"] is None
@patch("superset.db.session")
@pytest.mark.asyncio
async def test_generate_dashboard_inaccessible_charts(
self, mock_db_session, mock_chart_access, mcp_server
):
"""Test error when user lacks access to some charts."""
charts = [
_mock_chart(id=1, slice_name="Accessible"),
_mock_chart(id=2, slice_name="Restricted"),
_mock_chart(id=3, slice_name="Also Restricted"),
]
mock_query = Mock()
mock_filter = Mock()
mock_query.filter.return_value = mock_filter
mock_filter.order_by.return_value = mock_filter
mock_filter.all.return_value = charts
mock_db_session.query.return_value = mock_query
# Override the autouse fixture: chart 2 has inaccessible dataset
def mock_validate(chart, check_access=False):
if chart.id == 2:
return DatasetValidationResult(
is_valid=False,
dataset_id=10,
dataset_name="restricted_dataset",
warnings=[],
error=(
"Access denied to dataset 'restricted_dataset' "
"(ID: 10). You do not have permission to view "
"this dataset."
),
)
return DatasetValidationResult(
is_valid=True,
dataset_id=chart.id,
dataset_name=f"dataset_{chart.id}",
warnings=[],
error=None,
)
with patch(
"superset.mcp_service.auth.check_chart_data_access",
side_effect=mock_validate,
):
request = {
"chart_ids": [1, 2, 3],
"dashboard_title": "Test Dashboard",
}
async with Client(mcp_server) as client:
result = await client.call_tool(
"generate_dashboard", {"request": request}
)
assert result.structured_content["error"] is not None
assert "not accessible" in result.structured_content["error"]
assert "2" in result.structured_content["error"]
assert result.structured_content["dashboard"] is None
assert result.structured_content["dashboard_url"] is None
@patch("superset.models.dashboard.Dashboard")
@patch("superset.daos.dashboard.DashboardDAO.find_by_id")
@patch("superset.db.session")
@@ -551,6 +629,43 @@ class TestAddChartToExistingDashboard:
assert result.structured_content["error"] is not None
assert "Chart with ID 999 not found" in result.structured_content["error"]
@patch("superset.daos.dashboard.DashboardDAO.find_by_id")
@patch("superset.db.session")
@pytest.mark.asyncio
async def test_add_chart_dataset_not_accessible(
self, mock_db_session, mock_find_dashboard, mcp_server
):
"""Test error when chart's dataset is not accessible."""
mock_find_dashboard.return_value = _mock_dashboard()
mock_chart = _mock_chart(id=7)
mock_db_session.get.return_value = mock_chart
# Override autouse fixture: chart 7 has inaccessible dataset
with patch(
"superset.mcp_service.auth.check_chart_data_access",
return_value=DatasetValidationResult(
is_valid=False,
dataset_id=10,
dataset_name="restricted_dataset",
warnings=[],
error=(
"Access denied to dataset 'restricted_dataset' "
"(ID: 10). You do not have permission to view "
"this dataset."
),
),
):
request = {"dashboard_id": 1, "chart_id": 7}
async with Client(mcp_server) as client:
result = await client.call_tool(
"add_chart_to_existing_dashboard", {"request": request}
)
assert result.structured_content["error"] is not None
assert "not accessible" in result.structured_content["error"]
assert "7" in result.structured_content["error"]
assert result.structured_content["dashboard"] is None
@patch("superset.daos.dashboard.DashboardDAO.find_by_id")
@patch("superset.db.session")
@pytest.mark.asyncio