fix: Time shifts with different granularity for ECharts (#24176)

Author: Michael S. Molina
Date: 2023-06-08 16:03:37 -03:00
Committed by: GitHub
Parent: e922f0993d
Commit: e5b7f7c9b5
39 changed files with 682 additions and 416 deletions

superset/common/query_context_processor.py

@@ -37,7 +37,7 @@ from superset.common.utils import dataframe_utils
 from superset.common.utils.query_cache_manager import QueryCacheManager
 from superset.common.utils.time_range_utils import get_since_until_from_query_object
 from superset.connectors.base.models import BaseDatasource
-from superset.constants import CacheRegion
+from superset.constants import CacheRegion, TimeGrain
 from superset.exceptions import (
     InvalidPostProcessingError,
     QueryObjectValidationError,
@@ -74,6 +74,27 @@ config = app.config
 stats_logger: BaseStatsLogger = config["STATS_LOGGER"]
 logger = logging.getLogger(__name__)
+
+# Temporary column used for joining aggregated offset results
+AGGREGATED_JOIN_COLUMN = "__aggregated_join_column"
+
+# This only includes time grains that may influence
+# the temporal column used for joining offset results.
+# Given that we don't allow time shifts smaller than a day,
+# we don't need to include aggregations for smaller time grains.
+AGGREGATED_JOIN_GRAINS = {
+    TimeGrain.WEEK,
+    TimeGrain.WEEK_STARTING_SUNDAY,
+    TimeGrain.WEEK_STARTING_MONDAY,
+    TimeGrain.WEEK_ENDING_SATURDAY,
+    TimeGrain.WEEK_ENDING_SUNDAY,
+    TimeGrain.MONTH,
+    TimeGrain.QUARTER,
+    TimeGrain.YEAR,
+}
+
+# Right suffix used for joining offset results
+R_SUFFIX = "__right_suffix"

 class CachedTimeOffset(TypedDict):
     df: pd.DataFrame
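
A note on R_SUFFIX: every offset frame comes back carrying the same join-key columns as the base frame, and pandas will only join frames with overlapping column names when a suffix is supplied for one side. A minimal pandas sketch of the pattern (illustrative only; the actual join goes through dataframe_utils.left_join_df, shown further down in this diff):

    import pandas as pd

    base = pd.DataFrame({"ds": ["2023-W01", "2023-W02"], "SUM(value)": [10, 20]})
    offset = pd.DataFrame({"ds": ["2023-W01", "2023-W02"], "SUM(value)__1 year ago": [8, 15]})

    # the shared "ds" column forces a suffix on the right-hand copy...
    joined = base.join(offset, rsuffix="__right_suffix")
    # ...which is then dropped again, mirroring the df.filter(regex=...)
    # cleanup at the end of processing_time_offsets below
    joined = joined.drop(list(joined.filter(regex="__right_suffix")), axis=1)
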
@@ -89,10 +110,6 @@ class QueryContextProcessor:
     _query_context: QueryContext
     _qc_datasource: BaseDatasource
-    """
-    The query context contains the query object and additional fields necessary
-    to retrieve the data payload for a given viz.
-    """

     def __init__(self, query_context: QueryContext):
         self._query_context = query_context
@@ -307,6 +324,35 @@ class QueryContextProcessor:
         return df

+    @staticmethod
+    def get_time_grain(query_object: QueryObject) -> Any | None:
+        if (
+            query_object.columns
+            and len(query_object.columns) > 0
+            and isinstance(query_object.columns[0], dict)
+        ):
+            # If the time grain is in the columns, it will be the first one,
+            # and it will be of AdhocColumn type
+            return query_object.columns[0].get("timeGrain")
+
+        return query_object.extras.get("time_grain_sqla")
+
+    def add_aggregated_join_column(
+        self,
+        df: pd.DataFrame,
+        time_grain: str,
+        join_column_producer: Any = None,
+    ) -> None:
+        if join_column_producer:
+            df[AGGREGATED_JOIN_COLUMN] = df.apply(
+                lambda row: join_column_producer(row, 0), axis=1
+            )
+        else:
+            df[AGGREGATED_JOIN_COLUMN] = df.apply(
+                lambda row: self.get_aggregated_join_column(row, 0, time_grain),
+                axis=1,
+            )
+
     def processing_time_offsets(  # pylint: disable=too-many-locals,too-many-statements
         self,
         df: pd.DataFrame,
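
The lookup order of get_time_grain, restated as a standalone sketch (the stand-in function and the literal query shapes are illustrative; Superset grains are ISO 8601 durations such as "P1W"):

    from typing import Any

    def get_time_grain(columns: list[Any], extras: dict[str, Any]) -> Any | None:
        # an adhoc x-axis column (a dict) wins over the grain stored in extras
        if columns and isinstance(columns[0], dict):
            return columns[0].get("timeGrain")
        return extras.get("time_grain_sqla")

    # adhoc column carrying its own grain
    assert get_time_grain([{"sqlExpression": "ds", "timeGrain": "P1W"}], {}) == "P1W"
    # plain column name: fall back to the extras dict
    assert get_time_grain(["ds"], {"time_grain_sqla": "P1M"}) == "P1M"
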
@@ -317,9 +363,8 @@ class QueryContextProcessor:
         query_object_clone = copy.copy(query_object)
         queries: list[str] = []
         cache_keys: list[str | None] = []
-        rv_dfs: list[pd.DataFrame] = [df]
+        offset_dfs: list[pd.DataFrame] = []

-        time_offsets = query_object.time_offsets
         outer_from_dttm, outer_to_dttm = get_since_until_from_query_object(query_object)
         if not outer_from_dttm or not outer_to_dttm:
             raise QueryObjectValidationError(
@@ -328,7 +373,31 @@ class QueryContextProcessor:
"when using a Time Comparison."
)
)
for offset in time_offsets:
columns = df.columns
time_grain = self.get_time_grain(query_object)
if not time_grain:
raise QueryObjectValidationError(
_("Time Grain must be specified when using Time Shift.")
)
join_column_producer = config["TIME_GRAIN_JOIN_COLUMN_PRODUCERS"].get(
time_grain
)
use_aggregated_join_column = (
join_column_producer or time_grain in AGGREGATED_JOIN_GRAINS
)
if use_aggregated_join_column:
self.add_aggregated_join_column(df, time_grain, join_column_producer)
# skips the first column which is the temporal column
# because we'll use the aggregated join columns instead
columns = df.columns[1:]
metric_names = get_metric_names(query_object.metrics)
join_keys = [col for col in columns if col not in metric_names]
for offset in query_object.time_offsets:
try:
# pylint: disable=line-too-long
# Since the xaxis is also a column name for the time filter, xaxis_label will be set as granularity
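
TIME_GRAIN_JOIN_COLUMN_PRODUCERS lets a deployment override how the join key is built for a given grain; judging from add_aggregated_join_column above, a producer receives the row and the index of the temporal column and returns the join-key string. A hypothetical ISO-week producer (the function name and config placement are illustrative, not part of this commit):

    import pandas as pd

    def iso_week_producer(row: pd.Series, column_index: int) -> str:
        # ISO year + ISO week, for engines whose weeks start on Monday
        return row[column_index].strftime("%G-W%V")

    # in superset_config.py:
    TIME_GRAIN_JOIN_COLUMN_PRODUCERS = {"P1W": iso_week_producer}
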
@@ -364,13 +433,15 @@ class QueryContextProcessor:
             ]
             # `offset` is added to the hash function
-            cache_key = self.query_cache_key(query_object_clone, time_offset=offset)
+            cache_key = self.query_cache_key(
+                query_object_clone, time_offset=offset, time_grain=time_grain
+            )
             cache = QueryCacheManager.get(
                 cache_key, CacheRegion.DATA, query_context.force
             )
             # check whether the cache was hit
             if cache.is_loaded:
-                rv_dfs.append(cache.df)
+                offset_dfs.append(cache.df)
                 queries.append(cache.query)
                 cache_keys.append(cache_key)
                 continue
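
Adding time_grain to the cache key keeps a shift computed under one granularity from being served for another. A toy sketch of the principle, since cache keys are ultimately a hash of a serialized payload (the helper below is illustrative, not Superset's actual query_cache_key):

    import hashlib
    import json

    def toy_cache_key(**payload: str) -> str:
        return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

    # same offset, different grain -> different keys, no false cache hits
    assert toy_cache_key(time_offset="1 year ago", time_grain="P1W") != toy_cache_key(
        time_offset="1 year ago", time_grain="P1M"
    )
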
@@ -379,11 +450,8 @@ class QueryContextProcessor:
             # rename metrics: SUM(value) => SUM(value) 1 year ago
             metrics_mapping = {
                 metric: TIME_COMPARISON.join([metric, offset])
-                for metric in get_metric_names(
-                    query_object_clone_dct.get("metrics", [])
-                )
+                for metric in metric_names
             }
-            join_keys = [col for col in df.columns if col not in metrics_mapping.keys()]

             if isinstance(self._qc_datasource, Query):
                 result = self._qc_datasource.exc_query(query_object_clone_dct)
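
The renamed metric columns are built by joining the metric name and the offset with the TIME_COMPARISON separator (a double underscore in superset.constants, assuming its current value):

    TIME_COMPARISON = "__"  # value from superset.constants

    metric_names = ["SUM(value)", "COUNT(*)"]
    offset = "1 year ago"
    metrics_mapping = {m: TIME_COMPARISON.join([m, offset]) for m in metric_names}
    # {'SUM(value)': 'SUM(value)__1 year ago', 'COUNT(*)': 'COUNT(*)__1 year ago'}
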
@@ -420,21 +488,19 @@ class QueryContextProcessor:
                     )
                 )

             # modifies temporal column using offset
             offset_metrics_df[index] = offset_metrics_df[index] - DateOffset(
                 **normalize_time_delta(offset)
             )

-            # df left join `offset_metrics_df`
-            offset_df = dataframe_utils.left_join_df(
-                left_df=df,
-                right_df=offset_metrics_df,
-                join_keys=join_keys,
-            )
-            offset_slice = offset_df[metrics_mapping.values()]
+            if use_aggregated_join_column:
+                self.add_aggregated_join_column(
+                    offset_metrics_df, time_grain, join_column_producer
+                )

-            # set offset_slice to cache and stack.
+            # cache df and query
             value = {
-                "df": offset_slice,
+                "df": offset_metrics_df,
                 "query": result.query,
             }
             cache.set(
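
The DateOffset subtraction above realigns the offset frame's dates with the base frame's. A sketch, assuming normalize_time_delta("1 year ago") returns {"years": -1} ("ago" flips the sign in superset.utils.date_parser), so subtracting actually moves the dates forward:

    import pandas as pd
    from pandas import DateOffset

    ds = pd.Series(pd.to_datetime(["2021-06-01", "2021-07-01"]))
    aligned = ds - DateOffset(**{"years": -1})  # i.e. ds + 1 year
    assert list(aligned.dt.year.unique()) == [2022]
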
@@ -444,10 +510,51 @@ class QueryContextProcessor:
                 datasource_uid=query_context.datasource.uid,
                 region=CacheRegion.DATA,
             )
-            rv_dfs.append(offset_slice)
+            offset_dfs.append(offset_metrics_df)

-        rv_df = pd.concat(rv_dfs, axis=1, copy=False) if time_offsets else df
-        return CachedTimeOffset(df=rv_df, queries=queries, cache_keys=cache_keys)
+        if offset_dfs:
+            # iterate on offset_dfs, left join each with df
+            for offset_df in offset_dfs:
+                df = dataframe_utils.left_join_df(
+                    left_df=df,
+                    right_df=offset_df,
+                    join_keys=join_keys,
+                    rsuffix=R_SUFFIX,
+                )
+
+        # removes columns used for join
+        df.drop(
+            list(df.filter(regex=f"{AGGREGATED_JOIN_COLUMN}|{R_SUFFIX}")),
+            axis=1,
+            inplace=True,
+        )
+
+        return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
+
+    @staticmethod
+    def get_aggregated_join_column(
+        row: pd.Series, column_index: int, time_grain: str
+    ) -> str:
+        if time_grain in (
+            TimeGrain.WEEK_STARTING_SUNDAY,
+            TimeGrain.WEEK_ENDING_SATURDAY,
+        ):
+            return row[column_index].strftime("%Y-W%U")
+
+        if time_grain in (
+            TimeGrain.WEEK,
+            TimeGrain.WEEK_STARTING_MONDAY,
+            TimeGrain.WEEK_ENDING_SUNDAY,
+        ):
+            return row[column_index].strftime("%Y-W%W")
+
+        if time_grain == TimeGrain.MONTH:
+            return row[column_index].strftime("%Y-%m")
+
+        if time_grain == TimeGrain.QUARTER:
+            return row[column_index].strftime("%Y-Q") + str(row[column_index].quarter)
+
+        return row[column_index].strftime("%Y")

     def get_data(self, df: pd.DataFrame) -> str | list[dict[str, Any]]:
         if self._query_context.result_format in ChartDataResultFormat.table_like():
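
For reference, the join-key formats produced by get_aggregated_join_column, evaluated on a single timestamp (a standalone sketch; the method itself operates on DataFrame rows). Note that %U counts Sunday-started weeks while %W counts Monday-started weeks, which is why 2023-01-01, a Sunday, lands in different buckets:

    import pandas as pd

    ts = pd.Timestamp("2023-01-01")  # a Sunday
    print(ts.strftime("%Y-W%U"))                  # 2023-W01 (Sunday-based weeks)
    print(ts.strftime("%Y-W%W"))                  # 2023-W00 (Monday-based weeks)
    print(ts.strftime("%Y-%m"))                   # 2023-01  (month)
    print(ts.strftime("%Y-Q") + str(ts.quarter))  # 2023-Q1  (quarter)
    print(ts.strftime("%Y"))                      # 2023     (year)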