# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import annotations import datetime import sqlite3 from unittest.mock import MagicMock import pytest from flask_appbuilder.security.sqla.models import ( Permission, PermissionView, ViewMenu, ) from pytest_mock import MockerFixture from sqlalchemy.orm import Session from superset.commands.database.utils import ( add_perm, add_pvm, add_vm, ping, ) from tests.conftest import with_config @pytest.fixture def mock_engine(mocker: MockerFixture) -> tuple[MagicMock, MagicMock, MagicMock]: mock_connection = mocker.MagicMock() mock_engine = mocker.MagicMock() mock_dialect = mocker.MagicMock() mock_engine.raw_connection.return_value = mock_connection mock_engine.dialect = mock_dialect return mock_engine, mock_connection, mock_dialect @with_config({"TEST_DATABASE_CONNECTION_TIMEOUT": datetime.timedelta(seconds=10)}) def test_ping_success(mock_engine: MockerFixture): """ Test the ``ping`` method. """ mock_engine, mock_connection, mock_dialect = mock_engine mock_dialect.do_ping.return_value = True result = ping(mock_engine) assert result is True mock_engine.raw_connection.assert_called_once() mock_dialect.do_ping.assert_called_once_with(mock_connection) @with_config({"TEST_DATABASE_CONNECTION_TIMEOUT": datetime.timedelta(seconds=10)}) def test_ping_sqlite_exception(mocker: MockerFixture, mock_engine: MockerFixture): """ Test the ``ping`` method when a sqlite3.ProgrammingError is raised. """ mock_engine, mock_connection, mock_dialect = mock_engine mock_dialect.do_ping.side_effect = [sqlite3.ProgrammingError, True] result = ping(mock_engine) assert result is True mock_dialect.do_ping.assert_has_calls( [mocker.call(mock_connection), mocker.call(mock_engine)] ) def test_ping_runtime_exception(mocker: MockerFixture, mock_engine: MockerFixture): """ Test the ``ping`` method when a RuntimeError is raised. """ mock_engine, _, mock_dialect = mock_engine mock_timeout = mocker.patch("superset.commands.database.utils.timeout") mock_timeout.side_effect = RuntimeError("timeout") mock_dialect.do_ping.return_value = True result = ping(mock_engine) assert result is True mock_dialect.do_ping.assert_called_once_with(mock_engine) @pytest.fixture def db_session(mocker: MockerFixture) -> Session: return mocker.MagicMock(spec=Session) def test_add_vm(db_session: Session, mocker: MockerFixture): """ Thest ``add_vm`` when the ViewMenu does not exist. """ sm = mocker.MagicMock() sm.find_view_menu.return_value = None sm.viewmenu_model = ViewMenu result = add_vm(db_session, sm, "new_view_menu") assert result.name == "new_view_menu" sm.find_view_menu.assert_called_once_with("new_view_menu") db_session.add.assert_called_once_with(result) def test_add_vm_existing(db_session: Session, mocker: MockerFixture): """ Thest ``add_vm`` when the ViewMenu already exists. """ mock_vm = mocker.MagicMock() sm = mocker.MagicMock() sm.find_view_menu.return_value = mock_vm result = add_vm(db_session, sm, "existing_view_menu") assert result == mock_vm sm.find_view_menu.assert_called_once_with("existing_view_menu") db_session.add.assert_not_called() def test_add_perm(db_session: Session, mocker: MockerFixture): """ Thest ``add_perm`` when the Permission does not exist. """ sm = mocker.MagicMock() sm.find_permission.return_value = None sm.permission_model = Permission result = add_perm(db_session, sm, "new_perm") assert result.name == "new_perm" sm.find_permission.assert_called_once_with("new_perm") db_session.add.assert_called_once_with(result) def test_add_perm_existing(db_session: Session, mocker: MockerFixture): """ Thest ``add_perm`` when the Permission already exists. """ mock_perm = mocker.MagicMock() sm = mocker.MagicMock() sm.find_permission.return_value = mock_perm result = add_perm(db_session, sm, "existing_perm") assert result == mock_perm sm.find_permission.assert_called_once_with("existing_perm") db_session.add.assert_not_called() def test_add_pvm(db_session: Session, mocker: MockerFixture): """ Thest ``add_pvm`` when the PermissionView does not exist. """ sm = mocker.MagicMock() sm.find_permission_view_menu.return_value = None sm.permissionview_model = PermissionView mock_vm = mocker.MagicMock() mock_perm = mocker.MagicMock() mock_add_vm = mocker.patch("superset.commands.database.utils.add_vm") mock_add_vm.return_value = mock_vm mock_add_perm = mocker.patch("superset.commands.database.utils.add_perm") mock_add_perm.return_value = mock_perm result = add_pvm(db_session, sm, "new_perm", "new_view_menu") assert result is not None assert result.view_menu == mock_vm assert result.permission == mock_perm sm.find_permission_view_menu.assert_called_once_with("new_perm", "new_view_menu") mock_add_vm.assert_called_once_with(db_session, sm, "new_view_menu") mock_add_perm.assert_called_once_with(db_session, sm, "new_perm") db_session.add.assert_called_once_with(result) def test_add_pvm_missing_data(db_session: Session, mocker: MockerFixture): """ Thest ``add_pvm`` when permission_name and view_menu_name are empty. """ sm = mocker.MagicMock() result = add_pvm(db_session, sm, None, None) assert result is None def test_add_pvm_existing(db_session: Session, mocker: MockerFixture): """ Thest ``add_pvm`` when the PermissionView already exists. """ mock_pvm = mocker.MagicMock() sm = mocker.MagicMock() sm.find_permission_view_menu.return_value = mock_pvm result = add_pvm(db_session, sm, "existinf_perm", "existing_vm") assert result == mock_pvm sm.find_permission_view_menu.assert_called_once_with("existinf_perm", "existing_vm") db_session.add.assert_not_called()