mirror of
https://github.com/apache/superset.git
synced 2026-04-25 11:04:48 +00:00
refactor(plugins): BigNumber Time Comparison with existing time_offset API (#27718)
Co-authored-by: lilykuang <jialikuang@gmail.com> Co-authored-by: Kamil Gabryjelski <kamil.gabryjelski@gmail.com>
This commit is contained in:
@@ -19,7 +19,7 @@ from __future__ import annotations
|
||||
import copy
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, ClassVar, TYPE_CHECKING, TypedDict
|
||||
from typing import Any, cast, ClassVar, TYPE_CHECKING, TypedDict
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
@@ -55,6 +55,7 @@ from superset.utils.core import (
|
||||
DateColumn,
|
||||
DTTM_ALIAS,
|
||||
error_msg_from_exception,
|
||||
FilterOperator,
|
||||
get_base_axis_labels,
|
||||
get_column_names_from_columns,
|
||||
get_column_names_from_metrics,
|
||||
@@ -390,11 +391,6 @@ class QueryContextProcessor:
|
||||
|
||||
time_grain = self.get_time_grain(query_object)
|
||||
|
||||
if not time_grain:
|
||||
raise QueryObjectValidationError(
|
||||
_("Time Grain must be specified when using Time Shift.")
|
||||
)
|
||||
|
||||
metric_names = get_metric_names(query_object.metrics)
|
||||
|
||||
# use columns that are not metrics as join keys
|
||||
@@ -429,6 +425,28 @@ class QueryContextProcessor:
|
||||
query_object_clone.inner_to_dttm = outer_to_dttm
|
||||
query_object_clone.time_offsets = []
|
||||
query_object_clone.post_processing = []
|
||||
# Get time offset index
|
||||
index = (get_base_axis_labels(query_object.columns) or [DTTM_ALIAS])[0]
|
||||
# The comparison is not using a temporal column so we need to modify
|
||||
# the temporal filter so we run the query with the correct time range
|
||||
if not dataframe_utils.is_datetime_series(df.get(index)):
|
||||
# Let's find the first temporal filter in the filters array and change
|
||||
# its val to be the result of get_since_until with the offset
|
||||
for flt in query_object_clone.filter:
|
||||
if flt.get(
|
||||
"op"
|
||||
) == FilterOperator.TEMPORAL_RANGE.value and isinstance(
|
||||
flt.get("val"), str
|
||||
):
|
||||
time_range = cast(str, flt.get("val"))
|
||||
(
|
||||
new_outer_from_dttm,
|
||||
new_outer_to_dttm,
|
||||
) = get_since_until_from_time_range(
|
||||
time_range=time_range,
|
||||
time_shift=offset,
|
||||
)
|
||||
flt["val"] = f"{new_outer_from_dttm} : {new_outer_to_dttm}"
|
||||
query_object_clone.filter = [
|
||||
flt
|
||||
for flt in query_object_clone.filter
|
||||
@@ -488,16 +506,6 @@ class QueryContextProcessor:
|
||||
# 2. rename extra query columns
|
||||
offset_metrics_df = offset_metrics_df.rename(columns=metrics_mapping)
|
||||
|
||||
# 3. set time offset for index
|
||||
index = (get_base_axis_labels(query_object.columns) or [DTTM_ALIAS])[0]
|
||||
if not dataframe_utils.is_datetime_series(offset_metrics_df.get(index)):
|
||||
raise QueryObjectValidationError(
|
||||
_(
|
||||
"A time column must be specified "
|
||||
"when using a Time Comparison."
|
||||
)
|
||||
)
|
||||
|
||||
# cache df and query
|
||||
value = {
|
||||
"df": offset_metrics_df,
|
||||
@@ -526,7 +534,7 @@ class QueryContextProcessor:
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
offset_dfs: dict[str, pd.DataFrame],
|
||||
time_grain: str,
|
||||
time_grain: str | None,
|
||||
join_keys: list[str],
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
@@ -541,43 +549,58 @@ class QueryContextProcessor:
|
||||
time_grain
|
||||
)
|
||||
|
||||
if join_column_producer and not time_grain:
|
||||
raise QueryObjectValidationError(
|
||||
_("Time Grain must be specified when using Time Shift.")
|
||||
)
|
||||
|
||||
# iterate on offset_dfs, left join each with df
|
||||
for offset, offset_df in offset_dfs.items():
|
||||
# defines a column name for the offset join column
|
||||
column_name = OFFSET_JOIN_COLUMN_SUFFIX + offset
|
||||
actual_join_keys = join_keys
|
||||
|
||||
# add offset join column to df
|
||||
self.add_offset_join_column(
|
||||
df, column_name, time_grain, offset, join_column_producer
|
||||
)
|
||||
if time_grain:
|
||||
# defines a column name for the offset join column
|
||||
column_name = OFFSET_JOIN_COLUMN_SUFFIX + offset
|
||||
|
||||
# add offset join column to offset_df
|
||||
self.add_offset_join_column(
|
||||
offset_df, column_name, time_grain, None, join_column_producer
|
||||
)
|
||||
# add offset join column to df
|
||||
self.add_offset_join_column(
|
||||
df, column_name, time_grain, offset, join_column_producer
|
||||
)
|
||||
|
||||
# the temporal column is the first column in the join keys
|
||||
# so we use the join column instead of the temporal column
|
||||
actual_join_keys = [column_name, *join_keys[1:]]
|
||||
# add offset join column to offset_df
|
||||
self.add_offset_join_column(
|
||||
offset_df, column_name, time_grain, None, join_column_producer
|
||||
)
|
||||
|
||||
# left join df with offset_df
|
||||
df = dataframe_utils.left_join_df(
|
||||
left_df=df,
|
||||
right_df=offset_df,
|
||||
join_keys=actual_join_keys,
|
||||
rsuffix=R_SUFFIX,
|
||||
)
|
||||
# the temporal column is the first column in the join keys
|
||||
# so we use the join column instead of the temporal column
|
||||
actual_join_keys = [column_name, *join_keys[1:]]
|
||||
|
||||
# move the temporal column to the first column in df
|
||||
col = df.pop(join_keys[0])
|
||||
df.insert(0, col.name, col)
|
||||
if join_keys:
|
||||
df = dataframe_utils.left_join_df(
|
||||
left_df=df,
|
||||
right_df=offset_df,
|
||||
join_keys=actual_join_keys,
|
||||
rsuffix=R_SUFFIX,
|
||||
)
|
||||
else:
|
||||
df = dataframe_utils.full_outer_join_df(
|
||||
left_df=df,
|
||||
right_df=offset_df,
|
||||
rsuffix=R_SUFFIX,
|
||||
)
|
||||
|
||||
# removes columns created only for join purposes
|
||||
df.drop(
|
||||
list(df.filter(regex=f"{OFFSET_JOIN_COLUMN_SUFFIX}|{R_SUFFIX}")),
|
||||
axis=1,
|
||||
inplace=True,
|
||||
)
|
||||
if time_grain:
|
||||
# move the temporal column to the first column in df
|
||||
col = df.pop(join_keys[0])
|
||||
df.insert(0, col.name, col)
|
||||
|
||||
# removes columns created only for join purposes
|
||||
df.drop(
|
||||
list(df.filter(regex=f"{OFFSET_JOIN_COLUMN_SUFFIX}|{R_SUFFIX}")),
|
||||
axis=1,
|
||||
inplace=True,
|
||||
)
|
||||
return df
|
||||
|
||||
@staticmethod
|
||||
|
||||
Reference in New Issue
Block a user