From d8dd2d99b3f3e2a37ea922a9f598dc5dd1cc4387 Mon Sep 17 00:00:00 2001 From: jesperct Date: Fri, 1 May 2026 20:24:59 -0300 Subject: [PATCH] fix(time-comparison): use chart row_limit instead of instance config in offset queries (#39490) --- superset/models/helpers.py | 14 +- .../integration_tests/query_context_tests.py | 226 ++++++++++++++--- .../common/test_query_context_processor.py | 227 ++++++++++++++++++ 3 files changed, 433 insertions(+), 34 deletions(-) diff --git a/superset/models/helpers.py b/superset/models/helpers.py index 89d3fb8b219..965692e2248 100644 --- a/superset/models/helpers.py +++ b/superset/models/helpers.py @@ -1585,11 +1585,17 @@ class ExploreMixin: # pylint: disable=too-many-public-methods for metric in metric_names } - # When the original query has limit or offset we wont apply those - # to the subquery so we prevent data inconsistency due to missing records - # in the dataframes when performing the join + # The subquery drops row_offset (the offset period's own row ordering + # differs from the main query's, so applying the same offset would skew + # the join). It must still fetch enough rows to cover the main query's + # window, hence row_limit + row_offset when a chart limit is set. if query_object.row_limit or query_object.row_offset: - query_object_clone_dct["row_limit"] = app.config["ROW_LIMIT"] + if query_object.row_limit: + query_object_clone_dct["row_limit"] = ( + query_object.row_limit + query_object.row_offset + ) + else: + query_object_clone_dct["row_limit"] = app.config["ROW_LIMIT"] query_object_clone_dct["row_offset"] = 0 # Call the unified query method on the datasource diff --git a/tests/integration_tests/query_context_tests.py b/tests/integration_tests/query_context_tests.py index 7e2352532d0..4ab741cbb81 100644 --- a/tests/integration_tests/query_context_tests.py +++ b/tests/integration_tests/query_context_tests.py @@ -23,7 +23,6 @@ from unittest.mock import Mock, patch import numpy as np import pandas as pd import pytest -from flask import current_app from pandas import DateOffset from superset import db @@ -43,6 +42,7 @@ from superset.utils.core import ( QueryStatus, ) from superset.utils.pandas_postprocessing.utils import FLAT_COLUMN_SEPARATOR +from tests.conftest import with_config from tests.integration_tests.base_tests import SupersetTestCase from tests.integration_tests.conftest import ( only_postgresql, @@ -68,6 +68,130 @@ def get_sql_text(payload: dict[str, Any]) -> str: return response["query"] +def _time_comparison_offset_queries_payload() -> dict[str, Any]: + """Birth-names chart payload with time comparison and x-axis suitable for tests.""" + payload = get_query_context("birth_names") + payload["queries"][0]["columns"] = [ + { + "timeGrain": "P1D", + "columnType": "BASE_AXIS", + "sqlExpression": "ds", + "label": "ds", + "expressionType": "SQL", + } + ] + payload["queries"][0]["metrics"] = ["sum__num"] + payload["queries"][0]["groupby"] = ["name"] + payload["queries"][0]["is_timeseries"] = True + payload["queries"][0]["time_range"] = "1990 : 1991" + return payload + + +@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") +@patch("superset.common.query_context.QueryContext.get_query_result") +def test_time_offset_comparison_queries_use_chart_row_limit( + query_result_mock: Mock, +) -> None: + """Comparison SQL covers the main query's window (row_limit + row_offset).""" + payload = _time_comparison_offset_queries_payload() + payload["queries"][0]["row_limit"] = 100 + payload["queries"][0]["row_offset"] = 10 + + initial_df = pd.DataFrame( + { + "__timestamp": ["1990-01-01", "1990-01-01"], + "name": ["zban", "ahwb"], + "sum__num": [43571, 27225], + } + ) + mock_query_result = Mock() + mock_query_result.df = initial_df + query_result_mock.side_effect = [mock_query_result] + + query_context = ChartDataQueryContextSchema().load(payload) + query_object = query_context.queries[0] + df = query_context.get_query_result(query_object).df + + payload["queries"][0]["time_offsets"] = ["1 year ago", "1 year later"] + query_context = ChartDataQueryContextSchema().load(payload) + query_object = query_context.queries[0] + + def cache_key_fn(qo: QueryObject, time_offset: str, time_grain: Any) -> str | None: + return query_context._processor.query_cache_key( + qo, time_offset=time_offset, time_grain=time_grain + ) + + def cache_timeout_fn() -> int: + return query_context._processor.get_cache_timeout() + + time_offsets_obj = query_context.datasource.processing_time_offsets( + df, query_object, cache_key_fn, cache_timeout_fn, query_context.force + ) + sqls = time_offsets_obj["queries"] + assert len(sqls) == 2 + assert re.search(r"1989-01-01.+1990-01-01", sqls[0], re.S) + assert re.search(r"LIMIT 110", sqls[0], re.S) + assert not re.search(r"OFFSET 10", sqls[0], re.S) + assert re.search(r"1991-01-01.+1992-01-01", sqls[1], re.S) + assert re.search(r"LIMIT 110", sqls[1], re.S) + assert not re.search(r"OFFSET 10", sqls[1], re.S) + + +@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") +@with_config({"ROW_LIMIT": 4242}) +@patch("superset.common.query_context.QueryContext.get_query_result") +def test_time_offset_comparison_queries_use_config_row_limit_without_chart_limit( + query_result_mock: Mock, +) -> None: + """Chart with row_offset only: subquery widens the config ROW_LIMIT by row_offset. + + The schema fills `row_limit` with `app.config["ROW_LIMIT"]` when the payload + omits it, so the query_object arrives with row_limit=4242. The subquery then + covers the window via row_limit + row_offset = 4252. + """ + payload = _time_comparison_offset_queries_payload() + del payload["queries"][0]["row_limit"] + payload["queries"][0]["row_offset"] = 10 + + initial_df = pd.DataFrame( + { + "__timestamp": ["1990-01-01", "1990-01-01"], + "name": ["zban", "ahwb"], + "sum__num": [43571, 27225], + } + ) + mock_query_result = Mock() + mock_query_result.df = initial_df + query_result_mock.side_effect = [mock_query_result] + + query_context = ChartDataQueryContextSchema().load(payload) + query_object = query_context.queries[0] + df = query_context.get_query_result(query_object).df + + payload["queries"][0]["time_offsets"] = ["1 year ago", "1 year later"] + query_context = ChartDataQueryContextSchema().load(payload) + query_object = query_context.queries[0] + + def cache_key_fn(qo: QueryObject, time_offset: str, time_grain: Any) -> str | None: + return query_context._processor.query_cache_key( + qo, time_offset=time_offset, time_grain=time_grain + ) + + def cache_timeout_fn() -> int: + return query_context._processor.get_cache_timeout() + + time_offsets_obj = query_context.datasource.processing_time_offsets( + df, query_object, cache_key_fn, cache_timeout_fn, query_context.force + ) + sqls = time_offsets_obj["queries"] + limit_pattern = re.compile(r"LIMIT\s+4252\b") + assert len(sqls) == 2 + assert limit_pattern.search(sqls[0]) + assert not re.search(r"OFFSET 10", sqls[0], re.S) + assert limit_pattern.search(sqls[1]) + assert not re.search(r"OFFSET 10", sqls[1], re.S) + + @pytest.mark.skip( reason=( "TODO: Fix test class to work with DuckDB example data format. " @@ -794,28 +918,17 @@ class TestQueryContext(SupersetTestCase): @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") @patch("superset.common.query_context.QueryContext.get_query_result") - def test_time_offsets_in_query_object_no_limit(self, query_result_mock): + def test_time_offsets_in_query_object_uses_chart_row_limit(self, query_result_mock): """ - Ensure that time_offsets can generate the correct queries and - it doesnt use the row_limit nor row_offset from the original - query object + Subquery honors the chart's row_limit (widened by row_offset so the + LEFT JOIN covers the main query's paginated window) and drops + row_offset. Before this fix, row_limit was replaced with + app.config["ROW_LIMIT"], which caused the main query and offset + subquery to fetch different row counts. """ - payload = get_query_context("birth_names") - payload["queries"][0]["columns"] = [ - { - "timeGrain": "P1D", - "columnType": "BASE_AXIS", - "sqlExpression": "ds", - "label": "ds", - "expressionType": "SQL", - } - ] - payload["queries"][0]["metrics"] = ["sum__num"] - payload["queries"][0]["groupby"] = ["name"] - payload["queries"][0]["is_timeseries"] = True + payload = _time_comparison_offset_queries_payload() payload["queries"][0]["row_limit"] = 100 payload["queries"][0]["row_offset"] = 10 - payload["queries"][0]["time_range"] = "1990 : 1991" initial_data = { "__timestamp": ["1990-01-01", "1990-01-01"], @@ -839,33 +952,86 @@ class TestQueryContext(SupersetTestCase): query_context = ChartDataQueryContextSchema().load(payload) query_object = query_context.queries[0] - def cache_key_fn(qo, time_offset, time_grain): + def cache_key_fn( + qo: QueryObject, time_offset: str, time_grain: Any + ) -> str | None: return query_context._processor.query_cache_key( qo, time_offset=time_offset, time_grain=time_grain ) - def cache_timeout_fn(): + def cache_timeout_fn() -> int: return query_context._processor.get_cache_timeout() time_offsets_obj = query_context.datasource.processing_time_offsets( df, query_object, cache_key_fn, cache_timeout_fn, query_context.force ) sqls = time_offsets_obj["queries"] - row_limit_value = current_app.config["ROW_LIMIT"] - row_limit_pattern_with_config_value = r"LIMIT " + re.escape( - str(row_limit_value) - ) assert len(sqls) == 2 - # 1 year ago + # 1 year ago — subquery widens row_limit to cover main window (100 + 10) assert re.search(r"1989-01-01.+1990-01-01", sqls[0], re.S) - assert not re.search(r"LIMIT 100", sqls[0], re.S) + assert re.search(r"LIMIT 110", sqls[0], re.S) assert not re.search(r"OFFSET 10", sqls[0], re.S) - assert re.search(row_limit_pattern_with_config_value, sqls[0], re.S) # 1 year later assert re.search(r"1991-01-01.+1992-01-01", sqls[1], re.S) - assert not re.search(r"LIMIT 100", sqls[1], re.S) + assert re.search(r"LIMIT 110", sqls[1], re.S) + assert not re.search(r"OFFSET 10", sqls[1], re.S) + + @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") + @with_config({"ROW_LIMIT": 4242}) + @patch("superset.common.query_context.QueryContext.get_query_result") + def test_time_offsets_use_config_row_limit_when_chart_has_offset_only( + self, query_result_mock + ): + """ + Chart with row_offset only: subquery widens the config ROW_LIMIT by row_offset. + + The schema fills row_limit with app.config["ROW_LIMIT"] (4242) when the + payload omits it, so the subquery covers the window via 4242 + 10 = 4252. + """ + payload = _time_comparison_offset_queries_payload() + del payload["queries"][0]["row_limit"] + payload["queries"][0]["row_offset"] = 10 + + initial_data = { + "__timestamp": ["1990-01-01", "1990-01-01"], + "name": ["zban", "ahwb"], + "sum__num": [43571, 27225], + } + initial_df = pd.DataFrame(initial_data) + + mock_query_result = Mock() + mock_query_result.df = initial_df + query_result_mock.side_effect = [mock_query_result] + + query_context = ChartDataQueryContextSchema().load(payload) + query_object = query_context.queries[0] + query_result = query_context.get_query_result(query_object) + df = query_result.df + + payload["queries"][0]["time_offsets"] = ["1 year ago", "1 year later"] + query_context = ChartDataQueryContextSchema().load(payload) + query_object = query_context.queries[0] + + def cache_key_fn( + qo: QueryObject, time_offset: str, time_grain: Any + ) -> str | None: + return query_context._processor.query_cache_key( + qo, time_offset=time_offset, time_grain=time_grain + ) + + def cache_timeout_fn() -> int: + return query_context._processor.get_cache_timeout() + + time_offsets_obj = query_context.datasource.processing_time_offsets( + df, query_object, cache_key_fn, cache_timeout_fn, query_context.force + ) + sqls = time_offsets_obj["queries"] + limit_pattern = re.compile(r"LIMIT\s+4252\b") + assert len(sqls) == 2 + assert limit_pattern.search(sqls[0]) + assert not re.search(r"OFFSET 10", sqls[0], re.S) + assert limit_pattern.search(sqls[1]) assert not re.search(r"OFFSET 10", sqls[1], re.S) - assert re.search(row_limit_pattern_with_config_value, sqls[1], re.S) def test_get_label_map(app_context, virtual_dataset_comma_in_column_value): diff --git a/tests/unit_tests/common/test_query_context_processor.py b/tests/unit_tests/common/test_query_context_processor.py index 3bf558072b1..8e33b4d448f 100644 --- a/tests/unit_tests/common/test_query_context_processor.py +++ b/tests/unit_tests/common/test_query_context_processor.py @@ -700,6 +700,233 @@ def test_processing_time_offsets_date_range_enabled(processor): assert isinstance(result["cache_keys"], list) +def test_processing_time_offsets_uses_chart_row_limit(processor): + """Offset subquery inherits the chart's row_limit when one is set.""" + from superset.common.query_object import QueryObject + from superset.models.helpers import ExploreMixin + + processor._qc_datasource.processing_time_offsets = ( + ExploreMixin.processing_time_offsets.__get__(processor._qc_datasource) + ) + + df = pd.DataFrame({"__timestamp": ["1990-01-01"], "sum__num": [100]}) + + query_object = QueryObject( + datasource=MagicMock(), + granularity="ds", + columns=[], + metrics=["sum__num"], + is_timeseries=True, + row_limit=100, + row_offset=0, + time_offsets=["1 year ago"], + filters=[ + { + "col": "ds", + "op": "TEMPORAL_RANGE", + "val": "1990-01-01 : 1991-01-01", + } + ], + ) + + captured: list[dict[str, Any]] = [] + + def fake_query(dct: dict[str, Any]) -> MagicMock: + captured.append(dct) + result = MagicMock() + result.df = pd.DataFrame() + result.query = "SELECT 1" + return result + + processor._qc_datasource.query = fake_query + processor._qc_datasource.normalize_df = MagicMock(return_value=pd.DataFrame()) + + with ( + patch( + "superset.models.helpers.get_since_until_from_query_object", + return_value=(pd.Timestamp("1990-01-01"), pd.Timestamp("1991-01-01")), + ), + patch( + "superset.common.utils.query_cache_manager.QueryCacheManager" + ) as mock_cache_manager, + patch.object( + processor._qc_datasource, + "get_time_grain", + return_value=None, + ), + patch.object( + processor._qc_datasource, + "join_offset_dfs", + return_value=df, + ), + ): + mock_cache = MagicMock() + mock_cache.is_loaded = False + mock_cache_manager.get.return_value = mock_cache + + processor._qc_datasource.processing_time_offsets( + df, query_object, None, None, False + ) + + assert len(captured) == 1 + assert captured[0]["row_limit"] == 100 + assert captured[0]["row_offset"] == 0 + + +def test_processing_time_offsets_row_offset_extends_window(processor): + """Offset subquery limit covers the main query's window (row_limit + row_offset). + + When the chart has pagination (row_offset > 0), fetching only row_limit rows + in the offset period would likely miss the dimensions present in the main + query's page, yielding null comparison values. The subquery instead drops + row_offset and widens row_limit to cover the full window. + """ + from superset.common.query_object import QueryObject + from superset.models.helpers import ExploreMixin + + processor._qc_datasource.processing_time_offsets = ( + ExploreMixin.processing_time_offsets.__get__(processor._qc_datasource) + ) + + df = pd.DataFrame({"__timestamp": ["1990-01-01"], "sum__num": [100]}) + + query_object = QueryObject( + datasource=MagicMock(), + granularity="ds", + columns=[], + metrics=["sum__num"], + is_timeseries=True, + row_limit=100, + row_offset=10, + time_offsets=["1 year ago"], + filters=[ + { + "col": "ds", + "op": "TEMPORAL_RANGE", + "val": "1990-01-01 : 1991-01-01", + } + ], + ) + + captured: list[dict[str, Any]] = [] + + def fake_query(dct: dict[str, Any]) -> MagicMock: + captured.append(dct) + result = MagicMock() + result.df = pd.DataFrame() + result.query = "SELECT 1" + return result + + processor._qc_datasource.query = fake_query + processor._qc_datasource.normalize_df = MagicMock(return_value=pd.DataFrame()) + + with ( + patch( + "superset.models.helpers.get_since_until_from_query_object", + return_value=(pd.Timestamp("1990-01-01"), pd.Timestamp("1991-01-01")), + ), + patch( + "superset.common.utils.query_cache_manager.QueryCacheManager" + ) as mock_cache_manager, + patch.object( + processor._qc_datasource, + "get_time_grain", + return_value=None, + ), + patch.object( + processor._qc_datasource, + "join_offset_dfs", + return_value=df, + ), + ): + mock_cache = MagicMock() + mock_cache.is_loaded = False + mock_cache_manager.get.return_value = mock_cache + + processor._qc_datasource.processing_time_offsets( + df, query_object, None, None, False + ) + + assert len(captured) == 1 + assert captured[0]["row_limit"] == 110 + assert captured[0]["row_offset"] == 0 + + +def test_processing_time_offsets_falls_back_to_config_row_limit(processor): + """Offset subquery uses app config ROW_LIMIT when chart has offset but no limit.""" + from superset.common.query_object import QueryObject + from superset.models.helpers import ExploreMixin + + processor._qc_datasource.processing_time_offsets = ( + ExploreMixin.processing_time_offsets.__get__(processor._qc_datasource) + ) + + df = pd.DataFrame({"__timestamp": ["1990-01-01"], "sum__num": [100]}) + + query_object = QueryObject( + datasource=MagicMock(), + granularity="ds", + columns=[], + metrics=["sum__num"], + is_timeseries=True, + row_limit=None, + row_offset=10, + time_offsets=["1 year ago"], + filters=[ + { + "col": "ds", + "op": "TEMPORAL_RANGE", + "val": "1990-01-01 : 1991-01-01", + } + ], + ) + + captured: list[dict[str, Any]] = [] + + def fake_query(dct: dict[str, Any]) -> MagicMock: + captured.append(dct) + result = MagicMock() + result.df = pd.DataFrame() + result.query = "SELECT 1" + return result + + processor._qc_datasource.query = fake_query + processor._qc_datasource.normalize_df = MagicMock(return_value=pd.DataFrame()) + + with ( + patch( + "superset.models.helpers.get_since_until_from_query_object", + return_value=(pd.Timestamp("1990-01-01"), pd.Timestamp("1991-01-01")), + ), + patch( + "superset.common.utils.query_cache_manager.QueryCacheManager" + ) as mock_cache_manager, + patch.object( + processor._qc_datasource, + "get_time_grain", + return_value=None, + ), + patch.object( + processor._qc_datasource, + "join_offset_dfs", + return_value=df, + ), + patch("superset.models.helpers.app") as mock_app, + ): + mock_app.config = {"ROW_LIMIT": 4242} + mock_cache = MagicMock() + mock_cache.is_loaded = False + mock_cache_manager.get.return_value = mock_cache + + processor._qc_datasource.processing_time_offsets( + df, query_object, None, None, False + ) + + assert len(captured) == 1 + assert captured[0]["row_limit"] == 4242 + assert captured[0]["row_offset"] == 0 + + def test_ensure_totals_available_updates_cache_values(): """ Test that ensure_totals_available() updates the query objects AND