feat(mcp): add event_logger instrumentation to MCP tools (#37859)

This commit is contained in:
Amin Ghadersohi
2026-02-12 10:50:20 -05:00
committed by GitHub
parent 3f64c25712
commit 4dfece9ee5
23 changed files with 931 additions and 508 deletions

View File

@@ -33,6 +33,7 @@ from superset_core.mcp import tool
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetErrorException, SupersetSecurityException
from superset.extensions import event_logger
from superset.mcp_service.sql_lab.schemas import (
ColumnInfo,
ExecuteSqlRequest,
@@ -72,28 +73,29 @@ async def execute_sql(request: ExecuteSqlRequest, ctx: Context) -> ExecuteSqlRes
from superset.models.core import Database
# 1. Get database and check access
database = db.session.query(Database).filter_by(id=request.database_id).first()
if not database:
raise SupersetErrorException(
SupersetError(
message=f"Database with ID {request.database_id} not found",
error_type=SupersetErrorType.DATABASE_NOT_FOUND_ERROR,
level=ErrorLevel.ERROR,
)
with event_logger.log_context(action="mcp.execute_sql.db_validation"):
database = (
db.session.query(Database).filter_by(id=request.database_id).first()
)
if not security_manager.can_access_database(database):
raise SupersetSecurityException(
SupersetError(
message=f"Access denied to database {database.database_name}",
error_type=SupersetErrorType.DATABASE_SECURITY_ACCESS_ERROR,
level=ErrorLevel.ERROR,
if not database:
raise SupersetErrorException(
SupersetError(
message=f"Database with ID {request.database_id} not found",
error_type=SupersetErrorType.DATABASE_NOT_FOUND_ERROR,
level=ErrorLevel.ERROR,
)
)
)
# 2. Build QueryOptions
# Caching is enabled by default to reduce database load.
# force_refresh bypasses cache when user explicitly requests fresh data.
if not security_manager.can_access_database(database):
raise SupersetSecurityException(
SupersetError(
message=(f"Access denied to database {database.database_name}"),
error_type=SupersetErrorType.DATABASE_SECURITY_ACCESS_ERROR,
level=ErrorLevel.ERROR,
)
)
# 2. Build QueryOptions and execute query
cache_opts = CacheOptions(force_refresh=True) if request.force_refresh else None
options = QueryOptions(
catalog=request.catalog,
@@ -106,10 +108,12 @@ async def execute_sql(request: ExecuteSqlRequest, ctx: Context) -> ExecuteSqlRes
)
# 3. Execute query
result = database.execute(request.sql, options)
with event_logger.log_context(action="mcp.execute_sql.query_execution"):
result = database.execute(request.sql, options)
# 4. Convert to MCP response format
response = _convert_to_response(result)
with event_logger.log_context(action="mcp.execute_sql.response_conversion"):
response = _convert_to_response(result)
# Log successful execution
if response.success: