mirror of
https://github.com/apache/superset.git
synced 2026-06-11 10:39:15 +00:00
Compare commits
3 Commits
fix/chart-
...
feat/force
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d1427ffb1 | ||
|
|
d649f4eb13 | ||
|
|
4f67a038c1 |
@@ -362,6 +362,11 @@ RATELIMIT_ENABLED = os.environ.get("SUPERSET_ENV") == "production"
|
||||
RATELIMIT_APPLICATION = "50 per second"
|
||||
AUTH_RATE_LIMITED = True
|
||||
AUTH_RATE_LIMIT = "5 per second"
|
||||
|
||||
# When enabled, users whose account is flagged with ``password_must_change``
|
||||
# (e.g. accounts provisioned by an administrator) are redirected to the
|
||||
# password-reset page until they set a new password. Off by default.
|
||||
ENABLE_FORCE_PASSWORD_CHANGE = False
|
||||
# A storage location conforming to the scheme in storage-scheme. See the limits
|
||||
# library for allowed values: https://limits.readthedocs.io/en/stable/storage.html
|
||||
# RATELIMIT_STORAGE_URI = "redis://host:port"
|
||||
|
||||
@@ -729,6 +729,14 @@ class SupersetAppInitializer: # pylint: disable=too-many-public-methods
|
||||
"""Register app-level request handlers"""
|
||||
from flask import request, Response
|
||||
|
||||
from superset.security.password_change import (
|
||||
register_password_change_enforcement,
|
||||
)
|
||||
|
||||
# Redirect users with a pending forced password change to the reset
|
||||
# page (no-op unless ENABLE_FORCE_PASSWORD_CHANGE is enabled).
|
||||
register_password_change_enforcement(self.superset_app)
|
||||
|
||||
@self.superset_app.after_request
|
||||
def apply_http_headers(response: Response) -> Response:
|
||||
"""Applies the configuration's http headers to all responses"""
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""add password_must_change to user_attribute
|
||||
|
||||
Revision ID: b7c9d1e2f3a4
|
||||
Revises: 31dae2559c05
|
||||
Create Date: 2026-06-01 00:00:00.000000
|
||||
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from superset.migrations.shared.utils import add_columns, drop_columns
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "b7c9d1e2f3a4"
|
||||
down_revision = "31dae2559c05"
|
||||
|
||||
|
||||
def upgrade():
|
||||
add_columns(
|
||||
"user_attribute",
|
||||
sa.Column(
|
||||
"password_must_change",
|
||||
sa.Boolean(),
|
||||
nullable=False,
|
||||
server_default=sa.false(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
drop_columns("user_attribute", "password_must_change")
|
||||
@@ -16,7 +16,7 @@
|
||||
# under the License.
|
||||
|
||||
from flask_appbuilder import Model
|
||||
from sqlalchemy import Column, ForeignKey, Integer, String
|
||||
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
from superset import security_manager
|
||||
@@ -42,3 +42,7 @@ class UserAttribute(Model, AuditMixinNullable):
|
||||
welcome_dashboard_id = Column(Integer, ForeignKey("dashboards.id"))
|
||||
welcome_dashboard = relationship("Dashboard")
|
||||
avatar_url = Column(String(100))
|
||||
# When True, the user must change their password before they can use the
|
||||
# rest of the app (enforced by the before-request hook in
|
||||
# superset.security.password_change when ENABLE_FORCE_PASSWORD_CHANGE is on).
|
||||
password_must_change = Column(Boolean, nullable=False, default=False)
|
||||
|
||||
@@ -22,7 +22,7 @@ import re
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from types import SimpleNamespace
|
||||
from typing import Any, Callable, cast, NamedTuple, Optional, TYPE_CHECKING
|
||||
from typing import Any, Callable, cast, NamedTuple, Optional, TYPE_CHECKING, Union
|
||||
|
||||
from flask import current_app, Flask, g, Request
|
||||
from flask_appbuilder import Model
|
||||
@@ -635,6 +635,49 @@ class SupersetSecurityManager( # pylint: disable=too-many-public-methods
|
||||
lm.request_loader(self.request_loader)
|
||||
return lm
|
||||
|
||||
def reset_password(self, userid: Union[int, str], password: str) -> None:
|
||||
"""Reset a user's password, clearing the forced-change flag only on a
|
||||
self-service reset.
|
||||
|
||||
Both the self-service reset (``ResetMyPasswordView``) and the admin
|
||||
"Reset Password" action (``ResetPasswordView``) route through this
|
||||
method. The forced-password-change flag must only be cleared when the
|
||||
user resets *their own* password — an admin-initiated reset sets a
|
||||
temporary password and must preserve the "must change at next login"
|
||||
requirement, otherwise the first-use lifecycle would be silently
|
||||
bypassed. We distinguish the two by comparing the acting user
|
||||
(``g.user``) against the target ``userid``: they match for a
|
||||
self-service reset and differ for an admin reset.
|
||||
"""
|
||||
super().reset_password(userid, password)
|
||||
|
||||
acting_user = getattr(g, "user", None)
|
||||
acting_user_id = getattr(acting_user, "id", None)
|
||||
# ``userid`` arrives as a string (the ``pk`` request arg) on the admin
|
||||
# path, so coerce both sides before comparing.
|
||||
is_self_service = acting_user_id is not None and self._same_user(
|
||||
acting_user_id, userid
|
||||
)
|
||||
if is_self_service:
|
||||
from superset.security.password_change import (
|
||||
clear_password_must_change,
|
||||
)
|
||||
|
||||
clear_password_must_change(int(userid))
|
||||
|
||||
@staticmethod
|
||||
def _same_user(left: Any, right: Any) -> bool:
|
||||
"""Return True if two user identifiers refer to the same user.
|
||||
|
||||
Identifiers may be ints or numeric strings (FAB passes the admin-reset
|
||||
target as a ``pk`` request arg string), so compare them as integers and
|
||||
fall back to a string comparison if coercion fails.
|
||||
"""
|
||||
try:
|
||||
return int(left) == int(right)
|
||||
except (TypeError, ValueError):
|
||||
return str(left) == str(right)
|
||||
|
||||
def on_user_login(self, user: Any) -> None:
|
||||
_log_audit_event(
|
||||
"UserLoggedIn",
|
||||
|
||||
173
superset/security/password_change.py
Normal file
173
superset/security/password_change.py
Normal file
@@ -0,0 +1,173 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Force-password-change-on-first-use enforcement.
|
||||
|
||||
A per-user ``password_must_change`` flag (on ``UserAttribute``) marks accounts —
|
||||
typically created by an administrator — that must set a new password before
|
||||
they can use the rest of the application. When ``ENABLE_FORCE_PASSWORD_CHANGE``
|
||||
is enabled, a ``before_request`` hook redirects such users to the password-reset
|
||||
page until they change it; the flag is cleared automatically on a successful
|
||||
*self-service* password reset (see ``SupersetSecurityManager.reset_password``).
|
||||
An admin-initiated reset deliberately preserves the flag so the target user is
|
||||
still forced to change the temporary password at next login.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
|
||||
from flask import current_app, flash, g, redirect, request, url_for
|
||||
from flask_babel import gettext as __
|
||||
|
||||
from superset.utils.decorators import transaction
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Flask endpoints take the form ``<ViewClass>.<method>`` (or a bare name for
|
||||
# function views). The following must remain reachable while a password change
|
||||
# is pending, otherwise the redirect would loop: the auth views (login/logout
|
||||
# for every auth backend), the password-reset and user-info-edit views, static
|
||||
# assets, and the health check. We match the *view-class* component (the part
|
||||
# before the dot) exactly against the allow-list below rather than doing a
|
||||
# substring search, so unrelated endpoints that merely share a substring (e.g.
|
||||
# an "Author"-named view, or any name containing "health"/"static") are not
|
||||
# accidentally exempted from enforcement.
|
||||
_EXEMPT_VIEW_CLASSES = frozenset(
|
||||
{
|
||||
"AuthDBView",
|
||||
"AuthLDAPView",
|
||||
"AuthOAuthView",
|
||||
"AuthOIDView",
|
||||
"AuthRemoteUserView",
|
||||
"ResetMyPasswordView",
|
||||
"ResetPasswordView",
|
||||
"UserInfoEditView",
|
||||
}
|
||||
)
|
||||
|
||||
# Exact endpoint names (function views / Flask built-ins) that are always exempt.
|
||||
_EXEMPT_ENDPOINTS = frozenset({"static", "appbuilder.static", "health", "healthcheck"})
|
||||
|
||||
|
||||
def _get_user_attribute(user_id: int) -> Optional[Any]:
|
||||
# Imported lazily to avoid import cycles at app-init time.
|
||||
from superset.extensions import db
|
||||
from superset.models.user_attributes import UserAttribute
|
||||
|
||||
# The ``user_attribute`` table does not enforce uniqueness on ``user_id``,
|
||||
# so a user could have more than one row. ``.one_or_none()`` would raise
|
||||
# ``MultipleResultsFound`` (a 500) in that case; fetch deterministically by
|
||||
# ordering on the primary key and taking the first row instead.
|
||||
return (
|
||||
db.session.query(UserAttribute)
|
||||
.filter(UserAttribute.user_id == user_id)
|
||||
.order_by(UserAttribute.id)
|
||||
.first()
|
||||
)
|
||||
|
||||
|
||||
def password_change_required(user: Any) -> bool:
|
||||
"""Return True if ``user`` has a pending forced password change."""
|
||||
user_id = getattr(user, "id", None)
|
||||
if not user_id:
|
||||
return False
|
||||
attr = _get_user_attribute(user_id)
|
||||
return bool(attr and attr.password_must_change)
|
||||
|
||||
|
||||
@transaction()
|
||||
def set_password_must_change(user_id: int, value: bool = True) -> None:
|
||||
"""Set (or clear) the forced-password-change flag for a user.
|
||||
|
||||
Intended to be called by administrative flows when provisioning an account
|
||||
that should require a password change on first use.
|
||||
"""
|
||||
from superset.extensions import db
|
||||
from superset.models.user_attributes import UserAttribute
|
||||
|
||||
attr = _get_user_attribute(user_id)
|
||||
if attr is None:
|
||||
attr = UserAttribute(user_id=user_id)
|
||||
db.session.add(attr)
|
||||
attr.password_must_change = value
|
||||
|
||||
|
||||
@transaction()
|
||||
def clear_password_must_change(user_id: int) -> None:
|
||||
"""Clear the forced-password-change flag for a user, if set."""
|
||||
attr = _get_user_attribute(user_id)
|
||||
if attr and attr.password_must_change:
|
||||
attr.password_must_change = False
|
||||
|
||||
|
||||
def _is_exempt_endpoint(endpoint: Optional[str]) -> bool:
|
||||
# A missing endpoint (e.g. an unmatched URL) is left to normal 404 handling.
|
||||
if not endpoint:
|
||||
return True
|
||||
if endpoint in _EXEMPT_ENDPOINTS:
|
||||
return True
|
||||
# Any blueprint's static route, e.g. "<blueprint>.static".
|
||||
if endpoint.endswith(".static"):
|
||||
return True
|
||||
# Match the view-class component exactly, so e.g. "AuthDBView.login" is
|
||||
# exempt but an unrelated "AuthorView.list" is not.
|
||||
view_class = endpoint.split(".", 1)[0]
|
||||
return view_class in _EXEMPT_VIEW_CLASSES
|
||||
|
||||
|
||||
def register_password_change_enforcement(app: Any) -> None:
|
||||
"""Register the before-request hook that enforces pending password changes.
|
||||
|
||||
No-op unless ``ENABLE_FORCE_PASSWORD_CHANGE`` is enabled, so there is zero
|
||||
per-request overhead in the default configuration.
|
||||
"""
|
||||
|
||||
@app.before_request
|
||||
def _enforce_password_change() -> Any: # pylint: disable=unused-variable
|
||||
if not current_app.config.get("ENABLE_FORCE_PASSWORD_CHANGE"):
|
||||
return None
|
||||
|
||||
user = getattr(g, "user", None)
|
||||
if not user or getattr(user, "is_anonymous", True):
|
||||
return None
|
||||
|
||||
if _is_exempt_endpoint(request.endpoint):
|
||||
return None
|
||||
|
||||
if not password_change_required(user):
|
||||
return None
|
||||
|
||||
flash(__("You must change your password before continuing."), "warning")
|
||||
# Resolve the password-reset page. If that endpoint can't be resolved
|
||||
# (e.g. a custom security manager without ``ResetMyPasswordView``), fall
|
||||
# back to logout, which is always exempt from this enforcement. We must
|
||||
# NOT fall back to "/" or any other non-exempt route: the index re-runs
|
||||
# this same hook and would trap the user in an infinite 302 loop. If no
|
||||
# exempt target can be resolved at all, return an error response rather
|
||||
# than redirect, so a flagged user can never get stuck looping.
|
||||
for endpoint in ("ResetMyPasswordView.this_form_get", "AuthDBView.logout"):
|
||||
try:
|
||||
return redirect(url_for(endpoint))
|
||||
except Exception: # noqa: BLE001, S112 # pylint: disable=broad-except
|
||||
# Try the next exempt fallback; a failed url_for resolution here
|
||||
# is expected/benign and not worth logging per attempt.
|
||||
continue
|
||||
return (
|
||||
__("You must change your password before continuing."),
|
||||
503,
|
||||
)
|
||||
@@ -1619,3 +1619,76 @@ def test_user_view_menu_names_for_guest_user_no_roles(
|
||||
|
||||
assert result == set()
|
||||
mock_get_user_id.assert_not_called()
|
||||
|
||||
|
||||
def test_reset_password_self_service_clears_flag(
|
||||
mocker: MockerFixture,
|
||||
app_context: None,
|
||||
) -> None:
|
||||
"""A user resetting their own password clears the forced-change flag."""
|
||||
sm = SupersetSecurityManager(appbuilder)
|
||||
# The target user (id 5) is the same as the acting user -> self-service.
|
||||
mock_g = SimpleNamespace(user=SimpleNamespace(id=5))
|
||||
mocker.patch("superset.security.manager.g", new=mock_g)
|
||||
# Avoid touching the real DB in the FAB base implementation.
|
||||
mocker.patch(
|
||||
"flask_appbuilder.security.manager.BaseSecurityManager.reset_password",
|
||||
return_value=None,
|
||||
)
|
||||
mock_clear = mocker.patch(
|
||||
"superset.security.password_change.clear_password_must_change"
|
||||
)
|
||||
|
||||
sm.reset_password(5, "new-password")
|
||||
|
||||
mock_clear.assert_called_once_with(5)
|
||||
|
||||
|
||||
def test_reset_password_admin_does_not_clear_flag(
|
||||
mocker: MockerFixture,
|
||||
app_context: None,
|
||||
) -> None:
|
||||
"""An admin-initiated reset must preserve the forced-change requirement.
|
||||
|
||||
FAB's ``ResetPasswordView`` passes the target as the ``pk`` request-arg
|
||||
string while ``g.user`` remains the admin, so the acting user differs from
|
||||
the target and the flag must NOT be cleared.
|
||||
"""
|
||||
sm = SupersetSecurityManager(appbuilder)
|
||||
# Acting user is admin (id 1); target is a different user ("5" as a string,
|
||||
# as FAB passes it from request args).
|
||||
mock_g = SimpleNamespace(user=SimpleNamespace(id=1))
|
||||
mocker.patch("superset.security.manager.g", new=mock_g)
|
||||
mocker.patch(
|
||||
"flask_appbuilder.security.manager.BaseSecurityManager.reset_password",
|
||||
return_value=None,
|
||||
)
|
||||
mock_clear = mocker.patch(
|
||||
"superset.security.password_change.clear_password_must_change"
|
||||
)
|
||||
|
||||
sm.reset_password("5", "temp-password")
|
||||
|
||||
mock_clear.assert_not_called()
|
||||
|
||||
|
||||
def test_reset_password_self_service_pk_string_clears_flag(
|
||||
mocker: MockerFixture,
|
||||
app_context: None,
|
||||
) -> None:
|
||||
"""Self-service identity holds even if ids arrive as mixed int/str types."""
|
||||
sm = SupersetSecurityManager(appbuilder)
|
||||
mock_g = SimpleNamespace(user=SimpleNamespace(id=5))
|
||||
mocker.patch("superset.security.manager.g", new=mock_g)
|
||||
mocker.patch(
|
||||
"flask_appbuilder.security.manager.BaseSecurityManager.reset_password",
|
||||
return_value=None,
|
||||
)
|
||||
mock_clear = mocker.patch(
|
||||
"superset.security.password_change.clear_password_must_change"
|
||||
)
|
||||
|
||||
sm.reset_password("5", "new-password")
|
||||
|
||||
# Coerced to int when clearing, regardless of the inbound id type.
|
||||
mock_clear.assert_called_once_with(5)
|
||||
|
||||
214
tests/unit_tests/security/test_password_change.py
Normal file
214
tests/unit_tests/security/test_password_change.py
Normal file
@@ -0,0 +1,214 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from superset.security.password_change import (
|
||||
_get_user_attribute,
|
||||
_is_exempt_endpoint,
|
||||
password_change_required,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"endpoint,expected",
|
||||
[
|
||||
(None, True), # static file serving etc.
|
||||
("ResetMyPasswordView.this_form_get", True),
|
||||
("AuthDBView.login", True),
|
||||
("AuthDBView.logout", True),
|
||||
("appbuilder.static", True),
|
||||
("UserInfoEditView.this_form_post", True),
|
||||
("AuthOAuthView.login", True),
|
||||
("SomeBlueprint.static", True),
|
||||
("health", True),
|
||||
("SupersetIndexView.index", False),
|
||||
("Superset.dashboard", False),
|
||||
# Substring over-matching must NOT exempt these (they merely share a
|
||||
# substring with an exempt token).
|
||||
("AuthorView.list", False),
|
||||
("HealthDashboardView.show", False),
|
||||
("StaticAssetReportView.list", False),
|
||||
("UserInfoFancyView.show", False),
|
||||
],
|
||||
)
|
||||
def test_is_exempt_endpoint(endpoint, expected) -> None:
|
||||
# The password-reset / auth / static endpoints must stay reachable to avoid
|
||||
# a redirect loop while a change is pending.
|
||||
assert _is_exempt_endpoint(endpoint) is expected
|
||||
|
||||
|
||||
def test_password_change_required() -> None:
|
||||
user = MagicMock()
|
||||
user.id = 5
|
||||
|
||||
with patch(
|
||||
"superset.security.password_change._get_user_attribute"
|
||||
) as mock_get_attr:
|
||||
mock_get_attr.return_value = MagicMock(password_must_change=True)
|
||||
assert password_change_required(user) is True
|
||||
|
||||
mock_get_attr.return_value = MagicMock(password_must_change=False)
|
||||
assert password_change_required(user) is False
|
||||
|
||||
mock_get_attr.return_value = None
|
||||
assert password_change_required(user) is False
|
||||
|
||||
|
||||
def test_password_change_required_no_user_id() -> None:
|
||||
user = MagicMock()
|
||||
user.id = None
|
||||
assert password_change_required(user) is False
|
||||
|
||||
|
||||
def test_get_user_attribute_deterministic_with_duplicates() -> None:
|
||||
# The ``user_attribute`` table does not enforce uniqueness on ``user_id``,
|
||||
# so duplicate rows are possible. The query must not raise (which
|
||||
# ``.one_or_none()`` would have done via ``MultipleResultsFound``); it must
|
||||
# fetch a single row deterministically via ``order_by(id).first()``.
|
||||
query = MagicMock()
|
||||
db = MagicMock()
|
||||
db.session.query.return_value = query
|
||||
query.filter.return_value = query
|
||||
query.order_by.return_value = query
|
||||
sentinel = MagicMock(name="first_row")
|
||||
query.first.return_value = sentinel
|
||||
|
||||
with (
|
||||
patch("superset.extensions.db", db),
|
||||
patch("superset.models.user_attributes.UserAttribute") as user_attribute,
|
||||
):
|
||||
result = _get_user_attribute(5)
|
||||
|
||||
# Deterministic ordering on the primary key, then ``.first()`` — never
|
||||
# ``.one_or_none()``, which could 500 on duplicate rows.
|
||||
query.order_by.assert_called_once_with(user_attribute.id)
|
||||
query.first.assert_called_once_with()
|
||||
assert not query.one_or_none.called
|
||||
assert result is sentinel
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def enforcement_app():
|
||||
"""A minimal Flask app with the enforcement hook registered and a flagged
|
||||
user, used to exercise the before-request redirect behavior end to end."""
|
||||
from flask import Flask, g
|
||||
|
||||
from superset.security.password_change import (
|
||||
register_password_change_enforcement,
|
||||
)
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config["ENABLE_FORCE_PASSWORD_CHANGE"] = True
|
||||
app.secret_key = "test" # noqa: S105
|
||||
|
||||
user = MagicMock()
|
||||
user.id = 5
|
||||
user.is_anonymous = False
|
||||
|
||||
@app.before_request
|
||||
def _set_user() -> None: # pylint: disable=unused-variable
|
||||
g.user = user
|
||||
|
||||
# A non-exempt route that, if redirected to, would re-trigger enforcement.
|
||||
@app.route("/")
|
||||
def index() -> str: # pylint: disable=unused-variable
|
||||
return "index"
|
||||
|
||||
register_password_change_enforcement(app)
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _no_babel_flash():
|
||||
"""The minimal test app has no babel/flash messaging set up; stub them so
|
||||
the enforcement hook's translation + flash calls don't blow up. These are
|
||||
incidental to the redirect-target logic under test."""
|
||||
with (
|
||||
patch("superset.security.password_change.flash"),
|
||||
patch("superset.security.password_change.__", side_effect=lambda s: s),
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
def test_enforcement_redirects_to_reset_view(enforcement_app) -> None:
|
||||
# Happy path: the reset endpoint resolves, so flagged users are redirected
|
||||
# there (an exempt route) — no loop.
|
||||
with (
|
||||
patch(
|
||||
"superset.security.password_change.password_change_required",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"superset.security.password_change.url_for",
|
||||
return_value="/resetmypassword/form",
|
||||
),
|
||||
):
|
||||
resp = enforcement_app.test_client().get("/")
|
||||
assert resp.status_code == 302
|
||||
assert resp.headers["Location"].endswith("/resetmypassword/form")
|
||||
|
||||
|
||||
def test_enforcement_falls_back_to_exempt_logout_not_index(enforcement_app) -> None:
|
||||
# If the reset endpoint can't be resolved, the fallback must be an exempt
|
||||
# route (logout) — never "/" / the index, which would loop. We make the
|
||||
# reset endpoint fail and the logout endpoint resolve.
|
||||
def fake_url_for(endpoint: str, *args, **kwargs) -> str:
|
||||
if endpoint == "ResetMyPasswordView.this_form_get":
|
||||
raise RuntimeError("no such endpoint")
|
||||
if endpoint == "AuthDBView.logout":
|
||||
return "/logout"
|
||||
raise AssertionError(f"unexpected endpoint {endpoint}")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"superset.security.password_change.password_change_required",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"superset.security.password_change.url_for",
|
||||
side_effect=fake_url_for,
|
||||
),
|
||||
):
|
||||
resp = enforcement_app.test_client().get("/")
|
||||
assert resp.status_code == 302
|
||||
location = resp.headers["Location"]
|
||||
assert location.endswith("/logout")
|
||||
# Crucially, the fallback is NOT a redirect back to the non-exempt index.
|
||||
assert not location.endswith("/")
|
||||
|
||||
|
||||
def test_enforcement_no_resolvable_target_returns_error_not_loop(
|
||||
enforcement_app,
|
||||
) -> None:
|
||||
# If NO exempt target can be resolved, we must return an error response
|
||||
# rather than redirect, so the flagged user can never get stuck in a loop.
|
||||
with (
|
||||
patch(
|
||||
"superset.security.password_change.password_change_required",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"superset.security.password_change.url_for",
|
||||
side_effect=RuntimeError("no endpoints"),
|
||||
),
|
||||
):
|
||||
resp = enforcement_app.test_client().get("/")
|
||||
assert resp.status_code == 503
|
||||
assert "Location" not in resp.headers
|
||||
Reference in New Issue
Block a user