diff --git a/superset/charts/schemas.py b/superset/charts/schemas.py index e0cee7758c4..711bc382991 100644 --- a/superset/charts/schemas.py +++ b/superset/charts/schemas.py @@ -1497,6 +1497,11 @@ class ChartDataResponseResult(Schema): required=True, allow_none=None, ) + semantic_cache_hit = fields.Boolean( + metadata={"description": "Whether the semantic layer smart cache was used"}, + required=False, + allow_none=True, + ) query = fields.String( metadata={ "description": "The executed query statement. May be absent when " diff --git a/superset/common/query_context_processor.py b/superset/common/query_context_processor.py index 52fc6d24f28..bad78532051 100644 --- a/superset/common/query_context_processor.py +++ b/superset/common/query_context_processor.py @@ -202,6 +202,7 @@ class QueryContextProcessor: "annotation_data": cache.annotation_data, "error": cache.error_message, "is_cached": cache.is_cached, + "semantic_cache_hit": cache.semantic_cache_hit, "query": cache.query, "status": cache.status, "stacktrace": cache.stacktrace, diff --git a/superset/common/utils/query_cache_manager.py b/superset/common/utils/query_cache_manager.py index da2d668e8c9..bd6a4e1f14e 100644 --- a/superset/common/utils/query_cache_manager.py +++ b/superset/common/utils/query_cache_manager.py @@ -69,6 +69,7 @@ class QueryCacheManager: cache_value: dict[str, Any] | None = None, sql_rowcount: int | None = None, queried_dttm: str | None = None, + semantic_cache_hit: bool | None = None, ) -> None: self.df = df self.query = query @@ -86,6 +87,7 @@ class QueryCacheManager: self.cache_value = cache_value self.sql_rowcount = sql_rowcount self.queried_dttm = queried_dttm + self.semantic_cache_hit = semantic_cache_hit # pylint: disable=too-many-arguments def set_query_result( @@ -110,6 +112,7 @@ class QueryCacheManager: self.error_message = query_result.error_message self.df = query_result.df self.sql_rowcount = query_result.sql_rowcount + self.semantic_cache_hit = query_result.semantic_cache_hit self.annotation_data = {} if annotation_data is None else annotation_data self.queried_dttm = ( datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat() @@ -131,6 +134,7 @@ class QueryCacheManager: "rejected_filter_columns": self.rejected_filter_columns, "annotation_data": self.annotation_data, "sql_rowcount": self.sql_rowcount, + "semantic_cache_hit": self.semantic_cache_hit, "queried_dttm": self.queried_dttm, "dttm": self.queried_dttm, # Backwards compatibility } @@ -186,6 +190,9 @@ class QueryCacheManager: query_cache.is_loaded = True query_cache.is_cached = cache_value is not None query_cache.sql_rowcount = cache_value.get("sql_rowcount", None) + query_cache.semantic_cache_hit = cache_value.get( + "semantic_cache_hit", None + ) query_cache.cache_dttm = ( cache_value["dttm"] if cache_value is not None else None ) diff --git a/superset/models/helpers.py b/superset/models/helpers.py index 6634d25910e..dfbf58c50c8 100644 --- a/superset/models/helpers.py +++ b/superset/models/helpers.py @@ -673,6 +673,7 @@ class QueryResult: # pylint: disable=too-few-public-methods errors: Optional[list[dict[str, Any]]] = None, from_dttm: Optional[datetime] = None, to_dttm: Optional[datetime] = None, + semantic_cache_hit: Optional[bool] = None, ) -> None: self.df = df self.query = query @@ -685,6 +686,7 @@ class QueryResult: # pylint: disable=too-few-public-methods self.errors = errors or [] self.from_dttm = from_dttm self.to_dttm = to_dttm + self.semantic_cache_hit = semantic_cache_hit self.sql_rowcount = len(self.df.index) if not self.df.empty else 0 diff --git a/superset/semantic_layers/cache.py b/superset/semantic_layers/cache.py index a57db368c3f..e45c4717482 100644 --- a/superset/semantic_layers/cache.py +++ b/superset/semantic_layers/cache.py @@ -131,6 +131,13 @@ def try_serve_from_cache( if payload is None: # value evicted but index entry survived; drop it continue + if projection_needed and not _projection_input_complete( + entry, payload + ): + # Cached result may be truncated (top-N). Keep the index + # entry alive but skip reuse for projection. + pruned.append(entry) + continue pruned.append(entry) served = _apply_post_processing( payload, query, leftovers, projection_needed @@ -298,7 +305,7 @@ def can_satisfy( # noqa: C901 projection_needed = False elif cached_dim_keys > new_dim_keys: projection_needed = True - if not _projection_allowed(entry, query, new_dim_keys, cached_dim_keys): + if not _projection_allowed(entry, query): return False, set(), False else: return False, set(), False @@ -363,17 +370,12 @@ def can_satisfy( # noqa: C901 def _projection_allowed( entry: CachedEntry, query: SemanticQuery, - new_dim_keys: frozenset[str], - cached_dim_keys: frozenset[str], ) -> bool: """ Gates for the projection path (above and beyond filter containment). """ if any(m.aggregation not in ADDITIVE_AGGREGATIONS for m in query.metrics): return False - # Cached truncation makes the rollup unsafe (we're missing rows). - if entry.limit is not None: - return False if entry.group_limit_key: return False if query.group_limit is not None: @@ -385,6 +387,19 @@ def _projection_allowed( return True +def _projection_input_complete(entry: CachedEntry, payload: SemanticResult) -> bool: + """ + True when a projection source is guaranteed not to be limit-truncated. + + If a cached query had ``limit=N`` and returned exactly ``N`` rows, there might + be additional source rows that were cut off. We only reuse it for projection + when the payload row count is strictly less than ``N``. + """ + if entry.limit is None: + return True + return payload.results.num_rows < entry.limit + + def _filter_col_id(f: Filter) -> str | None: return f.column.id if f.column is not None else None diff --git a/superset/semantic_layers/mapper.py b/superset/semantic_layers/mapper.py index 515a2d38fc9..a4afff69d51 100644 --- a/superset/semantic_layers/mapper.py +++ b/superset/semantic_layers/mapper.py @@ -264,6 +264,8 @@ def map_semantic_result_to_query_result( f"-- {req.type}\n{req.definition}" for req in semantic_result.requests ) + semantic_cache_hit = any(req.type == "cache" for req in semantic_result.requests) + return QueryResult( # Core data df=semantic_result.results.to_pandas(), @@ -284,6 +286,7 @@ def map_semantic_result_to_query_result( # Time range - pass through from original query_object from_dttm=query_object.from_dttm, to_dttm=query_object.to_dttm, + semantic_cache_hit=semantic_cache_hit, ) diff --git a/tests/unit_tests/semantic_layers/cache_integration_test.py b/tests/unit_tests/semantic_layers/cache_integration_test.py index c028c74f84e..432bbdfba6c 100644 --- a/tests/unit_tests/semantic_layers/cache_integration_test.py +++ b/tests/unit_tests/semantic_layers/cache_integration_test.py @@ -293,3 +293,47 @@ def test_projection_skipped_for_avg( get_results(_qo_dims(ds, ["b", "c"])) get_results(_qo_dims(ds, ["b"])) assert impl.get_table.call_count == 2 + + +def test_projection_reuses_when_cached_limit_not_reached( + fake_cache: _InMemoryCache, +) -> None: + impl, ds = _make_view(AggregationType.SUM) + impl.get_table = MagicMock( + return_value=_result_bc( + [("b1", "c1", 5.0), ("b1", "c2", 3.0), ("b2", "c1", 4.0)] + ) + ) + + first = get_results(_qo_dims(ds, ["b", "c"])) + assert impl.get_table.call_count == 1 + assert len(first.df) == 3 + + second = get_results(_qo_dims(ds, ["b"])) + assert impl.get_table.call_count == 1 # served via projection + df = second.df.sort_values("b").reset_index(drop=True) + assert df["b"].tolist() == ["b1", "b2"] + assert df["x"].tolist() == [8.0, 4.0] + + +def test_projection_skips_when_cached_limit_reached( + fake_cache: _InMemoryCache, +) -> None: + impl, ds = _make_view(AggregationType.SUM) + + first_q = _qo_dims(ds, ["b", "c"]) + first_q.row_limit = 3 + second_q = _qo_dims(ds, ["b"]) + + impl.get_table = MagicMock( + side_effect=[ + _result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0), ("b2", "c1", 4.0)]), + _result_bc([("b1", "c1", 8.0), ("b2", "c1", 4.0)]), + ] + ) + + get_results(first_q) + assert impl.get_table.call_count == 1 + + get_results(second_q) + assert impl.get_table.call_count == 2 # projection skipped; re-executed diff --git a/tests/unit_tests/semantic_layers/cache_test.py b/tests/unit_tests/semantic_layers/cache_test.py index 6a43168d8d6..985649f111f 100644 --- a/tests/unit_tests/semantic_layers/cache_test.py +++ b/tests/unit_tests/semantic_layers/cache_test.py @@ -40,6 +40,7 @@ from superset_core.semantic_layers.types import ( from superset.semantic_layers.cache import ( _apply_post_processing, _implies, + _projection_input_complete, CachedEntry, can_satisfy, shape_key, @@ -635,15 +636,62 @@ def test_projection_rejected_for_avg() -> None: assert ok is False -def test_projection_rejected_when_cached_has_limit() -> None: +def test_projection_with_cached_limit_defers_to_runtime_rowcount_check() -> None: entry, new_q = _projection_query( metrics=[M_SUM], new_dimensions=[COL_A], cached_dimensions=[COL_A, COL_B], cached_limit=10, ) - ok, _, _ = can_satisfy(entry, new_q) - assert ok is False + ok, leftovers, projection = can_satisfy(entry, new_q) + assert ok is True + assert leftovers == set() + assert projection is True + + +def test_projection_input_complete_unlimited_cached() -> None: + entry = entry_from( + SemanticQuery(metrics=[M_SUM], dimensions=[COL_A, COL_B], limit=None) + ) + payload = SemanticResult( + requests=[], + results=pa.Table.from_pydict({"a": ["x"], "b": [1], "sum_x": [1.0]}), + ) + assert _projection_input_complete(entry, payload) is True + + +def test_projection_input_complete_limited_cached_short_page() -> None: + entry = entry_from( + SemanticQuery(metrics=[M_SUM], dimensions=[COL_A, COL_B], limit=10) + ) + payload = SemanticResult( + requests=[], + results=pa.Table.from_pydict( + { + "a": ["x", "y", "z"], + "b": [1, 1, 1], + "sum_x": [1.0, 2.0, 3.0], + } + ), + ) + assert _projection_input_complete(entry, payload) is True + + +def test_projection_input_complete_limited_cached_full_page() -> None: + entry = entry_from( + SemanticQuery(metrics=[M_SUM], dimensions=[COL_A, COL_B], limit=3) + ) + payload = SemanticResult( + requests=[], + results=pa.Table.from_pydict( + { + "a": ["x", "y", "z"], + "b": [1, 1, 1], + "sum_x": [1.0, 2.0, 3.0], + } + ), + ) + assert _projection_input_complete(entry, payload) is False def test_projection_rejected_when_cached_has_having() -> None: diff --git a/tests/unit_tests/semantic_layers/mapper_test.py b/tests/unit_tests/semantic_layers/mapper_test.py index 7abdce91a6e..3ab64251ae8 100644 --- a/tests/unit_tests/semantic_layers/mapper_test.py +++ b/tests/unit_tests/semantic_layers/mapper_test.py @@ -1251,6 +1251,41 @@ def test_get_results_without_time_offsets( # Verify DataFrame matches main query result pd.testing.assert_frame_equal(result.df, main_df) + assert result.semantic_cache_hit is False + + +def test_get_results_marks_semantic_cache_hit_from_requests( + mock_datasource: MagicMock, + mocker: MockerFixture, +) -> None: + main_df = pd.DataFrame({"category": ["A"], "total_sales": [1.0]}) + cached_result = SemanticResult( + requests=[ + SemanticRequest(type="SQL", definition="SELECT ..."), + SemanticRequest( + type="cache", + definition=( + "Served from semantic view smart cache (re-aggregated locally)" + ), + ), + ], + results=pa.Table.from_pandas(main_df), + ) + + mock_datasource.implementation.get_table = mocker.Mock(return_value=cached_result) + + query_object = ValidatedQueryObject( + datasource=mock_datasource, + from_dttm=datetime(2025, 10, 15), + to_dttm=datetime(2025, 10, 22), + metrics=["total_sales"], + columns=["category"], + granularity="order_date", + ) + + result = get_results(query_object) + + assert result.semantic_cache_hit is True def test_get_results_with_single_time_offset(