From 044e8afed66e52f023fc00edc8b3b386de0fc1ba Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Tue, 12 May 2026 17:17:09 -0400 Subject: [PATCH] Leverage additive metrics --- .../superset_core/semantic_layers/types.py | 21 ++ superset/semantic_layers/cache.py | 182 ++++++++-- .../semantic_layers/cache_integration_test.py | 104 ++++++ .../unit_tests/semantic_layers/cache_test.py | 317 ++++++++++++++++-- 4 files changed, 572 insertions(+), 52 deletions(-) diff --git a/superset-core/src/superset_core/semantic_layers/types.py b/superset-core/src/superset_core/semantic_layers/types.py index 1239c1303be..3bfa9e8c315 100644 --- a/superset-core/src/superset_core/semantic_layers/types.py +++ b/superset-core/src/superset_core/semantic_layers/types.py @@ -92,6 +92,26 @@ class Dimension: grain: Grain | None = None +class AggregationType(str, enum.Enum): + """ + Aggregation function applied by a metric. + + Additivity (across an arbitrary set of grouping dimensions): + * ``SUM``, ``COUNT``: fully additive — sub-group sums roll up via ``sum``. + * ``MIN``, ``MAX``: roll up via ``min`` / ``max`` of sub-group values. + * ``AVG``, ``COUNT_DISTINCT``, ``OTHER``: not safely roll-uppable from + sub-aggregates without auxiliary data. + """ + + SUM = "SUM" + COUNT = "COUNT" + MIN = "MIN" + MAX = "MAX" + AVG = "AVG" + COUNT_DISTINCT = "COUNT_DISTINCT" + OTHER = "OTHER" + + @dataclass(frozen=True) class Metric: id: str @@ -100,6 +120,7 @@ class Metric: definition: str description: str | None = None + aggregation: AggregationType | None = None @dataclass(frozen=True) diff --git a/superset/semantic_layers/cache.py b/superset/semantic_layers/cache.py index b69a8803cbf..ef5cdc68eb3 100644 --- a/superset/semantic_layers/cache.py +++ b/superset/semantic_layers/cache.py @@ -50,10 +50,12 @@ import pyarrow as pa from flask import current_app from superset_core.semantic_layers.types import ( AdhocExpression, + AggregationType, Dimension, Filter, Metric, Operator, + OrderDirection, OrderTuple, PredicateType, SemanticQuery, @@ -64,6 +66,7 @@ from superset_core.semantic_layers.types import ( from superset.extensions import cache_manager from superset.utils import json from superset.utils.hashing import hash_from_str +from superset.utils.pandas_postprocessing.aggregate import aggregate logger = logging.getLogger(__name__) @@ -71,6 +74,14 @@ INDEX_KEY_PREFIX = "sv:idx:" VALUE_KEY_PREFIX = "sv:val:" MAX_ENTRIES_PER_SHAPE = 32 +_AGGREGATION_TO_PANDAS: dict[AggregationType, str] = { + AggregationType.SUM: "sum", + AggregationType.COUNT: "sum", + AggregationType.MIN: "min", + AggregationType.MAX: "max", +} +ADDITIVE_AGGREGATIONS = frozenset(_AGGREGATION_TO_PANDAS) + @dataclass(frozen=True) class ViewMeta: @@ -84,6 +95,7 @@ class ViewMeta: @dataclass(frozen=True) class CachedEntry: filters: frozenset[Filter] + dimension_keys: frozenset[str] limit: int | None offset: int order_key: str @@ -113,14 +125,16 @@ def try_serve_from_cache( served: SemanticResult | None = None for entry in entries: if served is None: - ok, leftovers = can_satisfy(entry, query) + ok, leftovers, projection_needed = can_satisfy(entry, query) if ok: payload = cache.get(entry.value_key) if payload is None: # value evicted but index entry survived; drop it continue pruned.append(entry) - served = _apply_post_processing(payload, query, leftovers) + served = _apply_post_processing( + payload, query, leftovers, projection_needed + ) continue # keep entry; verify its value is still alive if cache.get(entry.value_key) is not None: @@ -150,6 +164,7 @@ def store_result( entries: list[CachedEntry] = list(cache.get(idx_key) or []) entry = CachedEntry( filters=frozenset(query.filters or set()), + dimension_keys=frozenset(_dimension_key(d) for d in query.dimensions), limit=query.limit, offset=query.offset or 0, order_key=_order_key(query.order), @@ -173,10 +188,10 @@ def store_result( def shape_key(view_meta: ViewMeta, query: SemanticQuery) -> str: - shape = { - "m": sorted(m.id for m in query.metrics), - "d": sorted(_dimension_key(d) for d in query.dimensions), - } + # The shape key buckets entries by metric set only; dimensions live on each + # ``CachedEntry`` so we can find broader (dimension-superset) entries via the + # projection path. + shape = {"m": sorted(m.id for m in query.metrics)} digest = hash_from_str(json.dumps(shape, sort_keys=True))[:16] return f"{INDEX_KEY_PREFIX}{view_meta.uuid}:{view_meta.changed_on_iso}:{digest}" @@ -270,26 +285,42 @@ def _timeout(view_meta: ViewMeta) -> int | None: def can_satisfy( # noqa: C901 entry: CachedEntry, query: SemanticQuery, -) -> tuple[bool, set[Filter]]: - """Return ``(reusable, leftover_filters_to_apply)`` for ``entry`` vs ``query``.""" +) -> tuple[bool, set[Filter], bool]: + """ + Return ``(reusable, leftover_filters, projection_needed)`` for ``entry`` vs + ``query``. ``projection_needed`` is True when the cached entry has a strict + superset of the new dimensions and a pandas rollup is required. + """ + new_dim_keys = frozenset(_dimension_key(d) for d in query.dimensions) + cached_dim_keys = entry.dimension_keys + + if cached_dim_keys == new_dim_keys: + projection_needed = False + elif cached_dim_keys > new_dim_keys: + projection_needed = True + if not _projection_allowed(entry, query, new_dim_keys, cached_dim_keys): + return False, set(), False + else: + return False, set(), False + new_filters = frozenset(query.filters or set()) c_adhoc, c_having, c_where = _split(entry.filters) n_adhoc, n_having, n_where = _split(new_filters) if c_adhoc != n_adhoc: - return False, set() + return False, set(), False if c_having != n_having: - return False, set() + return False, set(), False c_by_col = _group_by_column(c_where) n_by_col = _group_by_column(n_where) - for col_id, c_list in c_by_col.items(): - n_list = n_by_col.get(col_id, []) + for c_list in c_by_col.values(): for c in c_list: + n_list = n_by_col.get(_filter_col_id(c), []) if not any(_implies(n, c) for n in n_list): - return False, set() + return False, set(), False leftovers: set[Filter] = set() for col_id, n_list in n_by_col.items(): @@ -297,27 +328,72 @@ def can_satisfy( # noqa: C901 for n in n_list: if not any(_implies(c, n) for c in c_list): if n.column is None or n.operator == Operator.ADHOC: - return False, set() + return False, set(), False leftovers.add(n) - projection_ids = _projection_ids(query) + # Leftover filters are applied to the cached DataFrame BEFORE the optional + # rollup, so their columns must be present in the cached projection. + allowed_ids = _cached_column_ids(entry, query) for leftover in leftovers: - if leftover.column is None or leftover.column.id not in projection_ids: - return False, set() + if leftover.column is None or leftover.column.id not in allowed_ids: + return False, set(), False if entry.offset != 0 or (query.offset or 0) != 0: - return False, set() + return False, set(), False + if projection_needed: + # Re-aggregation will re-order by ``query.order`` after rollup, so the + # cached order is irrelevant. We do require the new order (if any) to + # reference only surviving columns; otherwise sort would fail post-rollup. + if not _order_uses_only(query.order, _projection_ids(query)): + return False, set(), False + else: + if entry.limit is not None: + if query.limit is None or query.limit > entry.limit: + return False, set(), False + if entry.order_key != _order_key(query.order): + return False, set(), False + + if entry.group_limit_key != _group_limit_key(query.group_limit): + return False, set(), False + + return True, leftovers, projection_needed + + +def _projection_allowed( + entry: CachedEntry, + query: SemanticQuery, + new_dim_keys: frozenset[str], + cached_dim_keys: frozenset[str], +) -> bool: + """ + Gates for the projection path (above and beyond filter containment). + """ + if any(m.aggregation not in ADDITIVE_AGGREGATIONS for m in query.metrics): + return False + # Cached truncation makes the rollup unsafe (we're missing rows). if entry.limit is not None: - if query.limit is None or query.limit > entry.limit: - return False, set() - if entry.order_key != _order_key(query.order): - return False, set() + return False + if entry.group_limit_key: + return False + # Cached HAVING dropped sub-aggregate rows; the rolled-up totals would be + # off. Conservative: skip the projection path when cached has any HAVING. + if any(f.type == PredicateType.HAVING for f in entry.filters): + return False + return True - if entry.group_limit_key != _group_limit_key(query.group_limit): - return False, set() - return True, leftovers +def _filter_col_id(f: Filter) -> str | None: + return f.column.id if f.column is not None else None + + +def _order_uses_only( + order: list[OrderTuple] | None, + allowed_ids: set[str], +) -> bool: + if not order: + return True + return all(_orderable_id(element) in allowed_ids for element, _ in order) def _split( @@ -348,6 +424,12 @@ def _projection_ids(query: SemanticQuery) -> set[str]: return {d.id for d in query.dimensions} | {m.id for m in query.metrics} +def _cached_column_ids(entry: CachedEntry, query: SemanticQuery) -> set[str]: + """Column ids available in the cached DataFrame (cached dims + shared metrics).""" + cached_dim_ids = {key.rsplit("@", 1)[0] for key in entry.dimension_keys} + return cached_dim_ids | {m.id for m in query.metrics} + + # --------------------------------------------------------------------------- # Pairwise implication # --------------------------------------------------------------------------- @@ -522,9 +604,10 @@ def _apply_post_processing( cached: SemanticResult, query: SemanticQuery, leftovers: set[Filter], + projection_needed: bool, ) -> SemanticResult: - """Apply leftover filters and the new limit to a cached result.""" - if not leftovers and query.limit is None: + """Apply leftover filters, projection (re-aggregation), order, and limit.""" + if not leftovers and not projection_needed and query.limit is None: return cached df = cached.results.to_pandas() @@ -533,17 +616,54 @@ def _apply_post_processing( for f in leftovers: mask &= _mask_for(df, f) df = df[mask] + + note_def = "Served from semantic view smart cache (post-processed locally)" + if projection_needed: + groupby = [d.name for d in query.dimensions] + aggregates = { + m.name: { + "column": m.name, + "operator": _AGGREGATION_TO_PANDAS[ + # Guarded by ``_projection_allowed`` — non-None and additive. + m.aggregation # type: ignore[index] + ], + } + for m in query.metrics + } + df = aggregate(df, groupby=groupby, aggregates=aggregates) + df = _apply_order(df, query.order) + note_def = "Served from semantic view smart cache (re-aggregated locally)" + if query.limit is not None: df = df.head(query.limit) table = pa.Table.from_pandas(df, preserve_index=False) - note = SemanticRequest( - type="cache", - definition="Served from semantic view smart cache (post-processed locally)", - ) + note = SemanticRequest(type="cache", definition=note_def) return SemanticResult(requests=list(cached.requests) + [note], results=table) +def _apply_order( + df: pd.DataFrame, + order: list[OrderTuple] | None, +) -> pd.DataFrame: + if not order: + return df + available: list[tuple[str, bool]] = [] + for element, direction in order: + col = _orderable_id_name(element) + if col in df.columns: + available.append((col, direction == OrderDirection.ASC)) + if not available: + return df + cols = [col for col, _ in available] + asc = [a for _, a in available] + return df.sort_values(by=cols, ascending=asc).reset_index(drop=True) + + +def _orderable_id_name(element: Metric | Dimension | AdhocExpression) -> str: + return getattr(element, "name", element.id) + + def _mask_for(df: pd.DataFrame, f: Filter) -> pd.Series: # noqa: C901 if f.column is None: return pd.Series(True, index=df.index) diff --git a/tests/unit_tests/semantic_layers/cache_integration_test.py b/tests/unit_tests/semantic_layers/cache_integration_test.py index 87b5e659042..c028c74f84e 100644 --- a/tests/unit_tests/semantic_layers/cache_integration_test.py +++ b/tests/unit_tests/semantic_layers/cache_integration_test.py @@ -28,6 +28,7 @@ import pyarrow as pa import pytest from pytest_mock import MockerFixture from superset_core.semantic_layers.types import ( + AggregationType, Dimension, Metric, SemanticRequest, @@ -189,3 +190,106 @@ def test_changed_on_invalidates_cache( datasource.changed_on = datetime(2026, 2, 1, 0, 0, 0) get_results(_qo(datasource, ">", 1)) assert view_implementation.get_table.call_count == 2 + + +# --------------------------------------------------------------------------- +# Projection (v2) — dropping a dimension and re-aggregating +# --------------------------------------------------------------------------- + + +def _make_view(metric_aggregation: AggregationType | None) -> tuple[Any, MagicMock]: + dim_b = Dimension(id="t.b", name="b", type=pa.utf8()) + dim_c = Dimension(id="t.c", name="c", type=pa.utf8()) + metric_x = Metric( + id="t.x", + name="x", + type=pa.float64(), + definition="sum(x)", + aggregation=metric_aggregation, + ) + impl = MagicMock() + impl.metrics = {metric_x} + impl.dimensions = {dim_b, dim_c} + impl.features = frozenset() + impl.get_metrics = MagicMock(return_value={metric_x}) + impl.get_dimensions = MagicMock(return_value={dim_b, dim_c}) + + ds = MagicMock() + ds.implementation = impl + ds.uuid = "proj-view" + ds.changed_on = datetime(2026, 3, 1, 0, 0, 0) + ds.cache_timeout = 60 + ds.fetch_values_predicate = None + return impl, ds + + +def _qo_dims(ds: MagicMock, columns: list[str]) -> ValidatedQueryObject: + return ValidatedQueryObject( + datasource=ds, + metrics=["x"], + columns=columns, # type: ignore[arg-type] + filters=[], + ) + + +def _result_bc(rows: list[tuple[str, str, float]]) -> SemanticResult: + df = pd.DataFrame(rows, columns=["b", "c", "x"]) + return SemanticResult( + requests=[SemanticRequest(type="SQL", definition="select b,c,sum(x)")], + results=pa.Table.from_pandas(df, preserve_index=False), + ) + + +def test_projection_reuses_cached_for_dropped_dim( + fake_cache: _InMemoryCache, +) -> None: + impl, ds = _make_view(AggregationType.SUM) + impl.get_table = MagicMock( + return_value=_result_bc( + [("b1", "c1", 5.0), ("b1", "c2", 3.0), ("b2", "c1", 4.0)] + ) + ) + + first = get_results(_qo_dims(ds, ["b", "c"])) + assert impl.get_table.call_count == 1 + assert len(first.df) == 3 + + second = get_results(_qo_dims(ds, ["b"])) + assert impl.get_table.call_count == 1 # served via projection + df = second.df.sort_values("b").reset_index(drop=True) + assert df["b"].tolist() == ["b1", "b2"] + assert df["x"].tolist() == [8.0, 4.0] + + +def test_projection_skipped_when_aggregation_unknown( + fake_cache: _InMemoryCache, +) -> None: + impl, ds = _make_view(None) # metric has no aggregation declared + impl.get_table = MagicMock( + side_effect=[ + _result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0)]), + _result_bc([("b1", "c1", 5.0)]), # what the SV would compute for [b] + ] + ) + + get_results(_qo_dims(ds, ["b", "c"])) + assert impl.get_table.call_count == 1 + + get_results(_qo_dims(ds, ["b"])) + assert impl.get_table.call_count == 2 # cannot project, re-executed + + +def test_projection_skipped_for_avg( + fake_cache: _InMemoryCache, +) -> None: + impl, ds = _make_view(AggregationType.AVG) + impl.get_table = MagicMock( + side_effect=[ + _result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0)]), + _result_bc([("b1", "c1", 4.0)]), + ] + ) + + get_results(_qo_dims(ds, ["b", "c"])) + get_results(_qo_dims(ds, ["b"])) + assert impl.get_table.call_count == 2 diff --git a/tests/unit_tests/semantic_layers/cache_test.py b/tests/unit_tests/semantic_layers/cache_test.py index bfe5cf948cc..c3634511cd5 100644 --- a/tests/unit_tests/semantic_layers/cache_test.py +++ b/tests/unit_tests/semantic_layers/cache_test.py @@ -24,6 +24,7 @@ import pandas as pd import pyarrow as pa import pytest from superset_core.semantic_layers.types import ( + AggregationType, Dimension, Filter, Metric, @@ -54,8 +55,18 @@ def dim(id_: str, name: str | None = None) -> Dimension: return Dimension(id=id_, name=name or id_, type=pa.utf8()) -def met(id_: str, name: str | None = None) -> Metric: - return Metric(id=id_, name=name or id_, type=pa.float64(), definition="x") +def met( + id_: str, + name: str | None = None, + aggregation: AggregationType | None = None, +) -> Metric: + return Metric( + id=id_, + name=name or id_, + type=pa.float64(), + definition="x", + aggregation=aggregation, + ) COL_A = dim("col.a", "a") @@ -95,10 +106,15 @@ def query( def entry_from(q: SemanticQuery, value_key_: str = "vk") -> CachedEntry: - from superset.semantic_layers.cache import _group_limit_key, _order_key + from superset.semantic_layers.cache import ( + _dimension_key, + _group_limit_key, + _order_key, + ) return CachedEntry( filters=frozenset(q.filters or set()), + dimension_keys=frozenset(_dimension_key(d) for d in q.dimensions), limit=q.limit, offset=q.offset or 0, order_key=_order_key(q.order), @@ -193,15 +209,16 @@ def test_implies_like_exact_match_only() -> None: def test_can_satisfy_empty_cached_returns_all_as_leftovers() -> None: cached_q = query(filters=None) new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 5)}) - ok, leftovers = can_satisfy(entry_from(cached_q), new_q) + ok, leftovers, projection = can_satisfy(entry_from(cached_q), new_q) assert ok is True + assert projection is False assert leftovers == {where(COL_A, Operator.GREATER_THAN, 5)} def test_can_satisfy_narrower_filter() -> None: cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)}) new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 2)}) - ok, leftovers = can_satisfy(entry_from(cached_q), new_q) + ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is True assert leftovers == {where(COL_A, Operator.GREATER_THAN, 2)} @@ -209,7 +226,7 @@ def test_can_satisfy_narrower_filter() -> None: def test_can_satisfy_broader_filter_fails() -> None: cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 2)}) new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)}) - ok, leftovers = can_satisfy(entry_from(cached_q), new_q) + ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is False assert leftovers == set() @@ -217,7 +234,7 @@ def test_can_satisfy_broader_filter_fails() -> None: def test_can_satisfy_missing_constraint_fails() -> None: cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)}) new_q = query(filters=None) - ok, _ = can_satisfy(entry_from(cached_q), new_q) + ok, _, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is False @@ -229,7 +246,7 @@ def test_can_satisfy_new_filter_on_extra_column() -> None: where(COL_B, Operator.EQUALS, "x"), } ) - ok, leftovers = can_satisfy(entry_from(cached_q), new_q) + ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is True assert leftovers == { where(COL_A, Operator.GREATER_THAN, 2), @@ -244,7 +261,7 @@ def test_can_satisfy_leftover_on_non_projected_column_fails() -> None: filters={where(other, Operator.EQUALS, "x")}, dimensions=[COL_A, COL_B], ) - ok, _ = can_satisfy(entry_from(cached_q), new_q) + ok, _, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is False @@ -252,8 +269,8 @@ def test_can_satisfy_having_requires_exact_set() -> None: cached_q = query(filters={having(M_X, Operator.GREATER_THAN, 100)}) same = query(filters={having(M_X, Operator.GREATER_THAN, 100)}) tighter = query(filters={having(M_X, Operator.GREATER_THAN, 200)}) - ok_same, _ = can_satisfy(entry_from(cached_q), same) - ok_tight, _ = can_satisfy(entry_from(cached_q), tighter) + ok_same, _, _ = can_satisfy(entry_from(cached_q), same) + ok_tight, _, _ = can_satisfy(entry_from(cached_q), tighter) assert ok_same is True assert ok_tight is False @@ -262,8 +279,8 @@ def test_can_satisfy_adhoc_requires_exact_set() -> None: cached_q = query(filters={adhoc("col_a > 1")}) same = query(filters={adhoc("col_a > 1")}) different = query(filters={adhoc("col_a > 2")}) - ok_same, _ = can_satisfy(entry_from(cached_q), same) - ok_diff, _ = can_satisfy(entry_from(cached_q), different) + ok_same, _, _ = can_satisfy(entry_from(cached_q), same) + ok_diff, _, _ = can_satisfy(entry_from(cached_q), different) assert ok_same is True assert ok_diff is False @@ -276,7 +293,7 @@ def test_can_satisfy_adhoc_requires_exact_set() -> None: def test_can_satisfy_unlimited_cached_satisfies_any_limit() -> None: cached_q = query(filters=None, limit=None) new_q = query(filters=None, limit=10) - ok, leftovers = can_satisfy(entry_from(cached_q), new_q) + ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is True assert leftovers == set() @@ -285,35 +302,35 @@ def test_can_satisfy_smaller_limit_with_matching_order() -> None: order = [(M_X, OrderDirection.DESC)] cached_q = query(filters=None, limit=100, order=order) new_q = query(filters=None, limit=10, order=order) - ok, _ = can_satisfy(entry_from(cached_q), new_q) + ok, _, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is True def test_can_satisfy_smaller_limit_different_order_fails() -> None: cached_q = query(filters=None, limit=100, order=[(M_X, OrderDirection.DESC)]) new_q = query(filters=None, limit=10, order=[(M_X, OrderDirection.ASC)]) - ok, _ = can_satisfy(entry_from(cached_q), new_q) + ok, _, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is False def test_can_satisfy_larger_limit_fails() -> None: cached_q = query(filters=None, limit=10) new_q = query(filters=None, limit=100) - ok, _ = can_satisfy(entry_from(cached_q), new_q) + ok, _, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is False def test_can_satisfy_no_new_limit_when_cached_has_one_fails() -> None: cached_q = query(filters=None, limit=100) new_q = query(filters=None, limit=None) - ok, _ = can_satisfy(entry_from(cached_q), new_q) + ok, _, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is False def test_can_satisfy_offset_never_reused() -> None: cached_q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], offset=5) new_q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], offset=5) - ok, _ = can_satisfy(entry_from(cached_q), new_q) + ok, _, _ = can_satisfy(entry_from(cached_q), new_q) assert ok is False @@ -333,7 +350,7 @@ def test_apply_post_processing_filters_and_limits() -> None: limit=2, ) result = _apply_post_processing( - cached, new_q, {where(COL_A, Operator.GREATER_THAN, 2)} + cached, new_q, {where(COL_A, Operator.GREATER_THAN, 2)}, False ) result_df = result.results.to_pandas() assert list(result_df["a"]) == [3, 5] @@ -347,7 +364,7 @@ def test_apply_post_processing_no_leftovers_no_limit_returns_original() -> None: requests=[], results=pa.Table.from_pandas(df, preserve_index=False) ) new_q = query(filters=None, limit=None) - out = _apply_post_processing(cached, new_q, set()) + out = _apply_post_processing(cached, new_q, set(), False) # same object reference is OK; we explicitly return the input assert out is cached @@ -394,3 +411,261 @@ def test_value_key_with_datetime_filter() -> None: q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], filters={f}) # should not raise assert value_key(VIEW, q).startswith("sv:val:") + + +def test_shape_key_independent_of_dimensions() -> None: + # The v2 shape key buckets entries by metric set only; different dimension + # sets share the same shape so the projection path can find broader entries. + q1 = SemanticQuery(metrics=[M_X], dimensions=[COL_A, COL_B]) + q2 = SemanticQuery(metrics=[M_X], dimensions=[COL_A]) + assert shape_key(VIEW, q1) == shape_key(VIEW, q2) + # Value keys still differ. + assert value_key(VIEW, q1) != value_key(VIEW, q2) + + +# --------------------------------------------------------------------------- +# Projection (v2) +# --------------------------------------------------------------------------- + + +M_SUM = met("met.sum", "sum_x", aggregation=AggregationType.SUM) +M_COUNT = met("met.count", "count_x", aggregation=AggregationType.COUNT) +M_MIN = met("met.min", "min_x", aggregation=AggregationType.MIN) +M_MAX = met("met.max", "max_x", aggregation=AggregationType.MAX) +M_AVG = met("met.avg", "avg_x", aggregation=AggregationType.AVG) +M_UNKNOWN = met("met.unknown", "unknown_x", aggregation=None) + + +def _projection_query( + metrics: list[Metric], + new_dimensions: list[Dimension], + cached_dimensions: list[Dimension], + cached_filters: set[Filter] | None = None, + cached_limit: int | None = None, + new_filters: set[Filter] | None = None, + new_limit: int | None = None, + new_order: Any = None, +) -> tuple[CachedEntry, SemanticQuery]: + cached_q = SemanticQuery( + metrics=metrics, + dimensions=cached_dimensions, + filters=cached_filters, + limit=cached_limit, + ) + new_q = SemanticQuery( + metrics=metrics, + dimensions=new_dimensions, + filters=new_filters, + limit=new_limit, + order=new_order, + ) + return entry_from(cached_q), new_q + + +@pytest.mark.parametrize( + "metric,operator", + [ + (M_SUM, "sum"), + (M_COUNT, "sum"), + (M_MIN, "min"), + (M_MAX, "max"), + ], +) +def test_can_satisfy_projection_each_additive_op(metric: Metric, operator: str) -> None: + entry, new_q = _projection_query( + metrics=[metric], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + ) + ok, leftovers, projection = can_satisfy(entry, new_q) + assert ok is True + assert projection is True + assert leftovers == set() + + +def test_projection_rolls_up_sum() -> None: + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + ) + cached_df = pd.DataFrame( + {"a": ["x", "x", "y", "y"], "b": [1, 2, 1, 2], "sum_x": [10, 20, 30, 40]} + ) + cached = SemanticResult( + requests=[SemanticRequest(type="SQL", definition="select ...")], + results=pa.Table.from_pandas(cached_df, preserve_index=False), + ) + out = _apply_post_processing(cached, new_q, set(), True) + out_df = out.results.to_pandas().sort_values("a").reset_index(drop=True) + assert list(out_df["a"]) == ["x", "y"] + assert list(out_df["sum_x"]) == [30, 70] + + +def test_projection_rolls_up_min_max_count() -> None: + entry, new_q = _projection_query( + metrics=[M_MIN, M_MAX, M_COUNT], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + ) + cached_df = pd.DataFrame( + { + "a": ["x", "x", "y", "y"], + "b": [1, 2, 1, 2], + "min_x": [5, 2, 9, 8], + "max_x": [50, 60, 70, 80], + "count_x": [1, 1, 2, 3], + } + ) + cached = SemanticResult( + requests=[], + results=pa.Table.from_pandas(cached_df, preserve_index=False), + ) + out = _apply_post_processing(cached, new_q, set(), True) + df = out.results.to_pandas().sort_values("a").reset_index(drop=True) + assert list(df["min_x"]) == [2, 8] + assert list(df["max_x"]) == [60, 80] + assert list(df["count_x"]) == [2, 5] + + +def test_projection_drops_multiple_dims() -> None: + col_c = dim("col.c", "c") + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B, col_c], + ) + cached_df = pd.DataFrame( + { + "a": ["x", "x", "x", "y"], + "b": [1, 1, 2, 1], + "c": [10, 20, 10, 10], + "sum_x": [1, 2, 3, 4], + } + ) + cached = SemanticResult( + requests=[], results=pa.Table.from_pandas(cached_df, preserve_index=False) + ) + out = _apply_post_processing(cached, new_q, set(), True) + df = out.results.to_pandas().sort_values("a").reset_index(drop=True) + assert list(df["sum_x"]) == [6, 4] + + +def test_projection_with_leftover_filter_then_rollup() -> None: + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + new_filters={where(COL_B, Operator.GREATER_THAN, 1)}, + ) + cached_df = pd.DataFrame( + {"a": ["x", "x", "y"], "b": [1, 2, 2], "sum_x": [10, 20, 30]} + ) + cached = SemanticResult( + requests=[], results=pa.Table.from_pandas(cached_df, preserve_index=False) + ) + ok, leftovers, projection = can_satisfy(entry, new_q) + assert ok is True + assert projection is True + out = _apply_post_processing(cached, new_q, leftovers, projection) + df = out.results.to_pandas().sort_values("a").reset_index(drop=True) + # b > 1 removes the (x,1) row; x sums to 20, y to 30 + assert list(df["sum_x"]) == [20, 30] + + +def test_projection_with_order_and_limit() -> None: + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + new_order=[(M_SUM, OrderDirection.DESC)], + new_limit=1, + ) + cached_df = pd.DataFrame( + {"a": ["x", "x", "y"], "b": [1, 2, 1], "sum_x": [1, 2, 100]} + ) + cached = SemanticResult( + requests=[], results=pa.Table.from_pandas(cached_df, preserve_index=False) + ) + out = _apply_post_processing(cached, new_q, set(), True) + df = out.results.to_pandas() + assert len(df) == 1 + assert df["a"].tolist() == ["y"] + assert df["sum_x"].tolist() == [100] + + +def test_projection_rejected_when_metric_aggregation_unknown() -> None: + entry, new_q = _projection_query( + metrics=[M_UNKNOWN], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + ) + ok, _, _ = can_satisfy(entry, new_q) + assert ok is False + + +def test_projection_rejected_for_avg() -> None: + entry, new_q = _projection_query( + metrics=[M_AVG], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + ) + ok, _, _ = can_satisfy(entry, new_q) + assert ok is False + + +def test_projection_rejected_when_cached_has_limit() -> None: + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + cached_limit=10, + ) + ok, _, _ = can_satisfy(entry, new_q) + assert ok is False + + +def test_projection_rejected_when_cached_has_having() -> None: + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + cached_filters={having(M_SUM, Operator.GREATER_THAN, 10)}, + new_filters={having(M_SUM, Operator.GREATER_THAN, 10)}, + ) + ok, _, _ = can_satisfy(entry, new_q) + assert ok is False + + +def test_projection_rejected_when_order_references_dropped_dim() -> None: + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + new_order=[(COL_B, OrderDirection.ASC)], + ) + ok, _, _ = can_satisfy(entry, new_q) + assert ok is False + + +def test_projection_rejected_when_cached_has_filter_on_dropped_dim() -> None: + # cached restricts c; rolling up to [a] would miss rows we'd need + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A], + cached_dimensions=[COL_A, COL_B], + cached_filters={where(COL_B, Operator.GREATER_THAN, 5)}, + ) + ok, _, _ = can_satisfy(entry, new_q) + assert ok is False + + +def test_projection_rejected_when_cached_dims_subset_not_superset() -> None: + # cached has just [a]; new wants [a, b] — finer-grained data unavailable + entry, new_q = _projection_query( + metrics=[M_SUM], + new_dimensions=[COL_A, COL_B], + cached_dimensions=[COL_A], + ) + ok, _, _ = can_satisfy(entry, new_q) + assert ok is False