mirror of
https://github.com/apache/superset.git
synced 2026-05-29 11:45:16 +00:00
Leverage additive metrics
This commit is contained in:
@@ -92,6 +92,26 @@ class Dimension:
|
||||
grain: Grain | None = None
|
||||
|
||||
|
||||
class AggregationType(str, enum.Enum):
|
||||
"""
|
||||
Aggregation function applied by a metric.
|
||||
|
||||
Additivity (across an arbitrary set of grouping dimensions):
|
||||
* ``SUM``, ``COUNT``: fully additive — sub-group sums roll up via ``sum``.
|
||||
* ``MIN``, ``MAX``: roll up via ``min`` / ``max`` of sub-group values.
|
||||
* ``AVG``, ``COUNT_DISTINCT``, ``OTHER``: not safely roll-uppable from
|
||||
sub-aggregates without auxiliary data.
|
||||
"""
|
||||
|
||||
SUM = "SUM"
|
||||
COUNT = "COUNT"
|
||||
MIN = "MIN"
|
||||
MAX = "MAX"
|
||||
AVG = "AVG"
|
||||
COUNT_DISTINCT = "COUNT_DISTINCT"
|
||||
OTHER = "OTHER"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Metric:
|
||||
id: str
|
||||
@@ -100,6 +120,7 @@ class Metric:
|
||||
|
||||
definition: str
|
||||
description: str | None = None
|
||||
aggregation: AggregationType | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
@@ -50,10 +50,12 @@ import pyarrow as pa
|
||||
from flask import current_app
|
||||
from superset_core.semantic_layers.types import (
|
||||
AdhocExpression,
|
||||
AggregationType,
|
||||
Dimension,
|
||||
Filter,
|
||||
Metric,
|
||||
Operator,
|
||||
OrderDirection,
|
||||
OrderTuple,
|
||||
PredicateType,
|
||||
SemanticQuery,
|
||||
@@ -64,6 +66,7 @@ from superset_core.semantic_layers.types import (
|
||||
from superset.extensions import cache_manager
|
||||
from superset.utils import json
|
||||
from superset.utils.hashing import hash_from_str
|
||||
from superset.utils.pandas_postprocessing.aggregate import aggregate
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -71,6 +74,14 @@ INDEX_KEY_PREFIX = "sv:idx:"
|
||||
VALUE_KEY_PREFIX = "sv:val:"
|
||||
MAX_ENTRIES_PER_SHAPE = 32
|
||||
|
||||
_AGGREGATION_TO_PANDAS: dict[AggregationType, str] = {
|
||||
AggregationType.SUM: "sum",
|
||||
AggregationType.COUNT: "sum",
|
||||
AggregationType.MIN: "min",
|
||||
AggregationType.MAX: "max",
|
||||
}
|
||||
ADDITIVE_AGGREGATIONS = frozenset(_AGGREGATION_TO_PANDAS)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ViewMeta:
|
||||
@@ -84,6 +95,7 @@ class ViewMeta:
|
||||
@dataclass(frozen=True)
|
||||
class CachedEntry:
|
||||
filters: frozenset[Filter]
|
||||
dimension_keys: frozenset[str]
|
||||
limit: int | None
|
||||
offset: int
|
||||
order_key: str
|
||||
@@ -113,14 +125,16 @@ def try_serve_from_cache(
|
||||
served: SemanticResult | None = None
|
||||
for entry in entries:
|
||||
if served is None:
|
||||
ok, leftovers = can_satisfy(entry, query)
|
||||
ok, leftovers, projection_needed = can_satisfy(entry, query)
|
||||
if ok:
|
||||
payload = cache.get(entry.value_key)
|
||||
if payload is None:
|
||||
# value evicted but index entry survived; drop it
|
||||
continue
|
||||
pruned.append(entry)
|
||||
served = _apply_post_processing(payload, query, leftovers)
|
||||
served = _apply_post_processing(
|
||||
payload, query, leftovers, projection_needed
|
||||
)
|
||||
continue
|
||||
# keep entry; verify its value is still alive
|
||||
if cache.get(entry.value_key) is not None:
|
||||
@@ -150,6 +164,7 @@ def store_result(
|
||||
entries: list[CachedEntry] = list(cache.get(idx_key) or [])
|
||||
entry = CachedEntry(
|
||||
filters=frozenset(query.filters or set()),
|
||||
dimension_keys=frozenset(_dimension_key(d) for d in query.dimensions),
|
||||
limit=query.limit,
|
||||
offset=query.offset or 0,
|
||||
order_key=_order_key(query.order),
|
||||
@@ -173,10 +188,10 @@ def store_result(
|
||||
|
||||
|
||||
def shape_key(view_meta: ViewMeta, query: SemanticQuery) -> str:
|
||||
shape = {
|
||||
"m": sorted(m.id for m in query.metrics),
|
||||
"d": sorted(_dimension_key(d) for d in query.dimensions),
|
||||
}
|
||||
# The shape key buckets entries by metric set only; dimensions live on each
|
||||
# ``CachedEntry`` so we can find broader (dimension-superset) entries via the
|
||||
# projection path.
|
||||
shape = {"m": sorted(m.id for m in query.metrics)}
|
||||
digest = hash_from_str(json.dumps(shape, sort_keys=True))[:16]
|
||||
return f"{INDEX_KEY_PREFIX}{view_meta.uuid}:{view_meta.changed_on_iso}:{digest}"
|
||||
|
||||
@@ -270,26 +285,42 @@ def _timeout(view_meta: ViewMeta) -> int | None:
|
||||
def can_satisfy( # noqa: C901
|
||||
entry: CachedEntry,
|
||||
query: SemanticQuery,
|
||||
) -> tuple[bool, set[Filter]]:
|
||||
"""Return ``(reusable, leftover_filters_to_apply)`` for ``entry`` vs ``query``."""
|
||||
) -> tuple[bool, set[Filter], bool]:
|
||||
"""
|
||||
Return ``(reusable, leftover_filters, projection_needed)`` for ``entry`` vs
|
||||
``query``. ``projection_needed`` is True when the cached entry has a strict
|
||||
superset of the new dimensions and a pandas rollup is required.
|
||||
"""
|
||||
new_dim_keys = frozenset(_dimension_key(d) for d in query.dimensions)
|
||||
cached_dim_keys = entry.dimension_keys
|
||||
|
||||
if cached_dim_keys == new_dim_keys:
|
||||
projection_needed = False
|
||||
elif cached_dim_keys > new_dim_keys:
|
||||
projection_needed = True
|
||||
if not _projection_allowed(entry, query, new_dim_keys, cached_dim_keys):
|
||||
return False, set(), False
|
||||
else:
|
||||
return False, set(), False
|
||||
|
||||
new_filters = frozenset(query.filters or set())
|
||||
|
||||
c_adhoc, c_having, c_where = _split(entry.filters)
|
||||
n_adhoc, n_having, n_where = _split(new_filters)
|
||||
|
||||
if c_adhoc != n_adhoc:
|
||||
return False, set()
|
||||
return False, set(), False
|
||||
if c_having != n_having:
|
||||
return False, set()
|
||||
return False, set(), False
|
||||
|
||||
c_by_col = _group_by_column(c_where)
|
||||
n_by_col = _group_by_column(n_where)
|
||||
|
||||
for col_id, c_list in c_by_col.items():
|
||||
n_list = n_by_col.get(col_id, [])
|
||||
for c_list in c_by_col.values():
|
||||
for c in c_list:
|
||||
n_list = n_by_col.get(_filter_col_id(c), [])
|
||||
if not any(_implies(n, c) for n in n_list):
|
||||
return False, set()
|
||||
return False, set(), False
|
||||
|
||||
leftovers: set[Filter] = set()
|
||||
for col_id, n_list in n_by_col.items():
|
||||
@@ -297,27 +328,72 @@ def can_satisfy( # noqa: C901
|
||||
for n in n_list:
|
||||
if not any(_implies(c, n) for c in c_list):
|
||||
if n.column is None or n.operator == Operator.ADHOC:
|
||||
return False, set()
|
||||
return False, set(), False
|
||||
leftovers.add(n)
|
||||
|
||||
projection_ids = _projection_ids(query)
|
||||
# Leftover filters are applied to the cached DataFrame BEFORE the optional
|
||||
# rollup, so their columns must be present in the cached projection.
|
||||
allowed_ids = _cached_column_ids(entry, query)
|
||||
for leftover in leftovers:
|
||||
if leftover.column is None or leftover.column.id not in projection_ids:
|
||||
return False, set()
|
||||
if leftover.column is None or leftover.column.id not in allowed_ids:
|
||||
return False, set(), False
|
||||
|
||||
if entry.offset != 0 or (query.offset or 0) != 0:
|
||||
return False, set()
|
||||
return False, set(), False
|
||||
|
||||
if projection_needed:
|
||||
# Re-aggregation will re-order by ``query.order`` after rollup, so the
|
||||
# cached order is irrelevant. We do require the new order (if any) to
|
||||
# reference only surviving columns; otherwise sort would fail post-rollup.
|
||||
if not _order_uses_only(query.order, _projection_ids(query)):
|
||||
return False, set(), False
|
||||
else:
|
||||
if entry.limit is not None:
|
||||
if query.limit is None or query.limit > entry.limit:
|
||||
return False, set(), False
|
||||
if entry.order_key != _order_key(query.order):
|
||||
return False, set(), False
|
||||
|
||||
if entry.group_limit_key != _group_limit_key(query.group_limit):
|
||||
return False, set(), False
|
||||
|
||||
return True, leftovers, projection_needed
|
||||
|
||||
|
||||
def _projection_allowed(
|
||||
entry: CachedEntry,
|
||||
query: SemanticQuery,
|
||||
new_dim_keys: frozenset[str],
|
||||
cached_dim_keys: frozenset[str],
|
||||
) -> bool:
|
||||
"""
|
||||
Gates for the projection path (above and beyond filter containment).
|
||||
"""
|
||||
if any(m.aggregation not in ADDITIVE_AGGREGATIONS for m in query.metrics):
|
||||
return False
|
||||
# Cached truncation makes the rollup unsafe (we're missing rows).
|
||||
if entry.limit is not None:
|
||||
if query.limit is None or query.limit > entry.limit:
|
||||
return False, set()
|
||||
if entry.order_key != _order_key(query.order):
|
||||
return False, set()
|
||||
return False
|
||||
if entry.group_limit_key:
|
||||
return False
|
||||
# Cached HAVING dropped sub-aggregate rows; the rolled-up totals would be
|
||||
# off. Conservative: skip the projection path when cached has any HAVING.
|
||||
if any(f.type == PredicateType.HAVING for f in entry.filters):
|
||||
return False
|
||||
return True
|
||||
|
||||
if entry.group_limit_key != _group_limit_key(query.group_limit):
|
||||
return False, set()
|
||||
|
||||
return True, leftovers
|
||||
def _filter_col_id(f: Filter) -> str | None:
|
||||
return f.column.id if f.column is not None else None
|
||||
|
||||
|
||||
def _order_uses_only(
|
||||
order: list[OrderTuple] | None,
|
||||
allowed_ids: set[str],
|
||||
) -> bool:
|
||||
if not order:
|
||||
return True
|
||||
return all(_orderable_id(element) in allowed_ids for element, _ in order)
|
||||
|
||||
|
||||
def _split(
|
||||
@@ -348,6 +424,12 @@ def _projection_ids(query: SemanticQuery) -> set[str]:
|
||||
return {d.id for d in query.dimensions} | {m.id for m in query.metrics}
|
||||
|
||||
|
||||
def _cached_column_ids(entry: CachedEntry, query: SemanticQuery) -> set[str]:
|
||||
"""Column ids available in the cached DataFrame (cached dims + shared metrics)."""
|
||||
cached_dim_ids = {key.rsplit("@", 1)[0] for key in entry.dimension_keys}
|
||||
return cached_dim_ids | {m.id for m in query.metrics}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pairwise implication
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -522,9 +604,10 @@ def _apply_post_processing(
|
||||
cached: SemanticResult,
|
||||
query: SemanticQuery,
|
||||
leftovers: set[Filter],
|
||||
projection_needed: bool,
|
||||
) -> SemanticResult:
|
||||
"""Apply leftover filters and the new limit to a cached result."""
|
||||
if not leftovers and query.limit is None:
|
||||
"""Apply leftover filters, projection (re-aggregation), order, and limit."""
|
||||
if not leftovers and not projection_needed and query.limit is None:
|
||||
return cached
|
||||
|
||||
df = cached.results.to_pandas()
|
||||
@@ -533,17 +616,54 @@ def _apply_post_processing(
|
||||
for f in leftovers:
|
||||
mask &= _mask_for(df, f)
|
||||
df = df[mask]
|
||||
|
||||
note_def = "Served from semantic view smart cache (post-processed locally)"
|
||||
if projection_needed:
|
||||
groupby = [d.name for d in query.dimensions]
|
||||
aggregates = {
|
||||
m.name: {
|
||||
"column": m.name,
|
||||
"operator": _AGGREGATION_TO_PANDAS[
|
||||
# Guarded by ``_projection_allowed`` — non-None and additive.
|
||||
m.aggregation # type: ignore[index]
|
||||
],
|
||||
}
|
||||
for m in query.metrics
|
||||
}
|
||||
df = aggregate(df, groupby=groupby, aggregates=aggregates)
|
||||
df = _apply_order(df, query.order)
|
||||
note_def = "Served from semantic view smart cache (re-aggregated locally)"
|
||||
|
||||
if query.limit is not None:
|
||||
df = df.head(query.limit)
|
||||
|
||||
table = pa.Table.from_pandas(df, preserve_index=False)
|
||||
note = SemanticRequest(
|
||||
type="cache",
|
||||
definition="Served from semantic view smart cache (post-processed locally)",
|
||||
)
|
||||
note = SemanticRequest(type="cache", definition=note_def)
|
||||
return SemanticResult(requests=list(cached.requests) + [note], results=table)
|
||||
|
||||
|
||||
def _apply_order(
|
||||
df: pd.DataFrame,
|
||||
order: list[OrderTuple] | None,
|
||||
) -> pd.DataFrame:
|
||||
if not order:
|
||||
return df
|
||||
available: list[tuple[str, bool]] = []
|
||||
for element, direction in order:
|
||||
col = _orderable_id_name(element)
|
||||
if col in df.columns:
|
||||
available.append((col, direction == OrderDirection.ASC))
|
||||
if not available:
|
||||
return df
|
||||
cols = [col for col, _ in available]
|
||||
asc = [a for _, a in available]
|
||||
return df.sort_values(by=cols, ascending=asc).reset_index(drop=True)
|
||||
|
||||
|
||||
def _orderable_id_name(element: Metric | Dimension | AdhocExpression) -> str:
|
||||
return getattr(element, "name", element.id)
|
||||
|
||||
|
||||
def _mask_for(df: pd.DataFrame, f: Filter) -> pd.Series: # noqa: C901
|
||||
if f.column is None:
|
||||
return pd.Series(True, index=df.index)
|
||||
|
||||
@@ -28,6 +28,7 @@ import pyarrow as pa
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
from superset_core.semantic_layers.types import (
|
||||
AggregationType,
|
||||
Dimension,
|
||||
Metric,
|
||||
SemanticRequest,
|
||||
@@ -189,3 +190,106 @@ def test_changed_on_invalidates_cache(
|
||||
datasource.changed_on = datetime(2026, 2, 1, 0, 0, 0)
|
||||
get_results(_qo(datasource, ">", 1))
|
||||
assert view_implementation.get_table.call_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Projection (v2) — dropping a dimension and re-aggregating
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_view(metric_aggregation: AggregationType | None) -> tuple[Any, MagicMock]:
|
||||
dim_b = Dimension(id="t.b", name="b", type=pa.utf8())
|
||||
dim_c = Dimension(id="t.c", name="c", type=pa.utf8())
|
||||
metric_x = Metric(
|
||||
id="t.x",
|
||||
name="x",
|
||||
type=pa.float64(),
|
||||
definition="sum(x)",
|
||||
aggregation=metric_aggregation,
|
||||
)
|
||||
impl = MagicMock()
|
||||
impl.metrics = {metric_x}
|
||||
impl.dimensions = {dim_b, dim_c}
|
||||
impl.features = frozenset()
|
||||
impl.get_metrics = MagicMock(return_value={metric_x})
|
||||
impl.get_dimensions = MagicMock(return_value={dim_b, dim_c})
|
||||
|
||||
ds = MagicMock()
|
||||
ds.implementation = impl
|
||||
ds.uuid = "proj-view"
|
||||
ds.changed_on = datetime(2026, 3, 1, 0, 0, 0)
|
||||
ds.cache_timeout = 60
|
||||
ds.fetch_values_predicate = None
|
||||
return impl, ds
|
||||
|
||||
|
||||
def _qo_dims(ds: MagicMock, columns: list[str]) -> ValidatedQueryObject:
|
||||
return ValidatedQueryObject(
|
||||
datasource=ds,
|
||||
metrics=["x"],
|
||||
columns=columns, # type: ignore[arg-type]
|
||||
filters=[],
|
||||
)
|
||||
|
||||
|
||||
def _result_bc(rows: list[tuple[str, str, float]]) -> SemanticResult:
|
||||
df = pd.DataFrame(rows, columns=["b", "c", "x"])
|
||||
return SemanticResult(
|
||||
requests=[SemanticRequest(type="SQL", definition="select b,c,sum(x)")],
|
||||
results=pa.Table.from_pandas(df, preserve_index=False),
|
||||
)
|
||||
|
||||
|
||||
def test_projection_reuses_cached_for_dropped_dim(
|
||||
fake_cache: _InMemoryCache,
|
||||
) -> None:
|
||||
impl, ds = _make_view(AggregationType.SUM)
|
||||
impl.get_table = MagicMock(
|
||||
return_value=_result_bc(
|
||||
[("b1", "c1", 5.0), ("b1", "c2", 3.0), ("b2", "c1", 4.0)]
|
||||
)
|
||||
)
|
||||
|
||||
first = get_results(_qo_dims(ds, ["b", "c"]))
|
||||
assert impl.get_table.call_count == 1
|
||||
assert len(first.df) == 3
|
||||
|
||||
second = get_results(_qo_dims(ds, ["b"]))
|
||||
assert impl.get_table.call_count == 1 # served via projection
|
||||
df = second.df.sort_values("b").reset_index(drop=True)
|
||||
assert df["b"].tolist() == ["b1", "b2"]
|
||||
assert df["x"].tolist() == [8.0, 4.0]
|
||||
|
||||
|
||||
def test_projection_skipped_when_aggregation_unknown(
|
||||
fake_cache: _InMemoryCache,
|
||||
) -> None:
|
||||
impl, ds = _make_view(None) # metric has no aggregation declared
|
||||
impl.get_table = MagicMock(
|
||||
side_effect=[
|
||||
_result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0)]),
|
||||
_result_bc([("b1", "c1", 5.0)]), # what the SV would compute for [b]
|
||||
]
|
||||
)
|
||||
|
||||
get_results(_qo_dims(ds, ["b", "c"]))
|
||||
assert impl.get_table.call_count == 1
|
||||
|
||||
get_results(_qo_dims(ds, ["b"]))
|
||||
assert impl.get_table.call_count == 2 # cannot project, re-executed
|
||||
|
||||
|
||||
def test_projection_skipped_for_avg(
|
||||
fake_cache: _InMemoryCache,
|
||||
) -> None:
|
||||
impl, ds = _make_view(AggregationType.AVG)
|
||||
impl.get_table = MagicMock(
|
||||
side_effect=[
|
||||
_result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0)]),
|
||||
_result_bc([("b1", "c1", 4.0)]),
|
||||
]
|
||||
)
|
||||
|
||||
get_results(_qo_dims(ds, ["b", "c"]))
|
||||
get_results(_qo_dims(ds, ["b"]))
|
||||
assert impl.get_table.call_count == 2
|
||||
|
||||
@@ -24,6 +24,7 @@ import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
from superset_core.semantic_layers.types import (
|
||||
AggregationType,
|
||||
Dimension,
|
||||
Filter,
|
||||
Metric,
|
||||
@@ -54,8 +55,18 @@ def dim(id_: str, name: str | None = None) -> Dimension:
|
||||
return Dimension(id=id_, name=name or id_, type=pa.utf8())
|
||||
|
||||
|
||||
def met(id_: str, name: str | None = None) -> Metric:
|
||||
return Metric(id=id_, name=name or id_, type=pa.float64(), definition="x")
|
||||
def met(
|
||||
id_: str,
|
||||
name: str | None = None,
|
||||
aggregation: AggregationType | None = None,
|
||||
) -> Metric:
|
||||
return Metric(
|
||||
id=id_,
|
||||
name=name or id_,
|
||||
type=pa.float64(),
|
||||
definition="x",
|
||||
aggregation=aggregation,
|
||||
)
|
||||
|
||||
|
||||
COL_A = dim("col.a", "a")
|
||||
@@ -95,10 +106,15 @@ def query(
|
||||
|
||||
|
||||
def entry_from(q: SemanticQuery, value_key_: str = "vk") -> CachedEntry:
|
||||
from superset.semantic_layers.cache import _group_limit_key, _order_key
|
||||
from superset.semantic_layers.cache import (
|
||||
_dimension_key,
|
||||
_group_limit_key,
|
||||
_order_key,
|
||||
)
|
||||
|
||||
return CachedEntry(
|
||||
filters=frozenset(q.filters or set()),
|
||||
dimension_keys=frozenset(_dimension_key(d) for d in q.dimensions),
|
||||
limit=q.limit,
|
||||
offset=q.offset or 0,
|
||||
order_key=_order_key(q.order),
|
||||
@@ -193,15 +209,16 @@ def test_implies_like_exact_match_only() -> None:
|
||||
def test_can_satisfy_empty_cached_returns_all_as_leftovers() -> None:
|
||||
cached_q = query(filters=None)
|
||||
new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 5)})
|
||||
ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, leftovers, projection = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is True
|
||||
assert projection is False
|
||||
assert leftovers == {where(COL_A, Operator.GREATER_THAN, 5)}
|
||||
|
||||
|
||||
def test_can_satisfy_narrower_filter() -> None:
|
||||
cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)})
|
||||
new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 2)})
|
||||
ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is True
|
||||
assert leftovers == {where(COL_A, Operator.GREATER_THAN, 2)}
|
||||
|
||||
@@ -209,7 +226,7 @@ def test_can_satisfy_narrower_filter() -> None:
|
||||
def test_can_satisfy_broader_filter_fails() -> None:
|
||||
cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 2)})
|
||||
new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)})
|
||||
ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is False
|
||||
assert leftovers == set()
|
||||
|
||||
@@ -217,7 +234,7 @@ def test_can_satisfy_broader_filter_fails() -> None:
|
||||
def test_can_satisfy_missing_constraint_fails() -> None:
|
||||
cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)})
|
||||
new_q = query(filters=None)
|
||||
ok, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
@@ -229,7 +246,7 @@ def test_can_satisfy_new_filter_on_extra_column() -> None:
|
||||
where(COL_B, Operator.EQUALS, "x"),
|
||||
}
|
||||
)
|
||||
ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is True
|
||||
assert leftovers == {
|
||||
where(COL_A, Operator.GREATER_THAN, 2),
|
||||
@@ -244,7 +261,7 @@ def test_can_satisfy_leftover_on_non_projected_column_fails() -> None:
|
||||
filters={where(other, Operator.EQUALS, "x")},
|
||||
dimensions=[COL_A, COL_B],
|
||||
)
|
||||
ok, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
@@ -252,8 +269,8 @@ def test_can_satisfy_having_requires_exact_set() -> None:
|
||||
cached_q = query(filters={having(M_X, Operator.GREATER_THAN, 100)})
|
||||
same = query(filters={having(M_X, Operator.GREATER_THAN, 100)})
|
||||
tighter = query(filters={having(M_X, Operator.GREATER_THAN, 200)})
|
||||
ok_same, _ = can_satisfy(entry_from(cached_q), same)
|
||||
ok_tight, _ = can_satisfy(entry_from(cached_q), tighter)
|
||||
ok_same, _, _ = can_satisfy(entry_from(cached_q), same)
|
||||
ok_tight, _, _ = can_satisfy(entry_from(cached_q), tighter)
|
||||
assert ok_same is True
|
||||
assert ok_tight is False
|
||||
|
||||
@@ -262,8 +279,8 @@ def test_can_satisfy_adhoc_requires_exact_set() -> None:
|
||||
cached_q = query(filters={adhoc("col_a > 1")})
|
||||
same = query(filters={adhoc("col_a > 1")})
|
||||
different = query(filters={adhoc("col_a > 2")})
|
||||
ok_same, _ = can_satisfy(entry_from(cached_q), same)
|
||||
ok_diff, _ = can_satisfy(entry_from(cached_q), different)
|
||||
ok_same, _, _ = can_satisfy(entry_from(cached_q), same)
|
||||
ok_diff, _, _ = can_satisfy(entry_from(cached_q), different)
|
||||
assert ok_same is True
|
||||
assert ok_diff is False
|
||||
|
||||
@@ -276,7 +293,7 @@ def test_can_satisfy_adhoc_requires_exact_set() -> None:
|
||||
def test_can_satisfy_unlimited_cached_satisfies_any_limit() -> None:
|
||||
cached_q = query(filters=None, limit=None)
|
||||
new_q = query(filters=None, limit=10)
|
||||
ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is True
|
||||
assert leftovers == set()
|
||||
|
||||
@@ -285,35 +302,35 @@ def test_can_satisfy_smaller_limit_with_matching_order() -> None:
|
||||
order = [(M_X, OrderDirection.DESC)]
|
||||
cached_q = query(filters=None, limit=100, order=order)
|
||||
new_q = query(filters=None, limit=10, order=order)
|
||||
ok, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is True
|
||||
|
||||
|
||||
def test_can_satisfy_smaller_limit_different_order_fails() -> None:
|
||||
cached_q = query(filters=None, limit=100, order=[(M_X, OrderDirection.DESC)])
|
||||
new_q = query(filters=None, limit=10, order=[(M_X, OrderDirection.ASC)])
|
||||
ok, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_can_satisfy_larger_limit_fails() -> None:
|
||||
cached_q = query(filters=None, limit=10)
|
||||
new_q = query(filters=None, limit=100)
|
||||
ok, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_can_satisfy_no_new_limit_when_cached_has_one_fails() -> None:
|
||||
cached_q = query(filters=None, limit=100)
|
||||
new_q = query(filters=None, limit=None)
|
||||
ok, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_can_satisfy_offset_never_reused() -> None:
|
||||
cached_q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], offset=5)
|
||||
new_q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], offset=5)
|
||||
ok, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
@@ -333,7 +350,7 @@ def test_apply_post_processing_filters_and_limits() -> None:
|
||||
limit=2,
|
||||
)
|
||||
result = _apply_post_processing(
|
||||
cached, new_q, {where(COL_A, Operator.GREATER_THAN, 2)}
|
||||
cached, new_q, {where(COL_A, Operator.GREATER_THAN, 2)}, False
|
||||
)
|
||||
result_df = result.results.to_pandas()
|
||||
assert list(result_df["a"]) == [3, 5]
|
||||
@@ -347,7 +364,7 @@ def test_apply_post_processing_no_leftovers_no_limit_returns_original() -> None:
|
||||
requests=[], results=pa.Table.from_pandas(df, preserve_index=False)
|
||||
)
|
||||
new_q = query(filters=None, limit=None)
|
||||
out = _apply_post_processing(cached, new_q, set())
|
||||
out = _apply_post_processing(cached, new_q, set(), False)
|
||||
# same object reference is OK; we explicitly return the input
|
||||
assert out is cached
|
||||
|
||||
@@ -394,3 +411,261 @@ def test_value_key_with_datetime_filter() -> None:
|
||||
q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], filters={f})
|
||||
# should not raise
|
||||
assert value_key(VIEW, q).startswith("sv:val:")
|
||||
|
||||
|
||||
def test_shape_key_independent_of_dimensions() -> None:
|
||||
# The v2 shape key buckets entries by metric set only; different dimension
|
||||
# sets share the same shape so the projection path can find broader entries.
|
||||
q1 = SemanticQuery(metrics=[M_X], dimensions=[COL_A, COL_B])
|
||||
q2 = SemanticQuery(metrics=[M_X], dimensions=[COL_A])
|
||||
assert shape_key(VIEW, q1) == shape_key(VIEW, q2)
|
||||
# Value keys still differ.
|
||||
assert value_key(VIEW, q1) != value_key(VIEW, q2)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Projection (v2)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
M_SUM = met("met.sum", "sum_x", aggregation=AggregationType.SUM)
|
||||
M_COUNT = met("met.count", "count_x", aggregation=AggregationType.COUNT)
|
||||
M_MIN = met("met.min", "min_x", aggregation=AggregationType.MIN)
|
||||
M_MAX = met("met.max", "max_x", aggregation=AggregationType.MAX)
|
||||
M_AVG = met("met.avg", "avg_x", aggregation=AggregationType.AVG)
|
||||
M_UNKNOWN = met("met.unknown", "unknown_x", aggregation=None)
|
||||
|
||||
|
||||
def _projection_query(
|
||||
metrics: list[Metric],
|
||||
new_dimensions: list[Dimension],
|
||||
cached_dimensions: list[Dimension],
|
||||
cached_filters: set[Filter] | None = None,
|
||||
cached_limit: int | None = None,
|
||||
new_filters: set[Filter] | None = None,
|
||||
new_limit: int | None = None,
|
||||
new_order: Any = None,
|
||||
) -> tuple[CachedEntry, SemanticQuery]:
|
||||
cached_q = SemanticQuery(
|
||||
metrics=metrics,
|
||||
dimensions=cached_dimensions,
|
||||
filters=cached_filters,
|
||||
limit=cached_limit,
|
||||
)
|
||||
new_q = SemanticQuery(
|
||||
metrics=metrics,
|
||||
dimensions=new_dimensions,
|
||||
filters=new_filters,
|
||||
limit=new_limit,
|
||||
order=new_order,
|
||||
)
|
||||
return entry_from(cached_q), new_q
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"metric,operator",
|
||||
[
|
||||
(M_SUM, "sum"),
|
||||
(M_COUNT, "sum"),
|
||||
(M_MIN, "min"),
|
||||
(M_MAX, "max"),
|
||||
],
|
||||
)
|
||||
def test_can_satisfy_projection_each_additive_op(metric: Metric, operator: str) -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[metric],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
)
|
||||
ok, leftovers, projection = can_satisfy(entry, new_q)
|
||||
assert ok is True
|
||||
assert projection is True
|
||||
assert leftovers == set()
|
||||
|
||||
|
||||
def test_projection_rolls_up_sum() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
)
|
||||
cached_df = pd.DataFrame(
|
||||
{"a": ["x", "x", "y", "y"], "b": [1, 2, 1, 2], "sum_x": [10, 20, 30, 40]}
|
||||
)
|
||||
cached = SemanticResult(
|
||||
requests=[SemanticRequest(type="SQL", definition="select ...")],
|
||||
results=pa.Table.from_pandas(cached_df, preserve_index=False),
|
||||
)
|
||||
out = _apply_post_processing(cached, new_q, set(), True)
|
||||
out_df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
|
||||
assert list(out_df["a"]) == ["x", "y"]
|
||||
assert list(out_df["sum_x"]) == [30, 70]
|
||||
|
||||
|
||||
def test_projection_rolls_up_min_max_count() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_MIN, M_MAX, M_COUNT],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
)
|
||||
cached_df = pd.DataFrame(
|
||||
{
|
||||
"a": ["x", "x", "y", "y"],
|
||||
"b": [1, 2, 1, 2],
|
||||
"min_x": [5, 2, 9, 8],
|
||||
"max_x": [50, 60, 70, 80],
|
||||
"count_x": [1, 1, 2, 3],
|
||||
}
|
||||
)
|
||||
cached = SemanticResult(
|
||||
requests=[],
|
||||
results=pa.Table.from_pandas(cached_df, preserve_index=False),
|
||||
)
|
||||
out = _apply_post_processing(cached, new_q, set(), True)
|
||||
df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
|
||||
assert list(df["min_x"]) == [2, 8]
|
||||
assert list(df["max_x"]) == [60, 80]
|
||||
assert list(df["count_x"]) == [2, 5]
|
||||
|
||||
|
||||
def test_projection_drops_multiple_dims() -> None:
|
||||
col_c = dim("col.c", "c")
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B, col_c],
|
||||
)
|
||||
cached_df = pd.DataFrame(
|
||||
{
|
||||
"a": ["x", "x", "x", "y"],
|
||||
"b": [1, 1, 2, 1],
|
||||
"c": [10, 20, 10, 10],
|
||||
"sum_x": [1, 2, 3, 4],
|
||||
}
|
||||
)
|
||||
cached = SemanticResult(
|
||||
requests=[], results=pa.Table.from_pandas(cached_df, preserve_index=False)
|
||||
)
|
||||
out = _apply_post_processing(cached, new_q, set(), True)
|
||||
df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
|
||||
assert list(df["sum_x"]) == [6, 4]
|
||||
|
||||
|
||||
def test_projection_with_leftover_filter_then_rollup() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
new_filters={where(COL_B, Operator.GREATER_THAN, 1)},
|
||||
)
|
||||
cached_df = pd.DataFrame(
|
||||
{"a": ["x", "x", "y"], "b": [1, 2, 2], "sum_x": [10, 20, 30]}
|
||||
)
|
||||
cached = SemanticResult(
|
||||
requests=[], results=pa.Table.from_pandas(cached_df, preserve_index=False)
|
||||
)
|
||||
ok, leftovers, projection = can_satisfy(entry, new_q)
|
||||
assert ok is True
|
||||
assert projection is True
|
||||
out = _apply_post_processing(cached, new_q, leftovers, projection)
|
||||
df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
|
||||
# b > 1 removes the (x,1) row; x sums to 20, y to 30
|
||||
assert list(df["sum_x"]) == [20, 30]
|
||||
|
||||
|
||||
def test_projection_with_order_and_limit() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
new_order=[(M_SUM, OrderDirection.DESC)],
|
||||
new_limit=1,
|
||||
)
|
||||
cached_df = pd.DataFrame(
|
||||
{"a": ["x", "x", "y"], "b": [1, 2, 1], "sum_x": [1, 2, 100]}
|
||||
)
|
||||
cached = SemanticResult(
|
||||
requests=[], results=pa.Table.from_pandas(cached_df, preserve_index=False)
|
||||
)
|
||||
out = _apply_post_processing(cached, new_q, set(), True)
|
||||
df = out.results.to_pandas()
|
||||
assert len(df) == 1
|
||||
assert df["a"].tolist() == ["y"]
|
||||
assert df["sum_x"].tolist() == [100]
|
||||
|
||||
|
||||
def test_projection_rejected_when_metric_aggregation_unknown() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_UNKNOWN],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
)
|
||||
ok, _, _ = can_satisfy(entry, new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_projection_rejected_for_avg() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_AVG],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
)
|
||||
ok, _, _ = can_satisfy(entry, new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_projection_rejected_when_cached_has_limit() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
cached_limit=10,
|
||||
)
|
||||
ok, _, _ = can_satisfy(entry, new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_projection_rejected_when_cached_has_having() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
cached_filters={having(M_SUM, Operator.GREATER_THAN, 10)},
|
||||
new_filters={having(M_SUM, Operator.GREATER_THAN, 10)},
|
||||
)
|
||||
ok, _, _ = can_satisfy(entry, new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_projection_rejected_when_order_references_dropped_dim() -> None:
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
new_order=[(COL_B, OrderDirection.ASC)],
|
||||
)
|
||||
ok, _, _ = can_satisfy(entry, new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_projection_rejected_when_cached_has_filter_on_dropped_dim() -> None:
|
||||
# cached restricts c; rolling up to [a] would miss rows we'd need
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A],
|
||||
cached_dimensions=[COL_A, COL_B],
|
||||
cached_filters={where(COL_B, Operator.GREATER_THAN, 5)},
|
||||
)
|
||||
ok, _, _ = can_satisfy(entry, new_q)
|
||||
assert ok is False
|
||||
|
||||
|
||||
def test_projection_rejected_when_cached_dims_subset_not_superset() -> None:
|
||||
# cached has just [a]; new wants [a, b] — finer-grained data unavailable
|
||||
entry, new_q = _projection_query(
|
||||
metrics=[M_SUM],
|
||||
new_dimensions=[COL_A, COL_B],
|
||||
cached_dimensions=[COL_A],
|
||||
)
|
||||
ok, _, _ = can_satisfy(entry, new_q)
|
||||
assert ok is False
|
||||
|
||||
Reference in New Issue
Block a user