# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """Tests for celery_task.py - async SQL execution via Celery.""" from typing import Any from unittest.mock import MagicMock import msgpack import pytest from celery.exceptions import SoftTimeLimitExceeded from flask import current_app from pytest_mock import MockerFixture from superset.common.db_query_status import QueryStatus as QueryStatusEnum from superset.errors import ErrorLevel, SupersetError, SupersetErrorType from superset.exceptions import SupersetErrorException, SupersetErrorsException # Note: mock_query, mock_database, mock_result_set, and mock_db_session # fixtures are imported from conftest.py # ============================================================================= # Query Retrieval Tests # ============================================================================= def test_get_query_success( mocker: MockerFixture, app_context: None, mock_query: MagicMock ) -> None: """Test successful query retrieval.""" from superset.sql.execution.celery_task import _get_query mock_session = mocker.patch("superset.sql.execution.celery_task.db.session") mock_session.query.return_value.filter_by.return_value.one.return_value = mock_query result = _get_query(123) assert result == mock_query # ============================================================================= # Error Handling Tests # ============================================================================= def test_handle_query_error_basic( mocker: MockerFixture, app_context: None, mock_query: MagicMock ) -> None: """Test basic error handling.""" from superset.sql.execution.celery_task import _handle_query_error mocker.patch("superset.sql.execution.celery_task.db.session") ex = Exception("Something went wrong") payload = _handle_query_error(ex, mock_query) assert payload["status"] == QueryStatusEnum.FAILED assert "Something went wrong" in payload["error"] def test_handle_query_error_with_end_time_set( mocker: MockerFixture, app_context: None, mock_query: MagicMock ) -> None: """Test error handling when end_time is already set (line 116->120).""" from superset.sql.execution.celery_task import _handle_query_error mocker.patch("superset.sql.execution.celery_task.db.session") # Set end_time to trigger the branch skip mock_query.end_time = 12345.0 ex = Exception("Error with end_time set") payload = _handle_query_error(ex, mock_query) assert payload["status"] == QueryStatusEnum.FAILED # end_time should not be modified assert mock_query.end_time == 12345.0 def test_handle_query_error_sets_end_time( mocker: MockerFixture, app_context: None, mock_query: MagicMock ) -> None: """Test error handling sets end_time when not set.""" from superset.sql.execution.celery_task import _handle_query_error mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch( "superset.sql.execution.celery_task.now_as_float", return_value=99999.0 ) # end_time is None mock_query.end_time = None ex = Exception("Error") _handle_query_error(ex, mock_query) # Should set end_time assert mock_query.end_time == 99999.0 def test_handle_query_error_superset_error_exception( mocker: MockerFixture, app_context: None, mock_query: MagicMock ) -> None: """Test error handling with SupersetErrorException.""" from superset.sql.execution.celery_task import _handle_query_error mocker.patch("superset.sql.execution.celery_task.db.session") error = SupersetError( message="Test error", error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR, level=ErrorLevel.ERROR, ) ex = SupersetErrorException(error) payload = _handle_query_error(ex, mock_query) assert len(payload["errors"]) == 1 assert payload["errors"][0]["message"] == "Test error" def test_handle_query_error_superset_errors_exception( mocker: MockerFixture, app_context: None, mock_query: MagicMock ) -> None: """Test error handling with SupersetErrorsException.""" from superset.sql.execution.celery_task import _handle_query_error mocker.patch("superset.sql.execution.celery_task.db.session") errors = [ SupersetError( message="Error 1", error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR, level=ErrorLevel.ERROR, ), SupersetError( message="Error 2", error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR, level=ErrorLevel.ERROR, ), ] ex = SupersetErrorsException(errors) payload = _handle_query_error(ex, mock_query) assert len(payload["errors"]) == 2 def test_handle_query_error_with_troubleshooting_link( mocker: MockerFixture, app_context: None, mock_query: MagicMock ) -> None: """Test error handling includes troubleshooting link.""" from superset.sql.execution.celery_task import _handle_query_error mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch.dict( current_app.config, {"TROUBLESHOOTING_LINK": "https://help.example.com"} ) ex = Exception("Error") payload = _handle_query_error(ex, mock_query) assert payload["link"] == "https://help.example.com" # ============================================================================= # Serialization Tests # ============================================================================= def test_serialize_payload_json(mocker: MockerFixture, app_context: None) -> None: """Test JSON serialization when msgpack config is False.""" from superset.sql.execution.celery_task import _serialize_payload mocker.patch("superset.results_backend_use_msgpack", False) payload = {"status": "success", "data": [1, 2, 3]} result = _serialize_payload(payload) # Now always returns bytes (encoded UTF-8) assert isinstance(result, bytes) assert b"success" in result def test_serialize_payload_msgpack(mocker: MockerFixture, app_context: None) -> None: """Test msgpack serialization when msgpack config is True.""" from superset.sql.execution.celery_task import _serialize_payload mocker.patch("superset.results_backend_use_msgpack", True) payload = {"status": "success", "data": [1, 2, 3]} result = _serialize_payload(payload) assert isinstance(result, bytes) unpacked = msgpack.loads(result) assert unpacked["status"] == "success" # ============================================================================= # Statement Preparation Tests # ============================================================================= def test_prepare_statement_blocks_single_statement( app_context: None, mock_database: MagicMock ) -> None: """Test statement block preparation for single statement.""" from superset.sql.execution.celery_task import _prepare_statement_blocks sql = "SELECT * FROM users" script, blocks = _prepare_statement_blocks(sql, mock_database.db_engine_spec) assert len(blocks) == 1 def test_prepare_statement_blocks_multiple_statements( app_context: None, mock_database: MagicMock ) -> None: """Test statement block preparation for multiple statements.""" from superset.sql.execution.celery_task import _prepare_statement_blocks sql = "SELECT * FROM users; SELECT * FROM orders;" script, blocks = _prepare_statement_blocks(sql, mock_database.db_engine_spec) assert len(blocks) == 2 def test_prepare_statement_blocks_run_as_one( app_context: None, mock_database: MagicMock ) -> None: """Test statement block preparation when engine runs multiple as one.""" from superset.sql.execution.celery_task import _prepare_statement_blocks mock_database.db_engine_spec.run_multiple_statements_as_one = True sql = "SELECT * FROM users; SELECT * FROM orders;" script, blocks = _prepare_statement_blocks(sql, mock_database.db_engine_spec) assert len(blocks) == 1 # ============================================================================= # Result Finalization Tests # ============================================================================= def test_finalize_successful_query( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_result_set: MagicMock, mock_database: MagicMock, ) -> None: """Test successful query finalization.""" from superset.sql.execution.celery_task import _finalize_successful_query from superset.sql.parse import SQLScript mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch("superset.dataframe.df_to_records", return_value=[{"id": 1}]) payload: dict[str, Any] = {} # Create original script original_script = SQLScript( "SELECT * FROM users", mock_database.db_engine_spec.engine ) # New signature: (query, original_script, execution_results, payload, # total_execution_time_ms). execution_results is a list of tuples: # (executed_sql, result_set, exec_time, rowcount) execution_results = [ ("SELECT * FROM users WHERE rls_filter", mock_result_set, 10.5, 2) ] _finalize_successful_query( mock_query, original_script, execution_results, # type: ignore[arg-type] payload, 10.5, ) assert mock_query.rows == 2 assert mock_query.progress == 100 assert payload["status"] == QueryStatusEnum.SUCCESS assert "statements" in payload # SQL is formatted by SQLScript, so we can't compare exact whitespace assert "SELECT" in payload["statements"][0]["original_sql"] assert "FROM users" in payload["statements"][0]["original_sql"] assert ( payload["statements"][0]["executed_sql"] == "SELECT * FROM users WHERE rls_filter" ) def test_finalize_successful_query_with_msgpack( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_result_set: MagicMock, mock_database: MagicMock, ) -> None: """Test successful query finalization with Arrow/msgpack.""" from superset.sql.execution.celery_task import _finalize_successful_query from superset.sql.parse import SQLScript mocker.patch("superset.results_backend_use_msgpack", True) mock_buffer = MagicMock() mock_buffer.to_pybytes.return_value = b"arrow_data" mocker.patch( "superset.sql.execution.celery_task.write_ipc_buffer", return_value=mock_buffer ) # Mock stats_logger to cover the stats timing branch mock_stats = MagicMock() mocker.patch.dict(current_app.config, {"STATS_LOGGER": mock_stats}) payload: dict[str, Any] = {} # Create original script original_script = SQLScript( "SELECT * FROM users", mock_database.db_engine_spec.engine ) execution_results = [ ("SELECT * FROM users WHERE rls_filter", mock_result_set, 10.5, 2) ] _finalize_successful_query( mock_query, original_script, execution_results, # type: ignore[arg-type] payload, 10.5, ) assert payload["statements"][0]["data"] == b"arrow_data" def test_finalize_successful_query_msgpack_no_stats( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_result_set: MagicMock, mock_database: MagicMock, ) -> None: """Test finalization with msgpack when has_app_context() is False.""" from superset.sql.execution.celery_task import _finalize_successful_query from superset.sql.parse import SQLScript mocker.patch("superset.results_backend_use_msgpack", True) mocker.patch( "superset.sql.execution.celery_task.has_app_context", return_value=False ) mock_buffer = MagicMock() mock_buffer.to_pybytes.return_value = b"arrow_data" mocker.patch( "superset.sql.execution.celery_task.write_ipc_buffer", return_value=mock_buffer ) payload: dict[str, Any] = {} # Create original script original_script = SQLScript( "SELECT * FROM users", mock_database.db_engine_spec.engine ) execution_results = [ ("SELECT * FROM users WHERE rls_filter", mock_result_set, 10.5, 2) ] _finalize_successful_query( mock_query, original_script, execution_results, # type: ignore[arg-type] payload, 10.5, ) assert payload["statements"][0]["data"] == b"arrow_data" def test_finalize_successful_query_with_dml( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test successful query finalization with DML statement (no result_set).""" from superset.sql.execution.celery_task import _finalize_successful_query from superset.sql.parse import SQLScript payload: dict[str, Any] = {} # Create original script original_script = SQLScript( "INSERT INTO users VALUES (1)", mock_database.db_engine_spec.engine ) # DML statement: result_set is None, rowcount indicates affected rows execution_results: list[tuple[str, None, float, int]] = [ ("INSERT INTO users VALUES (1)", None, 5.0, 1) ] _finalize_successful_query( mock_query, original_script, execution_results, # type: ignore[arg-type] payload, 5.0, ) assert mock_query.rows == 0 # No rows returned for DML assert mock_query.progress == 100 assert payload["status"] == QueryStatusEnum.SUCCESS assert payload["statements"][0]["data"] is None assert payload["statements"][0]["row_count"] == 1 assert payload["statements"][0]["columns"] == [] # ============================================================================= # Results Storage Tests # ============================================================================= def test_store_results_in_backend_success( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test successful results storage.""" from superset.sql.execution.celery_task import _store_results_in_backend mock_results_backend = MagicMock() mock_results_backend.set.return_value = True mocker.patch( "superset.sql.execution.celery_task.results_backend", mock_results_backend ) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch( "superset.sql.execution.celery_task.zlib_compress", return_value=b"compressed" ) mocker.patch("superset.sql.execution.celery_task.db.session") payload = {"status": "success", "data": [], "query": {}} _store_results_in_backend(mock_query, payload, mock_database) assert mock_query.results_key is not None mock_results_backend.set.assert_called_once() def test_store_results_in_backend_with_size_check( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test results storage with payload size check (covers lines 232-247).""" from superset.sql.execution.celery_task import _store_results_in_backend mock_results_backend = MagicMock() mock_results_backend.set.return_value = True mocker.patch( "superset.sql.execution.celery_task.results_backend", mock_results_backend ) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch( "superset.sql.execution.celery_task.zlib_compress", return_value=b"compressed" ) mocker.patch("superset.sql.execution.celery_task.db.session") # Set a high payload max to pass the size check mocker.patch.dict(current_app.config, {"SQLLAB_PAYLOAD_MAX_MB": 100}) payload = {"status": "success", "data": [], "query": {}} _store_results_in_backend(mock_query, payload, mock_database) mock_results_backend.set.assert_called_once() def test_store_results_in_backend_payload_too_large( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test results storage with payload exceeding size limit.""" from superset.sql.execution.celery_task import _store_results_in_backend mocker.patch("superset.results_backend_use_msgpack", False) # Set very low limit mocker.patch.dict(current_app.config, {"SQLLAB_PAYLOAD_MAX_MB": 0.000001}) large_payload = {"data": "x" * 1000, "query": {}} with pytest.raises(SupersetErrorException) as exc_info: _store_results_in_backend(mock_query, large_payload, mock_database) assert exc_info.value.error.error_type == SupersetErrorType.RESULT_TOO_LARGE_ERROR def test_store_results_in_backend_default_cache_timeout( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test storage uses default cache timeout when database timeout is None.""" from superset.sql.execution.celery_task import _store_results_in_backend mock_results_backend = MagicMock() mock_results_backend.set.return_value = True mocker.patch( "superset.sql.execution.celery_task.results_backend", mock_results_backend ) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch( "superset.sql.execution.celery_task.zlib_compress", return_value=b"compressed" ) mocker.patch("superset.sql.execution.celery_task.db.session") # Set database cache_timeout to None mock_database.cache_timeout = None payload = {"status": "success", "data": [], "query": {}} _store_results_in_backend(mock_query, payload, mock_database) mock_results_backend.set.assert_called_once() def test_store_results_in_backend_write_failure( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test results storage write failure.""" from superset.sql.execution.celery_task import _store_results_in_backend mock_results_backend = MagicMock() mock_results_backend.set.return_value = False mocker.patch( "superset.sql.execution.celery_task.results_backend", mock_results_backend ) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch( "superset.sql.execution.celery_task.zlib_compress", return_value=b"compressed" ) mocker.patch("superset.sql.execution.celery_task.db.session") payload = {"status": "success", "data": [], "query": {}} with pytest.raises(SupersetErrorException) as exc_info: _store_results_in_backend(mock_query, payload, mock_database) assert exc_info.value.error.error_type == SupersetErrorType.RESULTS_BACKEND_ERROR # ============================================================================= # Data Serialization Tests # ============================================================================= def test_serialize_result_set_msgpack( mocker: MockerFixture, app_context: None, mock_result_set: MagicMock ) -> None: """Test result set serialization with msgpack/Arrow when config is True.""" from superset.sql.execution.celery_task import _serialize_result_set mocker.patch("superset.results_backend_use_msgpack", True) mock_buffer = MagicMock() mock_buffer.to_pybytes.return_value = b"arrow_data" mocker.patch( "superset.sql.execution.celery_task.write_ipc_buffer", return_value=mock_buffer ) mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) data, columns = _serialize_result_set(mock_result_set) assert data == b"arrow_data" assert columns == mock_result_set.columns def test_serialize_result_set_json( mocker: MockerFixture, app_context: None, mock_result_set: MagicMock ) -> None: """Test result set serialization with JSON when msgpack config is False.""" from superset.sql.execution.celery_task import _serialize_result_set mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch( "superset.dataframe.df_to_records", return_value=[{"id": 1, "name": "Alice"}], ) data, columns = _serialize_result_set(mock_result_set) assert data == [{"id": 1, "name": "Alice"}] assert columns == mock_result_set.columns # ============================================================================= # Helper Function Tests # ============================================================================= @pytest.mark.parametrize( "query_status,expected_result", [ (QueryStatusEnum.STOPPED, True), (QueryStatusEnum.RUNNING, False), ], ) def test_make_check_stopped_fn( mocker: MockerFixture, app_context: None, mock_query: MagicMock, query_status: QueryStatusEnum, expected_result: bool, ) -> None: """Test check_stopped function returns correct value based on query status.""" from superset.sql.execution.celery_task import _make_check_stopped_fn mocker.patch("superset.sql.execution.celery_task.db.session") mock_query.status = query_status check_stopped = _make_check_stopped_fn(mock_query) result = check_stopped() assert result is expected_result def test_make_execute_fn( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test execute function creation.""" from superset.sql.execution.celery_task import _make_execute_fn mock_cursor = MagicMock() mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) execute_fn = _make_execute_fn(mock_query, mock_database.db_engine_spec) execute_fn(mock_cursor, "SELECT * FROM users") assert mock_query.executed_sql == "SELECT * FROM users" @pytest.mark.parametrize( "logger_configured,should_be_called", [ (True, True), (False, False), ], ids=["with_logger", "no_logger"], ) def test_make_log_query_fn( mocker: MockerFixture, app_context: None, mock_database: MagicMock, logger_configured: bool, should_be_called: bool, ) -> None: """Test log query function with and without logger configured.""" from superset.sql.execution.celery_task import _make_log_query_fn mock_logger = MagicMock() if logger_configured else None mocker.patch.dict(current_app.config, {"QUERY_LOGGER": mock_logger}) mocker.patch("superset.sql.execution.celery_task.security_manager", MagicMock()) log_fn = _make_log_query_fn(mock_database) log_fn("SELECT * FROM users", "public") if should_be_called: assert mock_logger is not None mock_logger.assert_called_once() # If no logger, the function should complete without error # ============================================================================= # Main Task Execution Tests # ============================================================================= def test_execute_sql_task_success( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, mock_result_set: MagicMock, ) -> None: """Test successful execute_sql_task (covers lines 339-352, 400-473).""" from superset.sql.execution.celery_task import execute_sql_task from .conftest import setup_mock_raw_connection mock_query.database = mock_database mock_query.status = QueryStatusEnum.PENDING mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # execute_sql_with_cursor returns (exec_sql, result_set, time, rowcount) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[("SELECT * FROM users", mock_result_set, 10.5, 2)], ) mocker.patch("superset.sql.execution.celery_task.results_backend", None) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.dataframe.df_to_records", return_value=[]) mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) setup_mock_raw_connection(mock_database) result = execute_sql_task(123, "SELECT * FROM users", username="admin") assert result is not None # Success returns payload assert result["status"] == QueryStatusEnum.SUCCESS assert mock_query.status == QueryStatusEnum.SUCCESS def test_execute_sql_task_with_start_time( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, mock_result_set: MagicMock, ) -> None: """Test execute_sql_task accepts start_time parameter (covers line 400-402).""" import time from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # execute_sql_with_cursor returns (exec_sql, result_set, time, rowcount) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[("SELECT * FROM users", mock_result_set, 10.5, 2)], ) mocker.patch("superset.sql.execution.celery_task.results_backend", None) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.dataframe.df_to_records", return_value=[]) mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) from .conftest import setup_mock_raw_connection setup_mock_raw_connection(mock_database) start_time = time.time() - 1.0 result = execute_sql_task(123, "SELECT * FROM users", start_time=start_time) # Verify task completes successfully with start_time assert result is not None assert result["status"] == QueryStatusEnum.SUCCESS def test_execute_sql_task_with_cancel_query_id( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, mock_result_set: MagicMock, ) -> None: """Test execute_sql_task sets cancel_query_id when available.""" from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mock_database.db_engine_spec.get_cancel_query_id.return_value = "cancel_123" mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # execute_sql_with_cursor returns (exec_sql, result_set, time, rowcount) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[("SELECT * FROM users", mock_result_set, 10.5, 2)], ) mocker.patch("superset.sql.execution.celery_task.results_backend", None) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.dataframe.df_to_records", return_value=[]) mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) from .conftest import setup_mock_raw_connection setup_mock_raw_connection(mock_database) execute_sql_task(123, "SELECT * FROM users") # Verify cancel_query_id was set via db_engine_spec mock_database.db_engine_spec.get_cancel_query_id.assert_called_once() mock_query.set_extra_json_key.assert_any_call("cancel_query", "cancel_123") def test_execute_sql_task_stopped( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test execute_sql_task when query is stopped (covers lines 456-458).""" from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # Empty list indicates stopped (check_stopped_fn returned True mid-execution) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[], ) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) from .conftest import setup_mock_raw_connection setup_mock_raw_connection(mock_database) result = execute_sql_task(123, "SELECT * FROM users") assert result["status"] == QueryStatusEnum.STOPPED def test_execute_sql_task_with_mutation( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, mock_result_set: MagicMock, ) -> None: """Test execute_sql_task commits for mutations (covers lines 461-462).""" from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mock_query.select_as_cta = True # Trigger mutation commit mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # execute_sql_with_cursor returns (exec_sql, result_set, time, rowcount) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[("INSERT INTO users VALUES (1)", mock_result_set, 5.0, 1)], ) mocker.patch("superset.sql.execution.celery_task.results_backend", None) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.dataframe.df_to_records", return_value=[]) mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) from .conftest import setup_mock_raw_connection mock_conn = setup_mock_raw_connection(mock_database) execute_sql_task(123, "INSERT INTO users VALUES (1)") mock_conn.commit.assert_called() def test_execute_sql_task_with_results_backend( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, mock_result_set: MagicMock, ) -> None: """Test execute_sql_task stores results in backend (covers lines 466-467).""" from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # execute_sql_with_cursor returns (exec_sql, result_set, time, rowcount) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[("SELECT * FROM users", mock_result_set, 10.5, 2)], ) mock_results_backend = MagicMock() mock_results_backend.set.return_value = True mocker.patch( "superset.sql.execution.celery_task.results_backend", mock_results_backend ) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch( "superset.sql.execution.celery_task.zlib_compress", return_value=b"data" ) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.dataframe.df_to_records", return_value=[]) mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) from .conftest import setup_mock_raw_connection setup_mock_raw_connection(mock_database) result = execute_sql_task(123, "SELECT * FROM users") mock_results_backend.set.assert_called_once() assert result is not None assert result["status"] == QueryStatusEnum.SUCCESS def test_execute_sql_task_timeout( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, ) -> None: """Test execute_sql_task handles timeout (covers lines 438-453).""" from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", side_effect=SoftTimeLimitExceeded(), ) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict( current_app.config, {"STATS_LOGGER": MagicMock(), "SQLLAB_ASYNC_TIME_LIMIT_SEC": 300}, ) from .conftest import setup_mock_raw_connection setup_mock_raw_connection(mock_database) result = execute_sql_task(123, "SELECT * FROM users") # TIMED_OUT status is preserved (not overwritten to FAILED) assert result["status"] == QueryStatusEnum.TIMED_OUT.value def test_execute_sql_task_unhandled_exception( mocker: MockerFixture, app_context: None, mock_query: MagicMock, ) -> None: """Test execute_sql_task handles unhandled exceptions (covers lines 347-352).""" from superset.sql.execution.celery_task import execute_sql_task # Mock _get_query to succeed first time (for override_user), then return mock_query mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) mocker.patch( "superset.sql.execution.celery_task._execute_sql_statements", side_effect=Exception("Unexpected error"), ) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) result = execute_sql_task(123, "SELECT * FROM users") assert result["status"] == QueryStatusEnum.FAILED def test_execute_sql_task_success_final_commit( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, mock_result_set: MagicMock, ) -> None: """Test execute_sql_task final success path (covers lines 469-473).""" from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mock_query.status = QueryStatusEnum.RUNNING # Will be changed to SUCCESS mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # execute_sql_with_cursor returns (exec_sql, result_set, time, rowcount) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[("SELECT * FROM users", mock_result_set, 10.5, 2)], ) mocker.patch("superset.sql.execution.celery_task.results_backend", None) mocker.patch("superset.results_backend_use_msgpack", False) mock_session = mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.dataframe.df_to_records", return_value=[]) mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) from .conftest import setup_mock_raw_connection setup_mock_raw_connection(mock_database) result = execute_sql_task(123, "SELECT * FROM users") assert result is not None assert result["status"] == QueryStatusEnum.SUCCESS assert mock_query.status == QueryStatusEnum.SUCCESS mock_session.commit.assert_called() def test_execute_sql_task_with_failed_status_before_final_commit( mocker: MockerFixture, app_context: None, mock_query: MagicMock, mock_database: MagicMock, mock_result_set: MagicMock, ) -> None: """Test execute_sql_task final commit when query.status is already FAILED.""" from superset.sql.execution.celery_task import execute_sql_task mock_query.database = mock_database mocker.patch( "superset.sql.execution.celery_task._get_query", return_value=mock_query ) # execute_sql_with_cursor returns (exec_sql, result_set, time, rowcount) mocker.patch( "superset.sql.execution.celery_task.execute_sql_with_cursor", return_value=[("SELECT * FROM users", mock_result_set, 10.5, 2)], ) # Mock _store_results_in_backend to set status to FAILED without raising def mock_store_results(query, payload, database): query.status = QueryStatusEnum.FAILED mocker.patch("superset.sql.execution.celery_task.results_backend", MagicMock()) mocker.patch("superset.results_backend_use_msgpack", False) mocker.patch( "superset.sql.execution.celery_task._store_results_in_backend", side_effect=mock_store_results, ) mocker.patch("superset.sql.execution.celery_task.db.session") mocker.patch("superset.dataframe.df_to_records", return_value=[]) mocker.patch("superset.sql.execution.celery_task.security_manager") mocker.patch.dict(current_app.config, {"STATS_LOGGER": MagicMock()}) from .conftest import setup_mock_raw_connection setup_mock_raw_connection(mock_database) result = execute_sql_task(123, "SELECT * FROM users") # Verify query status remains FAILED and is not changed to SUCCESS assert result is not None assert mock_query.status == QueryStatusEnum.FAILED