# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # isort:skip_file """Tests for security api methods""" import jwt import pytest from flask.ctx import AppContext from flask_wtf.csrf import generate_csrf from superset import db, security_manager from superset.daos.dashboard import EmbeddedDashboardDAO from superset.models.dashboard import Dashboard from superset.utils.urls import get_url_host from superset.utils import json from tests.conftest import with_config from tests.integration_tests.base_tests import SupersetTestCase from tests.integration_tests.constants import ADMIN_USERNAME, GAMMA_USERNAME from tests.integration_tests.fixtures.birth_names_dashboard import ( load_birth_names_dashboard_with_slices, # noqa: F401 load_birth_names_data, # noqa: F401 ) @pytest.fixture def create_test_roles_with_users(app_context: AppContext): """ Fixture that creates two test roles with specific users, permissions, and groups. """ user1, user2, user3 = [ security_manager.add_user( username=f"test_user_{i}", first_name="Test", last_name=f"User{i}", email=f"test_user_{i}@test.com", role=[], password="password", # noqa: S106 ) for i in range(3) ] test_group = security_manager.add_group( name="test_group_1", label="Test Group 1", description="Test group for role testing", roles=[], ) pvm1 = security_manager.add_permission_view_menu("can_read", "Dashboard") pvm2 = security_manager.add_permission_view_menu("can_write", "Dashboard") pvm3 = security_manager.add_permission_view_menu("can_read", "Chart") test_role_1 = security_manager.add_role("test_role_1", [pvm1, pvm2]) test_role_1.user.append(user1) test_role_1.user.append(user2) test_role_1.groups.append(test_group) test_role_2 = security_manager.add_role("test_role_2", [pvm3]) test_role_2.user.append(user3) db.session.commit() test_data = { "test_role_1": { "role": test_role_1, "user_ids": sorted([user1.id, user2.id]), "permission_ids": sorted([pvm1.id, pvm2.id]), "group_ids": [test_group.id], }, "test_role_2": { "role": test_role_2, "user_ids": [user3.id], "permission_ids": [pvm3.id], "group_ids": [], }, } yield test_data # Cleanup db.session.delete(test_role_1) db.session.delete(test_role_2) db.session.delete(user1) db.session.delete(user2) db.session.delete(user3) db.session.delete(test_group) db.session.commit() @pytest.fixture def inject_test_roles_data(request, create_test_roles_with_users): request.instance.test_roles_data = create_test_roles_with_users class TestSecurityCsrfApi(SupersetTestCase): resource_name = "security" def _assert_get_csrf_token(self): uri = f"api/v1/{self.resource_name}/csrf_token/" response = self.client.get(uri) self.assert200(response) data = json.loads(response.data.decode("utf-8")) assert generate_csrf() == data["result"] def test_get_csrf_token(self): """ Security API: Test get CSRF token """ self.login(ADMIN_USERNAME) self._assert_get_csrf_token() def test_get_csrf_token_gamma(self): """ Security API: Test get CSRF token by gamma """ self.login(GAMMA_USERNAME) self._assert_get_csrf_token() def test_get_csrf_unauthorized(self): """ Security API: Test get CSRF no login """ uri = f"api/v1/{self.resource_name}/csrf_token/" response = self.client.get(uri) self.assert401(response) def test_login(self): """ Security API: Test get login """ uri = f"api/v1/{self.resource_name}/login" response = self.client.post( uri, json={"username": ADMIN_USERNAME, "password": "general", "provider": "db"}, ) assert response.status_code == 200 assert "access_token" in response.json class TestSecurityGuestTokenApi(SupersetTestCase): uri = "api/v1/security/guest_token/" def test_post_guest_token_unauthenticated(self): """ Security API: Cannot create a guest token without authentication """ response = self.client.post(self.uri) self.assert401(response) def test_post_guest_token_unauthorized(self): """ Security API: Cannot create a guest token without authorization """ self.login(GAMMA_USERNAME) response = self.client.post(self.uri) self.assert403(response) @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") def test_post_guest_token_authorized(self): self.dash = db.session.query(Dashboard).filter_by(slug="births").first() self.embedded = EmbeddedDashboardDAO.upsert(self.dash, []) self.login(ADMIN_USERNAME) user = {"username": "bob", "first_name": "Bob", "last_name": "Also Bob"} resource = {"type": "dashboard", "id": str(self.embedded.uuid)} rls_rule = {"dataset": 1, "clause": "1=1"} params = {"user": user, "resources": [resource], "rls": [rls_rule]} response = self.client.post( self.uri, data=json.dumps(params), content_type="application/json" ) self.assert200(response) token = json.loads(response.data)["token"] decoded_token = jwt.decode( token, self.app.config["GUEST_TOKEN_JWT_SECRET"], audience=get_url_host(), algorithms=["HS256"], ) assert user == decoded_token["user"] assert resource == decoded_token["resources"][0] @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") def test_post_guest_token_bad_resources(self): self.login(ADMIN_USERNAME) user = {"username": "bob", "first_name": "Bob", "last_name": "Also Bob"} resource = {"type": "dashboard", "id": "bad-id"} rls_rule = {"dataset": 1, "clause": "1=1"} params = {"user": user, "resources": [resource], "rls": [rls_rule]} response = self.client.post( self.uri, data=json.dumps(params), content_type="application/json" ) self.assert400(response) @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices", scope="class") class TestSecurityGuestTokenApiTokenValidator(SupersetTestCase): uri = "api/v1/security/guest_token/" def _get_guest_token_with_rls(self, rls_rule): dash = db.session.query(Dashboard).filter_by(slug="births").first() self.embedded = EmbeddedDashboardDAO.upsert(dash, []) self.login(ADMIN_USERNAME) user = {"username": "bob", "first_name": "Bob", "last_name": "Also Bob"} resource = {"type": "dashboard", "id": str(self.embedded.uuid)} params = {"user": user, "resources": [resource], "rls": [rls_rule]} return self.client.post( self.uri, data=json.dumps(params), content_type="application/json" ) @with_config({"GUEST_TOKEN_VALIDATOR_HOOK": lambda x: False}) def test_guest_token_validator_hook_denied(self): """ Security API: Test False case from validator - should be 400 """ rls_rule = {"dataset": 1, "clause": "tenant_id=123"} self.assert400(self._get_guest_token_with_rls(rls_rule)) @with_config({"GUEST_TOKEN_VALIDATOR_HOOK": lambda x: True}) def test_guest_token_validator_hook_denied_allowed(self): """ Security API: Test True case from validator - should be 200 """ rls_rule = {"dataset": 1, "clause": "tenant_id=123"} self.assert200(self._get_guest_token_with_rls(rls_rule)) @with_config({"GUEST_TOKEN_VALIDATOR_HOOK": 123}) def test_guest_validator_hook_not_callable(self): """ Security API: Test validator throws exception when isn't callable should be 500 """ rls_rule = {"dataset": 1, "clause": "tenant_id=123"} self.assert500(self._get_guest_token_with_rls(rls_rule)) @with_config({"GUEST_TOKEN_VALIDATOR_HOOK": lambda x: [][0]}) def test_guest_validator_hook_throws_exception(self): """ Security API: Test validator throws exception - should be 500 """ rls_rule = {"dataset": 1, "clause": "tenant_id=123"} self.assert500(self._get_guest_token_with_rls(rls_rule)) @with_config( { "GUEST_TOKEN_VALIDATOR_HOOK": lambda x: len(x["rls"]) == 1 and "tenant_id=" in x["rls"][0]["clause"] } ) def test_guest_validator_hook_real_world_example_positive(self): """ Security API: Test validator real world example, check tenant_id is in clause Positive case """ # Test validator real world example, check tenant_id is in clause # Should be 200. rls_rule = {"dataset": 1, "clause": "tenant_id=123"} self.assert200(self._get_guest_token_with_rls(rls_rule)) @with_config( { "GUEST_TOKEN_VALIDATOR_HOOK": lambda x: len(x["rls"]) == 1 and "tenant_id=" in x["rls"][0]["clause"] } ) def test_guest_validator_hook_real_world_example_negative(self): """ Security API: Test validator real world example, check tenant_id is in clause Negative case """ rls_rule = {} self.assert400(self._get_guest_token_with_rls(rls_rule)) class TestSecurityRolesApi(SupersetTestCase): uri = "api/v1/security/roles/" # noqa: F541 show_uri = "api/v1/security/roles/search/" @with_config({"FAB_ADD_SECURITY_API": True}) def test_get_security_roles_admin(self): """ Security API: Admin should be able to get roles """ self.login(ADMIN_USERNAME) response = self.client.get(self.uri) self.assert200(response) @with_config({"FAB_ADD_SECURITY_API": True}) def test_get_security_roles_gamma(self): """ Security API: Gamma should not be able to get roles """ self.login(GAMMA_USERNAME) response = self.client.get(self.uri) self.assert403(response) @with_config({"FAB_ADD_SECURITY_API": True}) def test_post_security_roles_gamma(self): """ Security API: Gamma should not be able to create roles """ self.login(GAMMA_USERNAME) response = self.client.post( self.uri, data=json.dumps({"name": "new_role"}), content_type="application/json", ) self.assert403(response) @with_config({"FAB_ADD_SECURITY_API": True}) def test_put_security_roles_gamma(self): """ Security API: Gamma shouldnt be able to update roles """ self.login(GAMMA_USERNAME) response = self.client.put( f"{self.uri}1", data=json.dumps({"name": "new_role"}), content_type="application/json", ) self.assert403(response) @with_config({"FAB_ADD_SECURITY_API": True}) def test_delete_security_roles_gamma(self): """ Security API: Gamma shouldnt be able to delete roles """ self.login(GAMMA_USERNAME) response = self.client.delete( f"{self.uri}1", data=json.dumps({"name": "new_role"}), content_type="application/json", ) self.assert403(response) def test_show_roles_admin(self): """ Security API: Admin should be able to show roles with permissions and users """ self.login(ADMIN_USERNAME) response = self.client.get(self.show_uri) self.assert200(response) def test_show_roles_gamma(self): """ Security API: Gamma should not be able to show roles """ self.login(GAMMA_USERNAME) response = self.client.get(self.show_uri) self.assert403(response) @pytest.mark.usefixtures("inject_test_roles_data") def test_get_roles_with_specific_test_data(self): """ Security API: Test roles endpoint with specific test data """ self.login(ADMIN_USERNAME) response = self.client.get(f"{self.show_uri}?q=(page_size:100)") self.assert200(response) data = json.loads(response.data.decode("utf-8")) # Create a mapping of role names to API response api_roles_by_name = {role["name"]: role for role in data["result"]} # Verify test_role_1 assert "test_role_1" in api_roles_by_name, ( f"test_role_1 not found in API response. " f"Available roles: {list(api_roles_by_name.keys())}" ) role1_api = api_roles_by_name["test_role_1"] role1_expected = self.test_roles_data["test_role_1"] assert sorted(role1_api["user_ids"]) == role1_expected["user_ids"] assert sorted(role1_api["permission_ids"]) == role1_expected["permission_ids"] assert sorted(role1_api["group_ids"]) == role1_expected["group_ids"] # Verify test_role_2 assert "test_role_2" in api_roles_by_name, ( f"test_role_2 not found in API response. " f"Available roles: {list(api_roles_by_name.keys())}" ) role2_api = api_roles_by_name["test_role_2"] role2_expected = self.test_roles_data["test_role_2"] assert sorted(role2_api["user_ids"]) == role2_expected["user_ids"] assert sorted(role2_api["permission_ids"]) == role2_expected["permission_ids"] assert role2_api["group_ids"] == role2_expected["group_ids"]