mirror of
https://github.com/apache/superset.git
synced 2026-04-19 08:04:53 +00:00
fix(Timeshift): Determine temporal column correctly (#34582)
This commit (adaae8ba15, parent a66b7e98e0) was committed via GitHub.
@@ -456,11 +456,17 @@ class QueryContextProcessor:
|
||||
return f"{(outer_from_dttm - offset_date).days} days ago"
|
||||
return ""
|
||||
|
||||
def processing_time_offsets( # pylint: disable=too-many-locals,too-many-statements # noqa: C901
|
||||
def processing_time_offsets( # pylint: disable=too-many-locals,too-many-statements # noqa: C901
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
query_object: QueryObject,
|
||||
) -> CachedTimeOffset:
|
||||
"""
|
||||
Process time offsets for time comparison feature.
|
||||
|
||||
This method handles both relative time offsets (e.g., "1 week ago") and
|
||||
absolute date range offsets (e.g., "2015-01-03 : 2015-01-04").
|
||||
"""
|
||||
query_context = self._query_context
|
||||
# ensure query_object is immutable
|
||||
query_object_clone = copy.copy(query_object)
|
||||
@@ -550,11 +556,10 @@ class QueryContextProcessor:
|
||||
# Get time offset index
|
||||
index = (get_base_axis_labels(query_object.columns) or [DTTM_ALIAS])[0]
|
||||
|
||||
# Handle temporal filters
|
||||
if is_date_range_offset and feature_flag_manager.is_feature_enabled(
|
||||
"DATE_RANGE_TIMESHIFTS_ENABLED"
|
||||
):
|
||||
# Create a completely new filter list to avoid conflicts
|
||||
# Create a completely new filter list to preserve original filters
|
||||
query_object_clone.filter = copy.deepcopy(query_object_clone.filter)
|
||||
|
||||
# Remove any existing temporal filters that might conflict
|
||||
@@ -564,8 +569,12 @@ class QueryContextProcessor:
|
||||
if not (flt.get("op") == FilterOperator.TEMPORAL_RANGE)
|
||||
]
|
||||
|
||||
# Add our specific temporal filter
|
||||
temporal_col = query_object_clone.granularity or x_axis_label
|
||||
# Determine the temporal column with multiple fallback strategies
|
||||
temporal_col = self._get_temporal_column_for_filter(
|
||||
query_object_clone, x_axis_label
|
||||
)
|
||||
|
||||
# Always add a temporal filter for date range offsets
|
||||
if temporal_col:
|
||||
new_temporal_filter: QueryObjectFilterClause = {
|
||||
"col": temporal_col,
|
||||
@@ -577,7 +586,17 @@ class QueryContextProcessor:
|
||||
}
|
||||
query_object_clone.filter.append(new_temporal_filter)
|
||||
|
||||
else:
|
||||
# This should rarely happen with proper fallbacks
|
||||
raise QueryObjectValidationError(
|
||||
_(
|
||||
"Unable to identify temporal column for date range time comparison." # noqa: E501
|
||||
"Please ensure your dataset has a properly configured time column." # noqa: E501
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
# RELATIVE OFFSET: Original logic for non-date-range offsets
|
||||
# The comparison is not using a temporal column so we need to modify
|
||||
# the temporal filter so we run the query with the correct time range
|
||||
if not dataframe_utils.is_datetime_series(df.get(index)):
|
||||
@@ -600,8 +619,7 @@ class QueryContextProcessor:
|
||||
)
|
||||
flt["val"] = f"{new_outer_from_dttm} : {new_outer_to_dttm}"
|
||||
else:
|
||||
# If it IS a datetime series, we still need to clear conflicting
|
||||
# filters
|
||||
# If it IS a datetime series, we still need to clear conflicts
|
||||
query_object_clone.filter = copy.deepcopy(query_object_clone.filter)
|
||||
|
||||
# For relative offsets with datetime series, ensure the temporal
|
||||
@@ -629,7 +647,7 @@ class QueryContextProcessor:
|
||||
)
|
||||
]
|
||||
|
||||
# Continue with the rest of the method...
|
||||
# Continue with the rest of the method (caching, execution, etc.)
|
||||
cached_time_offset_key = (
|
||||
offset if offset == original_offset else f"{offset}_{original_offset}"
|
||||
)
|
||||
@@ -713,6 +731,40 @@ class QueryContextProcessor:
|
||||
|
||||
return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
|
||||
|
||||
def _get_temporal_column_for_filter( # noqa: C901
|
||||
self, query_object: QueryObject, x_axis_label: str | None
|
||||
) -> str | None:
|
||||
"""
|
||||
Helper method to reliably determine the temporal column for filtering.
|
||||
|
||||
This method tries multiple strategies to find the correct temporal column:
|
||||
1. Use explicitly set granularity
|
||||
2. Use x_axis_label if it's a temporal column
|
||||
3. Find any datetime column in the datasource
|
||||
|
||||
:param query_object: The query object
|
||||
:param x_axis_label: The x-axis label from the query
|
||||
:return: The name of the temporal column, or None if not found
|
||||
"""
|
||||
# Strategy 1: Use explicitly set granularity
|
||||
if query_object.granularity:
|
||||
return query_object.granularity
|
||||
|
||||
# Strategy 2: Use x_axis_label if it exists
|
||||
if x_axis_label:
|
||||
return x_axis_label
|
||||
|
||||
# Strategy 3: Find any datetime column in the datasource
|
||||
if hasattr(self._qc_datasource, "columns"):
|
||||
for col in self._qc_datasource.columns:
|
||||
if hasattr(col, "is_dttm") and col.is_dttm:
|
||||
if hasattr(col, "column_name"):
|
||||
return col.column_name
|
||||
elif hasattr(col, "name"):
|
||||
return col.name
|
||||
|
||||
return None
|
||||
|
||||
def _process_date_range_offset(
|
||||
self, offset_df: pd.DataFrame, join_keys: list[str]
|
||||
) -> tuple[pd.DataFrame, list[str]]:
|
||||
|
||||
@@ -371,3 +371,256 @@ def test_get_offset_custom_or_inherit_with_invalid_date(processor):
|
||||
|
||||
# Should return empty string for invalid format
|
||||
assert result == ""
|
||||
|
||||
|
||||
def test_get_temporal_column_for_filter_with_granularity(processor):
    """An explicitly set granularity wins over any x-axis label."""
    mock_query_object = MagicMock()
    mock_query_object.granularity = "date_column"

    temporal_col = processor._get_temporal_column_for_filter(
        mock_query_object, "x_axis_col"
    )

    assert temporal_col == "date_column"
|
||||
|
||||
|
||||
def test_get_temporal_column_for_filter_with_x_axis_fallback(processor):
    """Without a granularity, the helper falls back to the x-axis label."""
    mock_query_object = MagicMock()
    mock_query_object.granularity = None

    temporal_col = processor._get_temporal_column_for_filter(
        mock_query_object, "x_axis_col"
    )

    assert temporal_col == "x_axis_col"
|
||||
|
||||
|
||||
def test_get_temporal_column_for_filter_with_datasource_columns(processor):
    """A datetime column on the datasource is used as the last resort."""
    mock_query_object = MagicMock()
    mock_query_object.granularity = None

    # One non-temporal and one temporal column; only the temporal one
    # should be picked up.
    plain_column = MagicMock(is_dttm=False, column_name="name")
    dttm_column = MagicMock(is_dttm=True, column_name="created_at")
    processor._qc_datasource.columns = [plain_column, dttm_column]

    temporal_col = processor._get_temporal_column_for_filter(mock_query_object, None)

    assert temporal_col == "created_at"
|
||||
|
||||
|
||||
def test_get_temporal_column_for_filter_with_datasource_name_attr(processor):
    """Columns exposing only ``name`` (not ``column_name``) still resolve."""
    mock_query_object = MagicMock()
    mock_query_object.granularity = None

    dttm_column = MagicMock(is_dttm=True)
    dttm_column.name = "timestamp_col"
    # Drop column_name so the helper must fall back to the ``name`` attribute.
    del dttm_column.column_name

    processor._qc_datasource.columns = [dttm_column]

    temporal_col = processor._get_temporal_column_for_filter(mock_query_object, None)

    assert temporal_col == "timestamp_col"
|
||||
|
||||
|
||||
def test_get_temporal_column_for_filter_no_columns_found(processor):
    """Returns None when the datasource has no datetime columns at all."""
    mock_query_object = MagicMock()
    mock_query_object.granularity = None

    # Only a non-temporal column is available, so resolution must fail.
    processor._qc_datasource.columns = [
        MagicMock(is_dttm=False, column_name="name")
    ]

    assert (
        processor._get_temporal_column_for_filter(mock_query_object, None) is None
    )
|
||||
|
||||
|
||||
def test_get_temporal_column_for_filter_no_datasource_columns(processor):
    """Handles a datasource that lacks a ``columns`` attribute entirely."""
    mock_query_object = MagicMock()
    mock_query_object.granularity = None

    # Strip the columns attribute so strategy 3 has nothing to scan.
    if hasattr(processor._qc_datasource, "columns"):
        delattr(processor._qc_datasource, "columns")

    assert (
        processor._get_temporal_column_for_filter(mock_query_object, None) is None
    )
|
||||
|
||||
|
||||
def test_processing_time_offsets_temporal_column_error(processor):
    """processing_time_offsets raises QueryObjectValidationError when no
    temporal column can be determined for a date range offset."""
    from superset.common.query_object import QueryObject
    from superset.exceptions import QueryObjectValidationError

    # Minimal dataframe; its contents are irrelevant because the error is
    # raised before any offset query is executed.
    df = pd.DataFrame({"dim1": ["A", "B", "C"], "metric1": [10, 20, 30]})

    # Date-range offset with no granularity configured on the query object.
    query_object = QueryObject(
        datasource=MagicMock(),
        granularity=None,
        columns=[],
        is_timeseries=True,
        time_offsets=["2023-01-01 : 2023-01-31"],
        filter=[
            {
                "col": "some_date_col",
                "op": "TEMPORAL_RANGE",
                "val": "2024-01-01 : 2024-01-31",
            }
        ],
    )

    with (
        patch(
            "superset.common.query_context_processor.get_since_until_from_query_object",
            return_value=(pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-31")),
        ),
        patch(
            "superset.common.query_context_processor.feature_flag_manager"
        ) as mock_ff,
        # Force the temporal-column lookup to fail.
        patch.object(
            processor, "_get_temporal_column_for_filter", return_value=None
        ),
        patch(
            "superset.common.query_context_processor.get_base_axis_labels",
            return_value=["__timestamp"],
        ),
    ):
        mock_ff.is_feature_enabled.return_value = True

        with pytest.raises(
            QueryObjectValidationError,
            match="Unable to identify temporal column",
        ):
            processor.processing_time_offsets(df, query_object)
|
||||
|
||||
|
||||
def test_processing_time_offsets_date_range_enabled(processor):
    """processing_time_offsets completes end-to-end for a date range offset
    when the DATE_RANGE_TIMESHIFTS feature flag is enabled."""
    from superset.common.query_object import QueryObject

    # Main query result the offset dataframe will be joined against.
    df = pd.DataFrame(
        {
            "dim1": ["A", "B", "C"],
            "metric1": [10, 20, 30],
            "__timestamp": pd.date_range("2023-01-01", periods=3, freq="D"),
        }
    )

    # Datasource mock with just enough attributes for cache-key generation.
    mock_datasource = MagicMock()
    mock_datasource.id = 123
    mock_datasource.uid = "abc123"
    mock_datasource.cache_timeout = None
    mock_datasource.changed_on = pd.Timestamp("2023-01-01")
    mock_datasource.get_extra_cache_keys.return_value = {}

    query_object = QueryObject(
        datasource=mock_datasource,
        granularity="date_col",
        columns=[],
        is_timeseries=True,
        time_offsets=["2022-01-01 : 2022-01-31"],
        filter=[],
    )

    processor._query_context.queries = [query_object]

    # Canned result returned in place of the real offset query execution.
    offset_result = MagicMock()
    offset_result.df = pd.DataFrame(
        {
            "dim1": ["A", "B"],
            "metric1": [5, 10],
            "__timestamp": pd.date_range("2022-01-01", periods=2, freq="D"),
        }
    )
    offset_result.query = "SELECT * FROM table"
    offset_result.cache_key = "offset_cache_key"

    with (
        patch(
            "superset.common.query_context_processor.feature_flag_manager"
        ) as mock_ff,
        patch(
            "superset.common.query_context_processor.get_base_axis_labels",
            return_value=["__timestamp"],
        ),
        patch(
            "superset.common.query_context_processor.get_since_until_from_query_object",
            return_value=(pd.Timestamp("2023-01-01"), pd.Timestamp("2023-01-03")),
        ),
        patch(
            "superset.common.query_context_processor.get_since_until_from_time_range",
            return_value=(pd.Timestamp("2022-01-01"), pd.Timestamp("2022-01-31")),
        ),
        patch.object(processor, "get_query_result", return_value=offset_result),
        patch.object(
            processor, "_get_temporal_column_for_filter", return_value="date_col"
        ),
        patch.object(processor, "query_cache_key", return_value="mock_cache_key"),
    ):
        mock_ff.is_feature_enabled.return_value = True

        result = processor.processing_time_offsets(df, query_object)

    # The CachedTimeOffset result carries all three expected entries.
    assert "df" in result
    assert "queries" in result
    assert "cache_keys" in result

    # And each entry has the expected type.
    assert isinstance(result["df"], pd.DataFrame)
    assert isinstance(result["queries"], list)
    assert isinstance(result["cache_keys"], list)
|
||||
|
||||
Reference in New Issue
Block a user