# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import pytest from flask_appbuilder.security.sqla.models import Role, User from pytest_mock import MockFixture from superset.common.query_object import QueryObject from superset.connectors.sqla.models import Database, SqlaTable from superset.exceptions import SupersetSecurityException from superset.extensions import appbuilder from superset.models.slice import Slice from superset.security.manager import SupersetSecurityManager from superset.utils.core import override_user def test_security_manager(app_context: None) -> None: """ Test that the security manager can be built. """ sm = SupersetSecurityManager(appbuilder) assert sm def test_raise_for_access_guest_user( mocker: MockFixture, app_context: None, ) -> None: """ Test that guest user can't modify chart payload. """ sm = SupersetSecurityManager(appbuilder) mocker.patch.object(sm, "is_guest_user", return_value=True) mocker.patch.object(sm, "can_access", return_value=True) query_context = mocker.MagicMock() query_context.slice_.id = 42 stored_metrics = [ { "aggregate": None, "column": None, "datasourceWarning": False, "expressionType": "SQL", "hasCustomLabel": False, "label": "COUNT(*) + 1", "optionName": "metric_ssa1gwimio_cxpyjc7vj3s", "sqlExpression": "COUNT(*) + 1", } ] query_context.slice_.params_dict = { "metrics": stored_metrics, } # normal request query_context.form_data = { "slice_id": 42, "metrics": stored_metrics, } query_context.queries = [QueryObject(metrics=stored_metrics)] # type: ignore sm.raise_for_access(query_context=query_context) # tampered requests query_context.form_data = { "slice_id": 43, "metrics": stored_metrics, } query_context.queries = [QueryObject(metrics=stored_metrics)] # type: ignore with pytest.raises(SupersetSecurityException): sm.raise_for_access(query_context=query_context) tampered_metrics = [ { "aggregate": None, "column": None, "datasourceWarning": False, "expressionType": "SQL", "hasCustomLabel": False, "label": "COUNT(*) + 2", "optionName": "metric_ssa1gwimio_cxpyjc7vj3s", "sqlExpression": "COUNT(*) + 2", } ] query_context.form_data = { "slice_id": 42, "metrics": tampered_metrics, } with pytest.raises(SupersetSecurityException): sm.raise_for_access(query_context=query_context) query_context.form_data = { "slice_id": 42, "metrics": stored_metrics, } query_context.queries = [QueryObject(metrics=tampered_metrics)] # type: ignore with pytest.raises(SupersetSecurityException): sm.raise_for_access(query_context=query_context) def test_raise_for_access_query_default_schema( mocker: MockFixture, app_context: None, ) -> None: """ Test that the DB default schema is used in non-qualified table names. For example, in Postgres, for the following query: > SELECT * FROM foo; We should check that the user has access to the `public` schema, regardless of the schema set in the query. """ sm = SupersetSecurityManager(appbuilder) mocker.patch.object(sm, "can_access_database", return_value=False) mocker.patch.object(sm, "get_schema_perm", return_value="[PostgreSQL].[public]") SqlaTable = mocker.patch("superset.connectors.sqla.models.SqlaTable") SqlaTable.query_datasources_by_name.return_value = [] database = mocker.MagicMock() database.get_default_schema_for_query.return_value = "public" query = mocker.MagicMock() query.database = database query.sql = "SELECT * FROM ab_user" # user has access to `public` schema mocker.patch.object(sm, "can_access", return_value=True) assert ( sm.raise_for_access( # type: ignore database=None, datasource=None, query=query, query_context=None, table=None, viz=None, ) is None ) sm.can_access.assert_called_with("schema_access", "[PostgreSQL].[public]") # type: ignore # user has only access to `secret` schema mocker.patch.object(sm, "can_access", return_value=False) with pytest.raises(SupersetSecurityException) as excinfo: sm.raise_for_access( database=None, datasource=None, query=query, query_context=None, table=None, viz=None, ) assert ( str(excinfo.value) == """You need access to the following tables: `public.ab_user`, `all_database_access` or `all_datasource_access` permission""" ) def test_raise_for_access_chart_for_datasource_permission( mocker: MockFixture, app_context: None, ) -> None: """ Test that the security manager can raise an exception for chart access, when the user does not have access to the chart datasource """ sm = SupersetSecurityManager(appbuilder) session = sm.get_session engine = session.get_bind() Slice.metadata.create_all(engine) # pylint: disable=no-member alpha = User( first_name="Alice", last_name="Doe", email="adoe@example.org", username="admin", roles=[Role(name="Alpha")], ) dataset = SqlaTable( table_name="test_table", metrics=[], main_dttm_col=None, database=Database(database_name="my_database", sqlalchemy_uri="sqlite://"), ) session.add(dataset) session.flush() slice = Slice( id=1, datasource_id=dataset.id, datasource_type="table", datasource_name="tmp_perm_table", slice_name="slice_name", ) session.add(slice) session.flush() mocker.patch.object(sm, "can_access_datasource", return_value=False) with override_user(alpha): with pytest.raises(SupersetSecurityException) as excinfo: sm.raise_for_access( chart=slice, ) assert str(excinfo.value) == "You don't have access to this chart." mocker.patch.object(sm, "can_access_datasource", return_value=True) with override_user(alpha): sm.raise_for_access( chart=slice, ) def test_raise_for_access_chart_on_admin( app_context: None, ) -> None: """ Test that the security manager can raise an exception for chart access, when the user does not have access to the chart datasource """ from flask_appbuilder.security.sqla.models import Role, User from superset.models.slice import Slice from superset.utils.core import override_user sm = SupersetSecurityManager(appbuilder) session = sm.get_session engine = session.get_bind() Slice.metadata.create_all(engine) # pylint: disable=no-member admin = User( first_name="Alice", last_name="Doe", email="adoe@example.org", username="admin", roles=[Role(name="Admin")], ) slice = Slice( id=1, datasource_id=1, datasource_type="table", datasource_name="tmp_perm_table", slice_name="slice_name", ) session.add(slice) session.flush() with override_user(admin): sm.raise_for_access( chart=slice, ) def test_raise_for_access_chart_owner( app_context: None, ) -> None: """ Test that the security manager can raise an exception for chart access, when the user does not have access to the chart datasource """ sm = SupersetSecurityManager(appbuilder) session = sm.get_session engine = session.get_bind() Slice.metadata.create_all(engine) # pylint: disable=no-member alpha = User( first_name="Alice", last_name="Doe", email="adoe@example.org", username="admin", roles=[Role(name="Alpha")], ) slice = Slice( id=1, datasource_id=1, datasource_type="table", datasource_name="tmp_perm_table", slice_name="slice_name", owners=[alpha], ) session.add(slice) session.flush() with override_user(alpha): sm.raise_for_access( chart=slice, )