Files
superset2/tests/unit_tests/views/datasource/views_test.py
Shaitan a93e319716 fix(datasource): align access validation in legacy views (#38647)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Daniel Vaz Gaspar <danielvazgaspar@gmail.com>
2026-03-25 14:56:59 +00:00

315 lines
11 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit tests for resource-level authorization in superset/views/datasource/views.py.
Tests use ``inspect.unwrap`` to call the underlying view logic directly,
bypassing the Flask-AppBuilder permission decorator machinery.
"""
import inspect
from unittest.mock import MagicMock, patch
import pytest
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetSecurityException
from superset.utils import json as superset_json
def _security_exception() -> SupersetSecurityException:
return SupersetSecurityException(
SupersetError(
message="Access denied",
error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
level=ErrorLevel.WARNING,
)
)
def _get_view_func(name: str):
"""Return the unwrapped body of a Datasource view method."""
from superset.views.datasource.views import Datasource
return inspect.unwrap(getattr(Datasource, name))
def _view_self() -> MagicMock:
"""Create a minimal stand-in for a Datasource view instance."""
self = MagicMock()
self.json_response = MagicMock(return_value="ok")
return self
# ---------------------------------------------------------------------------
# Datasource.get
# ---------------------------------------------------------------------------
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.DatasourceDAO.get_datasource")
def test_get_raises_when_access_denied(
mock_get_datasource: MagicMock,
mock_security_manager: MagicMock,
) -> None:
"""raise_for_access is called and propagates for unauthorised callers."""
mock_datasource = MagicMock()
mock_get_datasource.return_value = mock_datasource
mock_security_manager.raise_for_access.side_effect = _security_exception()
raw_get = _get_view_func("get")
with pytest.raises(SupersetSecurityException):
raw_get(_view_self(), "table", 1)
mock_security_manager.raise_for_access.assert_called_once_with(
datasource=mock_datasource
)
@patch("superset.views.datasource.views.sanitize_datasource_data")
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.DatasourceDAO.get_datasource")
def test_get_succeeds_for_authorised_user(
mock_get_datasource: MagicMock,
mock_security_manager: MagicMock,
mock_sanitize: MagicMock,
) -> None:
"""raise_for_access is called without raising; sanitized data is returned."""
mock_datasource = MagicMock()
mock_datasource.data = {"id": 1}
mock_get_datasource.return_value = mock_datasource
mock_security_manager.raise_for_access.return_value = None
mock_sanitize.return_value = {"id": 1}
view = _view_self()
raw_get = _get_view_func("get")
raw_get(view, "table", 1)
mock_security_manager.raise_for_access.assert_called_once_with(
datasource=mock_datasource
)
view.json_response.assert_called_once_with({"id": 1})
# ---------------------------------------------------------------------------
# Datasource.external_metadata
# ---------------------------------------------------------------------------
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.DatasourceDAO.get_datasource")
def test_external_metadata_raises_when_access_denied(
mock_get_datasource: MagicMock,
mock_security_manager: MagicMock,
) -> None:
mock_datasource = MagicMock()
mock_get_datasource.return_value = mock_datasource
mock_security_manager.raise_for_access.side_effect = _security_exception()
raw_fn = _get_view_func("external_metadata")
with pytest.raises(SupersetSecurityException):
raw_fn(_view_self(), "table", 1)
mock_security_manager.raise_for_access.assert_called_once_with(
datasource=mock_datasource
)
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.DatasourceDAO.get_datasource")
def test_external_metadata_succeeds_for_authorised_user(
mock_get_datasource: MagicMock,
mock_security_manager: MagicMock,
) -> None:
mock_datasource = MagicMock()
mock_datasource.external_metadata.return_value = [{"name": "col1"}]
mock_get_datasource.return_value = mock_datasource
mock_security_manager.raise_for_access.return_value = None
view = _view_self()
raw_fn = _get_view_func("external_metadata")
raw_fn(view, "table", 1)
mock_security_manager.raise_for_access.assert_called_once_with(
datasource=mock_datasource
)
view.json_response.assert_called_once_with([{"name": "col1"}])
# ---------------------------------------------------------------------------
# Datasource.external_metadata_by_name
# ---------------------------------------------------------------------------
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.SqlaTable.get_datasource_by_name")
@patch("superset.views.datasource.views.ExternalMetadataSchema")
def test_external_metadata_by_name_known_datasource_raises_when_access_denied(
mock_schema_cls: MagicMock,
mock_get_by_name: MagicMock,
mock_security_manager: MagicMock,
) -> None:
"""When a datasource exists, raise_for_access(datasource=...) is enforced."""
params = {
"database_name": "mydb",
"schema_name": "public",
"table_name": "private_table",
}
mock_schema_cls.return_value.load.return_value = params
mock_datasource = MagicMock()
mock_get_by_name.return_value = mock_datasource
mock_security_manager.raise_for_access.side_effect = _security_exception()
raw_fn = _get_view_func("external_metadata_by_name")
with pytest.raises(SupersetSecurityException):
raw_fn(_view_self(), rison=params)
mock_security_manager.raise_for_access.assert_called_once_with(
datasource=mock_datasource
)
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.SqlaTable.get_datasource_by_name")
@patch("superset.views.datasource.views.ExternalMetadataSchema")
@patch("superset.views.datasource.views.db")
def test_external_metadata_by_name_no_datasource_raises_when_access_denied(
mock_db: MagicMock,
mock_schema_cls: MagicMock,
mock_get_by_name: MagicMock,
mock_security_manager: MagicMock,
) -> None:
"""When no datasource exists, raise_for_access(database=..., table=...) runs."""
params = {
"database_name": "mydb",
"schema_name": "public",
"table_name": "new_table",
}
mock_schema_cls.return_value.load.return_value = params
mock_get_by_name.return_value = None
mock_database = MagicMock()
mock_db.session.query.return_value.filter_by.return_value.one.return_value = (
mock_database
)
mock_security_manager.raise_for_access.side_effect = _security_exception()
raw_fn = _get_view_func("external_metadata_by_name")
with pytest.raises(SupersetSecurityException):
raw_fn(_view_self(), rison=params)
mock_security_manager.raise_for_access.assert_called_once()
call_kwargs = mock_security_manager.raise_for_access.call_args.kwargs
assert call_kwargs["database"] is mock_database
assert call_kwargs["table"].table == "new_table"
assert call_kwargs["table"].schema == "public"
# ---------------------------------------------------------------------------
# Datasource.save — ownership bypass prevention
# ---------------------------------------------------------------------------
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.DatasourceDAO.get_datasource")
def test_save_always_checks_ownership_even_without_owners_field(
mock_get_datasource: MagicMock,
mock_security_manager: MagicMock,
) -> None:
"""Ownership check runs even when 'owners' is absent from the payload."""
mock_orm = MagicMock()
mock_orm.owner_class = MagicMock() # not None — model supports ownership
mock_get_datasource.return_value = mock_orm
mock_security_manager.raise_for_ownership.side_effect = SupersetSecurityException(
SupersetError(
message="Not an owner",
error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
level=ErrorLevel.WARNING,
)
)
from flask import Flask
from superset.commands.dataset.exceptions import DatasetForbiddenError
raw_save = _get_view_func("save")
app = Flask(__name__)
with app.test_request_context(
"/datasource/save/",
method="POST",
data={
"data": superset_json.dumps(
{
"id": 1,
"type": "table",
"database": {"id": 1},
"columns": [],
# 'owners' intentionally omitted
}
)
},
):
with pytest.raises(DatasetForbiddenError):
raw_save(_view_self())
mock_security_manager.raise_for_ownership.assert_called_once_with(mock_orm)
@patch("superset.views.datasource.views.security_manager", new_callable=MagicMock)
@patch("superset.views.datasource.views.DatasourceDAO.get_datasource")
def test_save_non_owner_with_owners_field_is_rejected(
mock_get_datasource: MagicMock,
mock_security_manager: MagicMock,
) -> None:
"""A non-owner cannot use the save endpoint even when supplying an owners list."""
mock_orm = MagicMock()
mock_orm.owner_class = MagicMock()
mock_get_datasource.return_value = mock_orm
mock_security_manager.raise_for_ownership.side_effect = SupersetSecurityException(
SupersetError(
message="Not an owner",
error_type=SupersetErrorType.DATASOURCE_SECURITY_ACCESS_ERROR,
level=ErrorLevel.WARNING,
)
)
from flask import Flask
from superset.commands.dataset.exceptions import DatasetForbiddenError
raw_save = _get_view_func("save")
app = Flask(__name__)
with app.test_request_context(
"/datasource/save/",
method="POST",
data={
"data": superset_json.dumps(
{
"id": 1,
"type": "table",
"database": {"id": 1},
"columns": [],
"owners": [99], # attacker-supplied owners list
}
)
},
):
with pytest.raises(DatasetForbiddenError):
raw_save(_view_self())
mock_security_manager.raise_for_ownership.assert_called_once_with(mock_orm)