# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from dataclasses import dataclass
from typing import Any, Optional
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest
from flask import current_app
from pytest_mock import MockerFixture

from superset.exceptions import SupersetException
from superset.utils.core import (
    cast_to_boolean,
    check_is_safe_zip,
    DateColumn,
    generic_find_constraint_name,
    generic_find_fk_constraint_name,
    get_datasource_full_name,
    get_query_source_from_request,
    get_stacktrace,
    get_user_agent,
    is_test,
    merge_extra_filters,
    merge_request_params,
    normalize_dttm_col,
    parse_boolean_string,
    parse_js_uri_path_item,
    QueryObjectFilterClause,
    QuerySource,
    remove_extra_adhoc_filters,
)

# A plain ad-hoc filter; `remove_extra_adhoc_filters` must keep it.
ADHOC_FILTER: QueryObjectFilterClause = {
    "col": "foo",
    "op": "==",
    "val": "bar",
}

# The same filter flagged as "extra"; it must be stripped from any
# `*adhoc_filters`-suffixed key.
EXTRA_FILTER: QueryObjectFilterClause = {
    "col": "foo",
    "op": "==",
    "val": "bar",
    "isExtra": True,
}


@dataclass
class MockZipInfo:
    # Minimal stand-in for ``zipfile.ZipInfo`` exposing only the two size
    # fields that ``check_is_safe_zip`` inspects.
    file_size: int
    compress_size: int


@pytest.mark.parametrize(
    "original,expected",
    [
        ({"foo": "bar"}, {"foo": "bar"}),
        (
            {"foo": "bar", "adhoc_filters": [ADHOC_FILTER]},
            {"foo": "bar", "adhoc_filters": [ADHOC_FILTER]},
        ),
        (
            {"foo": "bar", "adhoc_filters": [EXTRA_FILTER]},
            {"foo": "bar", "adhoc_filters": []},
        ),
        (
            {
                "foo": "bar",
                "adhoc_filters": [ADHOC_FILTER, EXTRA_FILTER],
            },
            {"foo": "bar", "adhoc_filters": [ADHOC_FILTER]},
        ),
        (
            {
                "foo": "bar",
                "adhoc_filters_b": [ADHOC_FILTER, EXTRA_FILTER],
            },
            {"foo": "bar", "adhoc_filters_b": [ADHOC_FILTER]},
        ),
        (
            {
                "foo": "bar",
                "custom_adhoc_filters": [
                    ADHOC_FILTER,
                    EXTRA_FILTER,
                ],
            },
            {
                "foo": "bar",
                "custom_adhoc_filters": [
                    ADHOC_FILTER,
                    EXTRA_FILTER,
                ],
            },
        ),
    ],
)
def test_remove_extra_adhoc_filters(
    original: dict[str, Any], expected: dict[str, Any]
) -> None:
    """``remove_extra_adhoc_filters`` mutates the form data in place."""
    remove_extra_adhoc_filters(original)
    assert expected == original


def test_is_test():
    """
    ``is_test()`` reflects the ``SUPERSET_TESTENV`` environment variable.

    The environment is restored exactly afterwards: previously, when the
    variable was originally unset, an empty string was leaked into the
    process environment, and a failing assertion skipped restoration
    entirely — hence the try/finally and the ``pop`` branch.
    """
    orig_value = os.getenv("SUPERSET_TESTENV")
    try:
        os.environ["SUPERSET_TESTENV"] = "true"
        assert is_test()
        os.environ["SUPERSET_TESTENV"] = "false"
        assert not is_test()
        os.environ["SUPERSET_TESTENV"] = ""
        assert not is_test()
    finally:
        if orig_value is not None:
            os.environ["SUPERSET_TESTENV"] = orig_value
        else:
            # The variable was unset before the test; remove it again.
            os.environ.pop("SUPERSET_TESTENV", None)


@pytest.mark.parametrize(
    "test_input,expected",
    [
        ("y", True),
        ("Y", True),
        ("yes", True),
        ("True", True),
        ("t", True),
        ("true", True),
        ("On", True),
        ("on", True),
        ("1", True),
        ("n", False),
        ("N", False),
        ("no", False),
        ("False", False),
        ("f", False),
        ("false", False),
        ("Off", False),
        ("off", False),
        ("0", False),
        ("foo", False),
        (None, False),
    ],
)
def test_parse_boolean_string(test_input: Optional[str], expected: bool):
    """Truthy/falsy string spellings are parsed case-insensitively."""
    assert parse_boolean_string(test_input) == expected


def test_int_values():
    """Any non-zero integer is truthy for ``cast_to_boolean``."""
    assert cast_to_boolean(1) is True
    assert cast_to_boolean(0) is False
    assert cast_to_boolean(-1) is True
    assert cast_to_boolean(42) is True
    assert cast_to_boolean(0) is False


def test_float_values():
    """Any non-zero float is truthy for ``cast_to_boolean``."""
    assert cast_to_boolean(0.5) is True
    assert cast_to_boolean(3.14) is True
    assert cast_to_boolean(-2.71) is True
    assert cast_to_boolean(0.0) is False


def test_string_values():
    """Only case-insensitive "true" maps to True; everything else is False."""
    assert cast_to_boolean("true") is True
    assert cast_to_boolean("TruE") is True
    assert cast_to_boolean("false") is False
    assert cast_to_boolean("FaLsE") is False
    assert cast_to_boolean("") is False


def test_none_value():
    """``None`` is passed through unchanged rather than coerced."""
    assert cast_to_boolean(None) is None


def test_boolean_values():
    """Booleans are returned as-is."""
    assert cast_to_boolean(True) is True
    assert cast_to_boolean(False) is False


def test_other_values():
    """Non-scalar objects are coerced to False."""
    assert cast_to_boolean([]) is False
    assert cast_to_boolean({}) is False
    assert cast_to_boolean(object()) is False


def test_normalize_dttm_col() -> None:
    """
    Tests for the ``normalize_dttm_col`` function.

    In particular, this covers a regression when Pandas was upgraded from
    1.5.3 to 2.0.3 and the behavior of ``pd.to_datetime`` changed.
    """
    df = pd.DataFrame({"__time": ["2017-07-01T00:00:00.000Z"]})
    assert (
        df.to_markdown()
        == """
|    | __time                   |
|---:|:-------------------------|
|  0 | 2017-07-01T00:00:00.000Z |
    """.strip()
    )

    # in 1.5.3 this would return a datetime64[ns] dtype, but in 2.0.3 we had to
    # add ``exact=False`` since there is a leftover after parsing the format
    dttm_cols = (DateColumn("__time", "%Y-%m-%d"),)

    # the function modifies the dataframe in place
    normalize_dttm_col(df, dttm_cols)

    assert df["__time"].astype(str).tolist() == ["2017-07-01"]


def test_check_if_safe_zip_success(app_context: None) -> None:
    """
    Test if ZIP files are safe
    """
    ZipFile = MagicMock()  # noqa: N806
    ZipFile.infolist.return_value = [
        MockZipInfo(file_size=1000, compress_size=10),
        MockZipInfo(file_size=1000, compress_size=10),
        MockZipInfo(file_size=1000, compress_size=10),
        MockZipInfo(file_size=1000, compress_size=10),
        MockZipInfo(file_size=1000, compress_size=10),
    ]
    check_is_safe_zip(ZipFile)


def test_check_if_safe_zip_high_rate(app_context: None) -> None:
    """
    Test if ZIP files is not highly compressed
    """
    ZipFile = MagicMock()  # noqa: N806
    ZipFile.infolist.return_value = [
        MockZipInfo(file_size=1000, compress_size=1),
        MockZipInfo(file_size=1000, compress_size=1),
        MockZipInfo(file_size=1000, compress_size=1),
        MockZipInfo(file_size=1000, compress_size=1),
        MockZipInfo(file_size=1000, compress_size=1),
    ]
    with pytest.raises(SupersetException):
        check_is_safe_zip(ZipFile)


def test_check_if_safe_zip_hidden_bomb(app_context: None) -> None:
    """
    Test if ZIP file does not contain a big file highly compressed
    """
    ZipFile = MagicMock()  # noqa: N806
    ZipFile.infolist.return_value = [
        MockZipInfo(file_size=1000, compress_size=100),
        MockZipInfo(file_size=1000, compress_size=100),
        MockZipInfo(file_size=1000, compress_size=100),
        MockZipInfo(file_size=1000, compress_size=100),
        # One entry that would decompress to ~1 GB: a classic zip bomb.
        MockZipInfo(file_size=1000 * (1024 * 1024), compress_size=100),
    ]
    with pytest.raises(SupersetException):
        check_is_safe_zip(ZipFile)


def test_generic_constraint_name_exists():
    """A matching FK constraint's name is found on the reflected table."""
    # Create a mock SQLAlchemy database object
    database_mock = MagicMock()

    # Define the table name and constraint details
    table_name = "my_table"
    columns = {"column1", "column2"}
    referenced_table_name = "other_table"
    constraint_name = "my_constraint"

    # Create a mock table object with the same structure
    table_mock = MagicMock()
    table_mock.name = table_name
    table_mock.columns = [MagicMock(name=col) for col in columns]

    # Create a mock for the referred_table with a name attribute
    referred_table_mock = MagicMock()
    referred_table_mock.name = referenced_table_name

    # Create a mock for the foreign key constraint with a name attribute
    foreign_key_constraint_mock = MagicMock()
    foreign_key_constraint_mock.name = constraint_name
    foreign_key_constraint_mock.referred_table = referred_table_mock
    foreign_key_constraint_mock.column_keys = list(columns)

    # Set the foreign key constraint mock as part of the table's constraints
    table_mock.foreign_key_constraints = [foreign_key_constraint_mock]

    # Configure the autoload behavior for the database mock
    database_mock.metadata = MagicMock()
    database_mock.metadata.tables = {table_name: table_mock}

    # Mock the sa.Table creation with autoload
    with patch("superset.utils.core.sa.Table") as table_creation_mock:
        table_creation_mock.return_value = table_mock

        result = generic_find_constraint_name(
            table_name, columns, referenced_table_name, database_mock
        )

    assert result == constraint_name


def test_generic_constraint_name_not_found():
    """``None`` is returned when the table has no matching FK constraint."""
    # Create a mock SQLAlchemy database object
    database_mock = MagicMock()

    # Define the table name and constraint details
    table_name = "my_table"
    columns = {"column1", "column2"}
    referenced_table_name = "other_table"

    # Create a mock table object with the same structure but no matching constraint
    table_mock = MagicMock()
    table_mock.name = table_name
    table_mock.columns = [MagicMock(name=col) for col in columns]
    table_mock.foreign_key_constraints = []

    # Configure the autoload behavior for the database mock
    database_mock.metadata = MagicMock()
    database_mock.metadata.tables = {table_name: table_mock}

    result = generic_find_constraint_name(
        table_name, columns, referenced_table_name, database_mock
    )

    assert result is None


def test_generic_find_fk_constraint_exists():
    """The inspector-based lookup returns the matching constraint name."""
    insp_mock = MagicMock()
    table_name = "my_table"
    columns = {"column1", "column2"}
    referenced_table_name = "other_table"
    constraint_name = "my_constraint"

    # Create a mock for the foreign key constraint as a dictionary
    constraint_mock = {
        "name": constraint_name,
        "referred_table": referenced_table_name,
        "referred_columns": list(columns),
    }

    # Configure the Inspector mock to return the list of foreign key constraints
    insp_mock.get_foreign_keys.return_value = [constraint_mock]

    result = generic_find_fk_constraint_name(
        table_name, columns, referenced_table_name, insp_mock
    )

    assert result == constraint_name


def test_generic_find_fk_constraint_none_exist():
    """The inspector-based lookup returns ``None`` when nothing matches."""
    insp_mock = MagicMock()
    table_name = "my_table"
    columns = {"column1", "column2"}
    referenced_table_name = "other_table"

    # Configure the Inspector mock to return the list of foreign key constraints
    insp_mock.get_foreign_keys.return_value = []

    result = generic_find_fk_constraint_name(
        table_name, columns, referenced_table_name, insp_mock
    )

    assert result is None


def test_get_datasource_full_name():
    """
    Test the `get_datasource_full_name` function.

    This is used to build permissions, so it doesn't really return the
    datasource full name. Instead, it returns a fully qualified table name
    that includes the database name and schema, with each part wrapped in
    square brackets.
    """
    assert (
        get_datasource_full_name("db", "table", "catalog", "schema")
        == "[db].[catalog].[schema].[table]"
    )

    assert get_datasource_full_name("db", "table", None, None) == "[db].[table]"

    assert (
        get_datasource_full_name("db", "table", None, "schema")
        == "[db].[schema].[table]"
    )

    assert (
        get_datasource_full_name("db", "table", "catalog", None)
        == "[db].[catalog].[table]"
    )


@pytest.mark.parametrize(
    "referrer,expected",
    [
        (None, None),
        ("https://mysuperset.com/abc", None),
        ("https://mysuperset.com/superset/dashboard/", QuerySource.DASHBOARD),
        ("https://mysuperset.com/explore/", QuerySource.CHART),
        ("https://mysuperset.com/sqllab/", QuerySource.SQL_LAB),
    ],
)
def test_get_query_source_from_request(
    referrer: str | None,
    expected: QuerySource | None,
    mocker: MockerFixture,
) -> None:
    """The query source is derived from the request's referrer path."""
    if referrer:
        request_mock = mocker.patch("superset.utils.core.request")
        request_mock.referrer = referrer
    assert get_query_source_from_request() == expected


def test_get_user_agent(mocker: MockerFixture) -> None:
    """``USER_AGENT_FUNC`` overrides the default user-agent string."""
    database_mock = mocker.MagicMock()
    database_mock.database_name = "mydb"

    current_app_mock = mocker.patch("superset.utils.core.current_app")
    current_app_mock.config = {"USER_AGENT_FUNC": None}

    assert get_user_agent(database_mock, QuerySource.DASHBOARD) == "Apache Superset", (
        "The default user agent should be returned"
    )

    current_app_mock.config["USER_AGENT_FUNC"] = (
        lambda database, source: f"{database.database_name} {source.name}"
    )

    assert get_user_agent(database_mock, QuerySource.DASHBOARD) == "mydb DASHBOARD", (
        "the custom user agent function result should have been returned"
    )


def test_merge_extra_filters():
    """Extra filters are folded into ``adhoc_filters`` and time extras."""
    # does nothing if no extra filters
    form_data = {"A": 1, "B": 2, "c": "test"}
    expected = {**form_data, "adhoc_filters": [], "applied_time_extras": {}}
    merge_extra_filters(form_data)
    assert form_data == expected

    # empty extra_filters
    form_data = {"A": 1, "B": 2, "c": "test", "extra_filters": []}
    expected = {
        "A": 1,
        "B": 2,
        "c": "test",
        "adhoc_filters": [],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected

    # copy over extra filters into empty filters
    form_data = {
        "extra_filters": [
            {"col": "a", "op": "in", "val": "someval"},
            {"col": "B", "op": "==", "val": ["c1", "c2"]},
        ]
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "someval",
                "expressionType": "SIMPLE",
                "filterOptionName": "90cfb3c34852eb3bc741b0cc20053b46",
                "isExtra": True,
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "filterOptionName": "6c178d069965f1c02640661280415d96",
                "isExtra": True,
                "operator": "==",
                "subject": "B",
            },
        ],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected

    # adds extra filters to existing filters
    form_data = {
        "extra_filters": [
            {"col": "a", "op": "in", "val": "someval"},
            {"col": "B", "op": "==", "val": ["c1", "c2"]},
        ],
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": ["G1", "g2"],
                "expressionType": "SIMPLE",
                "operator": "!=",
                "subject": "D",
            }
        ],
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": ["G1", "g2"],
                "expressionType": "SIMPLE",
                "operator": "!=",
                "subject": "D",
            },
            {
                "clause": "WHERE",
                "comparator": "someval",
                "expressionType": "SIMPLE",
                "filterOptionName": "90cfb3c34852eb3bc741b0cc20053b46",
                "isExtra": True,
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "filterOptionName": "6c178d069965f1c02640661280415d96",
                "isExtra": True,
                "operator": "==",
                "subject": "B",
            },
        ],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected

    # adds extra filters to existing filters and sets time options
    form_data = {
        "extra_filters": [
            {"col": "__time_range", "op": "in", "val": "1 year ago :"},
            {"col": "__time_col", "op": "in", "val": "birth_year"},
            {"col": "__time_grain", "op": "in", "val": "years"},
            {"col": "A", "op": "like", "val": "hello"},
        ]
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "hello",
                "expressionType": "SIMPLE",
                "filterOptionName": "e3cbdd92a2ae23ca92c6d7fca42e36a6",
                "isExtra": True,
                "operator": "like",
                "subject": "A",
            }
        ],
        "time_range": "1 year ago :",
        "granularity_sqla": "birth_year",
        "time_grain_sqla": "years",
        "applied_time_extras": {
            "__time_range": "1 year ago :",
            "__time_col": "birth_year",
            "__time_grain": "years",
        },
    }
    merge_extra_filters(form_data)
    assert form_data == expected


def test_merge_extra_filters_ignores_empty_filters():
    """Filters with empty values are dropped rather than merged."""
    form_data = {
        "extra_filters": [
            {"col": "a", "op": "in", "val": ""},
            {"col": "B", "op": "==", "val": []},
        ]
    }
    expected = {"adhoc_filters": [], "applied_time_extras": {}}
    merge_extra_filters(form_data)
    assert form_data == expected


def test_merge_extra_filters_ignores_nones():
    """Existing ad-hoc filters with a ``None`` subject are left untouched."""
    form_data = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "",
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": None,
            }
        ],
        "extra_filters": [{"col": "B", "op": "==", "val": []}],
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "",
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": None,
            }
        ],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected


def test_merge_extra_filters_ignores_equal_filters():
    """Extra filters identical to existing ad-hoc filters are not duplicated."""
    form_data = {
        "extra_filters": [
            {"col": "a", "op": "in", "val": "someval"},
            {"col": "B", "op": "==", "val": ["c1", "c2"]},
            {"col": "c", "op": "in", "val": ["c1", 1, None]},
        ],
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "someval",
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", 1, None],
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "c",
            },
        ],
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "someval",
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", 1, None],
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "c",
            },
        ],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected


def test_merge_extra_filters_merges_different_val_types():
    """Filters that differ only in comparator type are treated as distinct."""
    form_data = {
        "extra_filters": [
            {"col": "a", "op": "in", "val": ["g1", "g2"]},
            {"col": "B", "op": "==", "val": ["c1", "c2"]},
        ],
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "someval",
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
        ],
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "someval",
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
            {
                "clause": "WHERE",
                "comparator": ["g1", "g2"],
                "expressionType": "SIMPLE",
                "filterOptionName": "c11969c994b40a83a4ae7d48ff1ea28e",
                "isExtra": True,
                "operator": "in",
                "subject": "a",
            },
        ],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected

    form_data = {
        "extra_filters": [
            {"col": "a", "op": "in", "val": "someval"},
            {"col": "B", "op": "==", "val": ["c1", "c2"]},
        ],
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": ["g1", "g2"],
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
        ],
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": ["g1", "g2"],
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
            {
                "clause": "WHERE",
                "comparator": "someval",
                "expressionType": "SIMPLE",
                "filterOptionName": "90cfb3c34852eb3bc741b0cc20053b46",
                "isExtra": True,
                "operator": "in",
                "subject": "a",
            },
        ],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected


def test_merge_extra_filters_adds_unequal_lists():
    """List comparators that are not exactly equal are merged as new filters."""
    form_data = {
        "extra_filters": [
            {"col": "a", "op": "in", "val": ["g1", "g2", "g3"]},
            {"col": "B", "op": "==", "val": ["c1", "c2", "c3"]},
        ],
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": ["g1", "g2"],
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
        ],
    }
    expected = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": ["g1", "g2"],
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2"],
                "expressionType": "SIMPLE",
                "operator": "==",
                "subject": "B",
            },
            {
                "clause": "WHERE",
                "comparator": ["g1", "g2", "g3"],
                "expressionType": "SIMPLE",
                "filterOptionName": "21cbb68af7b17e62b3b2f75e2190bfd7",
                "isExtra": True,
                "operator": "in",
                "subject": "a",
            },
            {
                "clause": "WHERE",
                "comparator": ["c1", "c2", "c3"],
                "expressionType": "SIMPLE",
                "filterOptionName": "0a8dcb928f1f4bba97643c6e68d672f1",
                "isExtra": True,
                "operator": "==",
                "subject": "B",
            },
        ],
        "applied_time_extras": {},
    }
    merge_extra_filters(form_data)
    assert form_data == expected


def test_merge_extra_filters_when_applied_time_extras_predefined():
    """Pre-existing ``applied_time_extras`` are preserved verbatim."""
    form_data = {"applied_time_extras": {"__time_range": "Last week"}}
    merge_extra_filters(form_data)

    assert form_data == {
        "applied_time_extras": {"__time_range": "Last week"},
        "adhoc_filters": [],
    }


def test_merge_request_params_when_url_params_undefined():
    """URL params are copied in; ``form_data`` itself is never nested."""
    form_data = {"since": "2000", "until": "now"}
    url_params = {"form_data": form_data, "dashboard_ids": "(1,2,3,4,5)"}
    merge_request_params(form_data, url_params)
    assert "url_params" in form_data.keys()
    assert "dashboard_ids" in form_data["url_params"]
    assert "form_data" not in form_data.keys()


def test_merge_request_params_when_url_params_predefined():
    """Incoming URL params override existing ones but keep unrelated keys."""
    form_data = {
        "since": "2000",
        "until": "now",
        "url_params": {"abc": "123", "dashboard_ids": "(1,2,3)"},
    }
    url_params = {"form_data": form_data, "dashboard_ids": "(1,2,3,4,5)"}
    merge_request_params(form_data, url_params)
    assert "url_params" in form_data.keys()
    assert "abc" in form_data["url_params"]
    assert url_params["dashboard_ids"] == form_data["url_params"]["dashboard_ids"]


def test_parse_js_uri_path_items_eval_undefined():
    """"undefined"/"null" map to ``None`` only when ``eval_undefined`` is set."""
    assert parse_js_uri_path_item("undefined", eval_undefined=True) is None
    assert parse_js_uri_path_item("null", eval_undefined=True) is None
    assert "undefined" == parse_js_uri_path_item("undefined")
    assert "null" == parse_js_uri_path_item("null")


def test_parse_js_uri_path_items_unquote():
    """Percent-encoding is decoded unless ``unquote=False``."""
    assert "slashed/name" == parse_js_uri_path_item("slashed%2fname")
    assert "slashed%2fname" == parse_js_uri_path_item("slashed%2fname", unquote=False)


def test_parse_js_uri_path_items_item_optional():
    """``None`` input yields ``None``; real items yield a value."""
    assert parse_js_uri_path_item(None) is None
    assert parse_js_uri_path_item("item") is not None


def test_get_stacktrace():
    """The stacktrace is only exposed when ``SHOW_STACKTRACE`` is enabled."""
    current_app.config["SHOW_STACKTRACE"] = True
    try:
        raise Exception("NONONO!")
    except Exception:
        stacktrace = get_stacktrace()
        assert "NONONO" in stacktrace

    current_app.config["SHOW_STACKTRACE"] = False
    try:
        raise Exception("NONONO!")
    except Exception:
        stacktrace = get_stacktrace()
        assert stacktrace is None