fix(mcp): prevent stale g.user from causing user impersonation across tool calls (#38747)

This commit is contained in:
Amin Ghadersohi
2026-03-30 20:23:46 +02:00
committed by GitHub
parent 15bab227bb
commit 38fdfb4ca2
5 changed files with 642 additions and 105 deletions

View File

@@ -35,9 +35,12 @@ MCP_DEV_USERNAME = "admin"
```
**How it works**:
1. The `@mcp_auth_hook` decorator calls `get_user_from_request()`
2. `get_user_from_request()` reads `MCP_DEV_USERNAME` from config
3. User is queried from database and set as `g.user`
1. The `@mcp_auth_hook` decorator clears any stale `g.user` and calls `get_user_from_request()`
2. `get_user_from_request()` resolves the user in priority order:
- **JWT auth context** (per-request ContextVar from MCP SDK) — safest, prevents stale user impersonation
- **`MCP_DEV_USERNAME`** from config — for development/single-user deployments
- **`g.user` fallback** — for external middleware (e.g., Preset's WorkspaceContextMiddleware)
3. User is queried from database (with roles/groups eagerly loaded) and set as `g.user`
4. All subsequent Superset operations use this user's permissions
**Development Use Only**:

View File

@@ -189,14 +189,140 @@ def load_user_with_relationships(
return query.first()
def _resolve_user_from_jwt_context(app: Any) -> User | None:
"""
Resolve the current user from the MCP SDK's per-request JWT context.
Uses FastMCP's ``get_access_token()`` which returns the JWT AccessToken
for the current async task via a ContextVar — safe across concurrent
requests, unlike ``g.user`` which can be stale.
The username is extracted from token claims using a configurable resolver
(``MCP_USER_RESOLVER`` config) or the default ``default_user_resolver()``.
Returns:
User object with relationships loaded, or None if no JWT context
(i.e. no token present — caller should fall through to next source).
Raises:
ValueError: If JWT resolves a username that doesn't exist in the DB
(fail closed — do NOT fall through to weaker auth sources).
"""
try:
from fastmcp.server.dependencies import get_access_token
except ImportError:
logger.debug("fastmcp.server.dependencies not available, skipping JWT context")
return None
access_token = get_access_token()
if access_token is None:
return None
# Use configurable resolver or default
from superset.mcp_service.mcp_config import default_user_resolver
resolver = app.config.get("MCP_USER_RESOLVER", default_user_resolver)
username = resolver(app, access_token)
if not username:
# Fail closed: JWT is present but identity cannot be determined.
# Do NOT fall through to weaker auth sources.
raise ValueError(
"JWT context present but no username could be extracted from claims"
)
# Try username lookup first, then email fallback for OIDC email claims
user = load_user_with_relationships(username)
if not user and "@" in username:
user = load_user_with_relationships(email=username)
if not user:
# Fail closed: JWT says this user should exist but they don't.
# Do NOT fall through to MCP_DEV_USERNAME or stale g.user.
raise ValueError(
f"JWT authenticated user '{username}' not found in Superset database. "
f"Ensure the user exists before granting MCP access."
)
return user
def _resolve_user_from_api_key(app: Any) -> User | None:
"""
Resolve the current user from an API key in the Authorization header.
Uses FAB SecurityManager's API key validation. Only attempts when
FAB_API_KEY_ENABLED is True and a request context is active.
Returns:
User object with relationships loaded, or None if no API key present
or API key auth is not enabled/available.
Raises:
PermissionError: If an API key is present but invalid/expired,
or if validation is not available in this FAB version.
"""
if not app.config.get("FAB_API_KEY_ENABLED", False) or not has_request_context():
return None
sm = app.appbuilder.sm
# _extract_api_key_from_request is FAB's internal method for reading
# the Bearer token from the Authorization header and matching prefixes.
# Not all FAB versions include this method, so guard with hasattr.
if not hasattr(sm, "_extract_api_key_from_request"):
logger.debug(
"FAB SecurityManager does not have _extract_api_key_from_request; "
"API key authentication is not available in this FAB version"
)
return None
api_key_string = sm._extract_api_key_from_request()
if api_key_string is None:
return None
if not hasattr(sm, "validate_api_key"):
logger.warning(
"FAB SecurityManager does not have validate_api_key; "
"cannot validate API key"
)
raise PermissionError(
"API key validation is not available in this FAB version."
)
user = sm.validate_api_key(api_key_string)
if not user:
raise PermissionError(
"Invalid or expired API key. "
"Create a new key at /api/v1/security/api_keys/."
)
# Reload user with all relationships eagerly loaded to avoid
# detached-instance errors during later permission checks.
user_with_rels = load_user_with_relationships(username=user.username)
if user_with_rels is None:
logger.warning(
"Failed to reload API key user %s with relationships; "
"using original user object which may have lazy-loaded "
"relationships",
user.username,
)
return user
return user_with_rels
def get_user_from_request() -> User:
"""
Get the current user for the MCP tool request.
Priority order:
1. g.user if already set (by Preset workspace middleware or FastMCP auth)
1. JWT auth context (per-request ContextVar from MCP SDK) — safest
2. API key from Authorization header (via FAB SecurityManager)
3. MCP_DEV_USERNAME from configuration (for development/testing)
4. g.user fallback (for external middleware like Preset's
WorkspaceContextMiddleware that sets g.user fresh per request)
This ordering prevents stale ``g.user`` from a previous tool call
from being used in open-source deployments where no middleware
refreshes ``g.user`` per request.
Returns:
User object with roles and groups eagerly loaded
@@ -206,95 +332,51 @@ def get_user_from_request() -> User:
"""
from flask import current_app
# First check if user is already set by Preset workspace middleware
# Priority 1: JWT context (per-request safe via ContextVar)
if (jwt_user := _resolve_user_from_jwt_context(current_app)) is not None:
return jwt_user
# Priority 2: API key authentication via FAB SecurityManager
if (api_key_user := _resolve_user_from_api_key(current_app)) is not None:
return api_key_user
# Priority 3: Configured dev username for development/single-user deployments
if username := current_app.config.get("MCP_DEV_USERNAME"):
user = load_user_with_relationships(username)
if not user:
raise ValueError(
f"User '{username}' not found. "
f"Please create admin user with: superset fab create-admin"
)
return user
# Priority 4: g.user fallback (set by external middleware, e.g. Preset)
if hasattr(g, "user") and g.user:
return g.user
# Try API key authentication via FAB SecurityManager
# Only attempt when in a request context (not for MCP internal operations
# like tool discovery that run with only an application context)
# Use the Flask config key FAB_API_KEY_ENABLED (not the feature flag),
# because the config key controls whether FAB registers the API key
# endpoints and validation logic. The feature flag with the same name
# in DEFAULT_FEATURE_FLAGS only controls the frontend UI visibility.
if current_app.config.get("FAB_API_KEY_ENABLED", False) and has_request_context():
sm = current_app.appbuilder.sm
# _extract_api_key_from_request is FAB's internal method for reading
# the Bearer token from the Authorization header and matching prefixes.
# Not all FAB versions include this method, so guard with hasattr.
if not hasattr(sm, "_extract_api_key_from_request"):
logger.debug(
"FAB SecurityManager does not have _extract_api_key_from_request; "
"API key authentication is not available in this FAB version"
)
else:
api_key_string = sm._extract_api_key_from_request()
if api_key_string is not None:
if not hasattr(sm, "validate_api_key"):
logger.warning(
"FAB SecurityManager does not have validate_api_key; "
"cannot validate API key"
)
raise PermissionError(
"API key validation is not available in this FAB version."
)
user = sm.validate_api_key(api_key_string)
if user:
# Reload user with all relationships eagerly loaded to avoid
# detached-instance errors during later permission checks.
user_with_rels = load_user_with_relationships(
username=user.username,
)
if user_with_rels is None:
logger.warning(
"Failed to reload API key user %s with relationships; "
"using original user object which may have lazy-loaded "
"relationships",
user.username,
)
return user
return user_with_rels
raise PermissionError(
"Invalid or expired API key. "
"Create a new key at /api/v1/security/api_keys/."
)
# Fall back to configured username for development/single-user deployments
username = current_app.config.get("MCP_DEV_USERNAME")
if not username:
auth_enabled = current_app.config.get("MCP_AUTH_ENABLED", False)
jwt_configured = bool(
current_app.config.get("MCP_JWKS_URI")
or current_app.config.get("MCP_JWT_PUBLIC_KEY")
or current_app.config.get("MCP_JWT_SECRET")
)
details = []
details.append(
f"g.user was not set by JWT middleware "
f"(MCP_AUTH_ENABLED={auth_enabled}, "
f"JWT keys configured={jwt_configured})"
)
details.append("MCP_DEV_USERNAME is not configured")
configured_prefixes = current_app.config.get("FAB_API_KEY_PREFIXES", ["sst_"])
prefix_example = configured_prefixes[0] if configured_prefixes else "sst_"
raise ValueError(
"No authenticated user found. Tried:\n"
+ "\n".join(f" - {d}" for d in details)
+ f"\n\nEither pass a valid API key (Bearer {prefix_example}...), "
"JWT token, or configure MCP_DEV_USERNAME for development."
)
# Use helper function to load user with all required relationships
user = load_user_with_relationships(username)
if not user:
raise ValueError(
f"User '{username}' not found. "
f"Please create admin user with: superset fab create-admin"
)
return user
# No auth source available — raise with diagnostic details
auth_enabled = current_app.config.get("MCP_AUTH_ENABLED", False)
jwt_configured = bool(
current_app.config.get("MCP_JWKS_URI")
or current_app.config.get("MCP_JWT_PUBLIC_KEY")
or current_app.config.get("MCP_JWT_SECRET")
)
details = [
f"No JWT access token in MCP request context "
f"(MCP_AUTH_ENABLED={auth_enabled}, "
f"JWT keys configured={jwt_configured})",
"No API key in Authorization header",
"MCP_DEV_USERNAME is not configured",
"g.user was not set by external middleware",
]
configured_prefixes = current_app.config.get("FAB_API_KEY_PREFIXES", ["sst_"])
prefix_example = configured_prefixes[0] if configured_prefixes else "sst_"
raise ValueError(
"No authenticated user found. Tried:\n"
+ "\n".join(f" - {d}" for d in details)
+ f"\n\nEither pass a valid API key (Bearer {prefix_example}...), "
"JWT token, or configure MCP_DEV_USERNAME for development."
)
def has_dataset_access(dataset: "SqlaTable") -> bool:
@@ -355,6 +437,15 @@ def _setup_user_context() -> User | None:
Returns:
User object with roles and groups loaded, or None if no Flask context
"""
# Clear stale g.user to prevent user impersonation across
# tool calls when no per-request middleware refreshes it.
# Only clear in app-context-only mode; preserve g.user when
# a request context is active (external middleware set it).
from flask import has_request_context
if not has_request_context():
g.pop("user", None)
try:
user = get_user_from_request()
except RuntimeError as e:

View File

@@ -318,11 +318,30 @@ def create_default_mcp_auth_factory(app: Flask) -> Optional[Any]:
return None
def default_user_resolver(app: Any, access_token: Any) -> Optional[str]:
"""Extract username from JWT token claims."""
if hasattr(access_token, "subject"):
def default_user_resolver(app: Any, access_token: Any) -> str | None:
"""Extract username from JWT token claims.
Checks the ``claims`` dict first (FastMCP's AccessToken format),
then falls back to legacy attribute access for backward compatibility.
"""
# FastMCP AccessToken stores JWT claims in a dict
claims = getattr(access_token, "claims", None)
if isinstance(claims, dict) and claims:
# Prefer human-readable username claims over opaque `sub`
# (OIDC `sub` is often a stable opaque ID, not a Superset username)
username = (
claims.get("preferred_username")
or claims.get("username")
or claims.get("email")
or claims.get("sub")
)
if username:
return username
# Legacy attribute access for backward compatibility
if hasattr(access_token, "subject") and access_token.subject:
return access_token.subject
if hasattr(access_token, "client_id"):
if hasattr(access_token, "client_id") and access_token.client_id:
return access_token.client_id
if hasattr(access_token, "payload") and isinstance(access_token.payload, dict):
return (

View File

@@ -143,24 +143,19 @@ def test_no_request_context_skips_api_key_auth(app) -> None:
mock_sm._extract_api_key_from_request.assert_not_called()
# -- g.user already set -> API key auth skipped (JWT precedence) --
# -- g.user fallback when no higher-priority auth succeeds --
@pytest.mark.usefixtures("_enable_api_keys")
def test_existing_g_user_takes_precedence(app, mock_user) -> None:
"""If g.user is already set (e.g., by JWT middleware), API key auth
should not be attempted."""
mock_sm = MagicMock()
with app.test_request_context(headers={"Authorization": "Bearer sst_abc123"}):
@pytest.mark.usefixtures("_disable_api_keys")
def test_g_user_fallback_when_no_jwt_or_api_key(app, mock_user) -> None:
"""When no JWT or API key auth succeeds and MCP_DEV_USERNAME is not set,
g.user (set by external middleware) is used as fallback."""
with app.test_request_context():
g.user = mock_user
app.appbuilder = MagicMock()
app.appbuilder.sm = mock_sm
result = get_user_from_request()
assert result.username == "api_key_user"
mock_sm._extract_api_key_from_request.assert_not_called()
# -- FAB version without _extract_api_key_from_request --

View File

@@ -0,0 +1,429 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Tests for MCP user resolution priority and stale g.user prevention."""
from unittest.mock import MagicMock, patch
import pytest
from flask import g
from superset.mcp_service.auth import (
_resolve_user_from_jwt_context,
get_user_from_request,
mcp_auth_hook,
)
from superset.mcp_service.mcp_config import default_user_resolver
def _make_mock_user(username: str = "testuser") -> MagicMock:
"""Create a mock User with required attributes."""
user = MagicMock()
user.username = username
user.roles = []
user.groups = []
return user
def _make_access_token(
claims: dict[str, str] | None = None, **kwargs: str
) -> MagicMock:
"""Create a mock AccessToken matching FastMCP's format."""
token = MagicMock()
token.claims = claims or {}
token.client_id = kwargs.get("client_id", "")
token.scopes = kwargs.get("scopes", [])
# Remove auto-created attributes so getattr fallbacks work correctly
for attr in ("subject", "payload"):
if attr not in kwargs:
delattr(token, attr)
for attr in kwargs:
setattr(token, attr, kwargs[attr])
return token
# -- _resolve_user_from_jwt_context --
def test_jwt_context_resolves_correct_user(app) -> None:
"""JWT context with valid claims resolves the correct DB user."""
mock_user = _make_mock_user("alice")
token = _make_access_token(claims={"sub": "alice"})
with app.app_context():
with (
patch("fastmcp.server.dependencies.get_access_token", return_value=token),
patch(
"superset.mcp_service.auth.load_user_with_relationships",
return_value=mock_user,
),
):
result = _resolve_user_from_jwt_context(app)
assert result is not None
assert result.username == "alice"
def test_jwt_context_returns_none_when_no_token(app) -> None:
"""No JWT token present returns None (fall through to next source)."""
with app.app_context():
with patch("fastmcp.server.dependencies.get_access_token", return_value=None):
result = _resolve_user_from_jwt_context(app)
assert result is None
def test_jwt_context_raises_for_unknown_user(app) -> None:
"""JWT resolves a username not in DB — raises ValueError (fail closed)."""
token = _make_access_token(claims={"sub": "nonexistent"})
with app.app_context():
with (
patch("fastmcp.server.dependencies.get_access_token", return_value=token),
patch(
"superset.mcp_service.auth.load_user_with_relationships",
return_value=None,
),
):
with pytest.raises(ValueError, match="not found in Superset database"):
_resolve_user_from_jwt_context(app)
def test_jwt_context_raises_when_no_username_in_claims(app) -> None:
"""JWT present but claims have no extractable username — fails closed."""
token = _make_access_token(claims={"iss": "some-issuer"})
with app.app_context():
with patch("fastmcp.server.dependencies.get_access_token", return_value=token):
with pytest.raises(ValueError, match="no username could be extracted"):
_resolve_user_from_jwt_context(app)
def test_jwt_context_uses_custom_resolver(app) -> None:
"""Custom MCP_USER_RESOLVER config is used when set."""
mock_user = _make_mock_user("custom_user")
token = _make_access_token(claims={"custom_field": "custom_user"})
custom_resolver = MagicMock(return_value="custom_user")
with app.app_context():
app.config["MCP_USER_RESOLVER"] = custom_resolver
try:
with (
patch(
"fastmcp.server.dependencies.get_access_token", return_value=token
),
patch(
"superset.mcp_service.auth.load_user_with_relationships",
return_value=mock_user,
),
):
result = _resolve_user_from_jwt_context(app)
finally:
app.config.pop("MCP_USER_RESOLVER", None)
assert result is not None
assert result.username == "custom_user"
custom_resolver.assert_called_once_with(app, token)
def test_jwt_context_email_fallback_lookup(app) -> None:
"""When resolver returns an email, tries email lookup after username miss."""
mock_user = _make_mock_user("alice")
token = _make_access_token(claims={"email": "alice@example.com"})
def _load_side_effect(username=None, email=None):
if email == "alice@example.com":
return mock_user
return None
with app.app_context():
with (
patch("fastmcp.server.dependencies.get_access_token", return_value=token),
patch(
"superset.mcp_service.auth.load_user_with_relationships",
side_effect=_load_side_effect,
),
):
result = _resolve_user_from_jwt_context(app)
assert result is not None
assert result.username == "alice"
# -- get_user_from_request priority order --
def test_jwt_takes_priority_over_stale_g_user(app) -> None:
"""Core regression test: JWT user wins over stale g.user."""
stale_user = _make_mock_user("stale_bob")
jwt_user = _make_mock_user("jwt_alice")
token = _make_access_token(claims={"sub": "jwt_alice"})
with app.app_context():
g.user = stale_user
with (
patch("fastmcp.server.dependencies.get_access_token", return_value=token),
patch(
"superset.mcp_service.auth.load_user_with_relationships",
return_value=jwt_user,
),
):
result = get_user_from_request()
assert result.username == "jwt_alice"
def test_dev_username_fallback_when_no_jwt(app) -> None:
"""MCP_DEV_USERNAME used when no JWT context available."""
mock_user = _make_mock_user("dev_admin")
with app.app_context():
app.config["MCP_DEV_USERNAME"] = "dev_admin"
try:
with (
patch(
"fastmcp.server.dependencies.get_access_token", return_value=None
),
patch(
"superset.mcp_service.auth.load_user_with_relationships",
return_value=mock_user,
),
):
result = get_user_from_request()
finally:
app.config.pop("MCP_DEV_USERNAME", None)
assert result.username == "dev_admin"
def test_g_user_fallback_when_no_jwt_and_no_dev_username(app) -> None:
"""g.user used as last-resort fallback (Preset middleware compatibility)."""
preset_user = _make_mock_user("preset_user")
with app.app_context():
app.config.pop("MCP_DEV_USERNAME", None)
g.user = preset_user
with patch("fastmcp.server.dependencies.get_access_token", return_value=None):
result = get_user_from_request()
assert result.username == "preset_user"
def test_raises_when_no_auth_source(app) -> None:
"""ValueError raised when no auth source is available."""
with app.app_context():
app.config.pop("MCP_DEV_USERNAME", None)
g.pop("user", None)
with patch("fastmcp.server.dependencies.get_access_token", return_value=None):
with pytest.raises(ValueError, match="No authenticated user found"):
get_user_from_request()
def test_dev_username_not_found_raises(app) -> None:
"""MCP_DEV_USERNAME configured but user not in DB raises ValueError."""
with app.app_context():
app.config["MCP_DEV_USERNAME"] = "ghost"
try:
with (
patch(
"fastmcp.server.dependencies.get_access_token", return_value=None
),
patch(
"superset.mcp_service.auth.load_user_with_relationships",
return_value=None,
),
):
with pytest.raises(ValueError, match="not found"):
get_user_from_request()
finally:
app.config.pop("MCP_DEV_USERNAME", None)
# -- g.user clearing in mcp_auth_hook --
def test_mcp_auth_hook_clears_stale_g_user(app) -> None:
"""mcp_auth_hook clears g.user before setting up user context.
Uses a side_effect that asserts g.user was cleared before user
resolution runs, so the test fails if g.pop("user") is removed.
"""
stale_user = _make_mock_user("stale")
fresh_user = _make_mock_user("fresh")
def dummy_tool():
"""Dummy tool."""
return g.user.username
wrapped = mcp_auth_hook(dummy_tool)
def _assert_cleared_then_return():
"""Verify stale g.user was cleared before returning fresh user."""
assert not hasattr(g, "user") or g.user is None, (
"g.user should have been cleared before get_user_from_request() "
f"but found g.user={getattr(g, 'user', '<missing>')}"
)
return fresh_user
with app.app_context():
g.user = stale_user
# Explicitly mock has_request_context to False because the test
# framework's autouse app_context fixture may implicitly provide
# a request context in some CI environments.
with (
patch("flask.has_request_context", return_value=False),
patch(
"superset.mcp_service.auth.get_user_from_request",
side_effect=lambda: _assert_cleared_then_return(),
),
):
result = wrapped()
assert result == "fresh"
def test_mcp_auth_hook_clears_stale_g_user_async(app) -> None:
"""mcp_auth_hook clears g.user before setting up user context (async).
Uses a side_effect that asserts g.user was cleared before user
resolution runs, so the test fails if g.pop("user") is removed.
"""
import asyncio
stale_user = _make_mock_user("stale")
fresh_user = _make_mock_user("fresh")
async def dummy_tool():
"""Dummy tool."""
return g.user.username
wrapped = mcp_auth_hook(dummy_tool)
def _assert_cleared_then_return():
"""Verify stale g.user was cleared before returning fresh user."""
assert not hasattr(g, "user") or g.user is None, (
"g.user should have been cleared before get_user_from_request() "
f"but found g.user={getattr(g, 'user', '<missing>')}"
)
return fresh_user
with app.app_context():
g.user = stale_user
with (
patch("flask.has_request_context", return_value=False),
patch(
"superset.mcp_service.auth.get_user_from_request",
side_effect=lambda: _assert_cleared_then_return(),
),
):
result = asyncio.run(wrapped())
assert result == "fresh"
def test_mcp_auth_hook_preserves_g_user_in_request_context(app) -> None:
"""g.user is NOT cleared when a request context is active (middleware compat).
Uses a side_effect that asserts g.user is still the middleware-set
user when get_user_from_request() is called, proving the hook did
NOT clear it.
"""
middleware_user = _make_mock_user("middleware_user")
def dummy_tool():
"""Dummy tool."""
return g.user.username
wrapped = mcp_auth_hook(dummy_tool)
def _assert_preserved_then_return():
"""Verify g.user was preserved (not cleared) before returning."""
assert hasattr(g, "user"), (
"g.user should be preserved in request context but was removed"
)
assert g.user is middleware_user, (
"g.user should be preserved in request context but was changed; "
f"g.user={g.user}"
)
return middleware_user
with app.test_request_context():
g.user = middleware_user
with patch(
"superset.mcp_service.auth.get_user_from_request",
side_effect=lambda: _assert_preserved_then_return(),
):
result = wrapped()
assert result == "middleware_user"
# -- default_user_resolver --
def test_default_resolver_extracts_sub_from_claims() -> None:
"""Extracts 'sub' claim as last-resort from AccessToken.claims dict."""
token = _make_access_token(claims={"sub": "alice"})
assert default_user_resolver(None, token) == "alice"
def test_default_resolver_extracts_preferred_username() -> None:
"""Extracts 'preferred_username' claim (common OIDC claim)."""
token = _make_access_token(claims={"preferred_username": "alice"})
assert default_user_resolver(None, token) == "alice"
def test_default_resolver_extracts_email_from_claims() -> None:
"""Falls back to 'email' claim when 'sub' is absent."""
token = _make_access_token(claims={"email": "alice@example.com"})
assert default_user_resolver(None, token) == "alice@example.com"
def test_default_resolver_extracts_username_from_claims() -> None:
"""Falls back to 'username' claim."""
token = _make_access_token(claims={"username": "alice"})
assert default_user_resolver(None, token) == "alice"
def test_default_resolver_falls_back_to_subject_attr() -> None:
"""Falls back to legacy .subject attribute when claims empty."""
token = _make_access_token(claims={}, subject="legacy_user")
assert default_user_resolver(None, token) == "legacy_user"
def test_default_resolver_falls_back_to_client_id() -> None:
"""Falls back to .client_id when claims empty and no subject."""
token = _make_access_token(claims={}, client_id="service-account")
assert default_user_resolver(None, token) == "service-account"
def test_default_resolver_returns_none_for_empty_token() -> None:
"""Returns None when no claims or attributes have a username."""
token = _make_access_token(claims={}, client_id="")
assert default_user_resolver(None, token) is None
def test_default_resolver_preferred_username_takes_priority() -> None:
"""'preferred_username' takes priority over 'sub' and 'email' in claims."""
token = _make_access_token(
claims={
"sub": "opaque-id-123",
"preferred_username": "alice",
"email": "alice@example.com",
}
)
assert default_user_resolver(None, token) == "alice"