fix: cache key generation (#36225)

Beto Dealmeida
2025-11-25 12:50:00 -05:00
committed by GitHub
parent 8d5d71199a
commit 0c87034b17
7 changed files with 391 additions and 12 deletions
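In outline, the bug this fixes: the QueryContext cache key is derived from the serialized queries in cache_values, so when ensure_totals_available() mutates the live QueryObject instances (clearing row_limit on totals queries, injecting contribution_totals) without mirroring those changes into cache_values, the key computed when the worker writes results differs from the key computed at fetch time. A minimal, self-contained sketch of the failure mode, with hash_fn standing in for Superset's internal query-dict hashing:

import hashlib
import json

def hash_fn(obj: dict) -> str:
    # Stand-in for Superset's internal query hashing (illustrative only).
    return hashlib.md5(
        json.dumps(obj, sort_keys=True, default=str).encode()
    ).hexdigest()

# The worker executes the mutated totals query (row_limit cleared) ...
executed_query = {"columns": [], "metrics": ["sales"], "row_limit": None}
# ... while cache_values still holds the original, unmutated copy.
cached_query = {"columns": [], "metrics": ["sales"], "row_limit": 1000}

# Different dicts -> different keys -> a guaranteed cache miss.
assert hash_fn(executed_query) != hash_fn(cached_query)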

@@ -700,6 +700,169 @@ def test_processing_time_offsets_date_range_enabled(processor):
    assert isinstance(result["cache_keys"], list)


def test_ensure_totals_available_updates_cache_values():
    """
    Test that ensure_totals_available() updates the query objects AND
    cache_values to keep them in sync.

    The issue was that ensure_totals_available() modified QueryObject instances
    (e.g., setting row_limit=None on totals queries and adding
    contribution_totals to post_processing), but cache_values still contained
    the original queries. This caused cache key mismatches between worker
    execution and cache fetch.
    """
    import pandas as pd

    from superset.common.query_object import QueryObject

    # Create a mock datasource
    mock_datasource = MagicMock()
    mock_datasource.uid = "test_datasource"
    mock_datasource.database.db_engine_spec.engine = "postgresql"
    mock_datasource.cache_timeout = None
    mock_datasource.changed_on = None

    # Create QueryObjects that would trigger ensure_totals_available logic.
    # Query 1: Main query with contribution post-processing (needs totals)
    main_query = QueryObject(
        datasource=mock_datasource,
        columns=["brokerage"],
        metrics=["Net Amount In", "Amount Out", "Amount In"],
        row_limit=50000,
        orderby=[["Net Amount In", False]],
        post_processing=[
            {
                "operation": "contribution",
                "options": {
                    "columns": ["Amount In", "Amount Out"],
                    "rename_columns": ["%Amount In", "%Amount Out"],
                },
            }
        ],
    )

    # Query 2: Totals query (no columns, has metrics, no post-processing)
    totals_query = QueryObject(
        datasource=mock_datasource,
        columns=[],  # No columns = totals query
        metrics=["Net Amount In", "Amount Out", "Amount In"],
        row_limit=50000,
        post_processing=[],  # No post-processing
    )

    # Create mock query context
    mock_query_context = MagicMock()
    mock_query_context.force = False
    mock_query_context.datasource = mock_datasource
    mock_query_context.queries = [main_query, totals_query]
    mock_query_context.result_type = "full"
    mock_query_context.cache_values = {
        "datasource": {"type": "table", "id": 1},
        "queries": [
            # These are the original queries as they would be stored in
            # cache_values
            {
                "columns": ["brokerage"],
                "metrics": ["Net Amount In", "Amount Out", "Amount In"],
                "row_limit": 50000,
                "orderby": [("Net Amount In", False)],
                "post_processing": [
                    {
                        "operation": "contribution",
                        "options": {
                            "columns": ["Amount In", "Amount Out"],
                            "rename_columns": ["%Amount In", "%Amount Out"],
                        },
                    }
                ],
            },
            {
                "columns": [],
                "metrics": ["Net Amount In", "Amount Out", "Amount In"],
                "row_limit": 50000,
                "post_processing": [],
            },
        ],
        "result_type": "full",
        "result_format": "json",
    }

    # Create processor
    processor = QueryContextProcessor(mock_query_context)
    processor._qc_datasource = mock_datasource

    # Mock the query execution result for the totals query
    mock_query_result = MagicMock()
    mock_df = pd.DataFrame(
        {
            "Net Amount In": [20228060486.838825],
            "Amount Out": [-20543489614.980007],
            "Amount In": [40771550101.81883],
        }
    )
    mock_query_result.df = mock_df

    with patch.object(
        mock_query_context, "get_query_result", return_value=mock_query_result
    ):
        # Call ensure_totals_available
        processor.ensure_totals_available()

        # Now call get_payload, which should update cache_values
        with patch(
            "superset.common.query_context_processor.get_query_results"
        ) as mock_get_query_results:
            # Mock the query results
            mock_query_results_response = [
                {
                    "data": [{"brokerage": "Test", "Net Amount In": 100}],
                    "query": "SELECT ...",
                }
            ]
            mock_get_query_results.return_value = mock_query_results_response

            # Mock the cache manager to avoid actual caching
            with patch(
                "superset.common.query_context_processor.QueryCacheManager"
            ) as mock_cache_manager:
                mock_cache = MagicMock()
                mock_cache.is_loaded = True
                mock_cache.df = pd.DataFrame(
                    {"brokerage": ["Test"], "Net Amount In": [100]}
                )
                mock_cache.query = "SELECT ..."
                mock_cache.error_message = None
                mock_cache.status = "success"
                mock_cache_manager.get.return_value = mock_cache

                # This should update cache_values to match the modified queries
                processor.get_payload(cache_query_context=False)

    # Verify that cache_values has been updated to reflect the modifications
    updated_cache_queries = mock_query_context.cache_values["queries"]

    # Check that the totals query has row_limit=None (modified by
    # ensure_totals_available)
    assert updated_cache_queries[1]["row_limit"] is None, (
        "Expected totals query to have row_limit=None after ensure_totals_available, "
        f"but got: {updated_cache_queries[1]['row_limit']}"
    )

    # Check that the main query has contribution_totals in post_processing
    assert (
        "contribution_totals"
        in updated_cache_queries[0]["post_processing"][0]["options"]
    ), "Expected main query post_processing to have contribution_totals added"

    # Verify the contribution_totals match what we mocked
    expected_totals = {
        "Net Amount In": 20228060486.838825,
        "Amount Out": -20543489614.980007,
        "Amount In": 40771550101.81883,
    }
    assert (
        updated_cache_queries[0]["post_processing"][0]["options"]["contribution_totals"]
        == expected_totals
    )
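For context, the synchronization this test expects might look like the following minimal sketch, assuming QueryObject exposes a to_dict()-style serializer; the helper name and wiring are illustrative, not the actual implementation:

def sync_cache_values(query_context) -> None:  # hypothetical helper
    # Rebuild the cached query dicts from the (possibly mutated) QueryObject
    # instances so the cache key generated at write time matches the one
    # generated at fetch time.
    query_context.cache_values["queries"] = [
        query.to_dict() for query in query_context.queries
    ]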


def test_get_df_payload_validates_before_cache_key_generation():
    """
    Test that get_df_payload calls validate() before generating cache key.
@@ -775,3 +938,131 @@ def test_get_df_payload_validates_before_cache_key_generation():
f"Expected validate to be called before cache_key, "
f"but got call order: {call_order}"
)
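The test body elided between the two hunks records calls via unittest.mock side effects; the capture pattern is sketched self-contained below, assuming methods named validate() and query_cache_key() (the actual patch targets in the test may differ):

from unittest.mock import MagicMock, patch

call_order: list[str] = []
target = MagicMock()

with patch.object(
    target, "validate", side_effect=lambda: call_order.append("validate")
), patch.object(
    target, "query_cache_key", side_effect=lambda: call_order.append("cache_key")
):
    # In the real test these run indirectly inside get_df_payload(); they are
    # called directly here only to demonstrate the capture mechanism.
    target.validate()
    target.query_cache_key()

assert call_order.index("validate") < call_order.index("cache_key")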


def test_cache_values_sync_after_ensure_totals_available():
    """
    Test that cache_values is synchronized with QueryObject modifications
    after ensure_totals_available() runs.

    This is a focused regression test for the cache key mismatch issue.
    It verifies that when ensure_totals_available() modifies QueryObject
    instances, those changes are reflected in cache_values before the
    QueryContext cache key is generated.
    """
    import pandas as pd

    from superset.common.query_object import QueryObject

    # Create a mock datasource
    mock_datasource = MagicMock()
    mock_datasource.uid = "test_datasource_456"
    mock_datasource.database.db_engine_spec.engine = "pinot"
    mock_datasource.cache_timeout = None
    mock_datasource.changed_on = None

    # Create two queries: one totals query and one main query with contribution
    totals_query = QueryObject(
        datasource=mock_datasource,
        columns=[],
        metrics=["sales"],
        row_limit=1000,
        post_processing=[],
    )
    main_query = QueryObject(
        datasource=mock_datasource,
        columns=["region"],
        metrics=["sales"],
        row_limit=1000,
        post_processing=[{"operation": "contribution", "options": {}}],
    )

    # Create a mock query context with initial cache_values
    mock_query_context = MagicMock()
    mock_query_context.force = False
    mock_query_context.datasource = mock_datasource
    mock_query_context.queries = [main_query, totals_query]
    mock_query_context.result_type = "full"
    mock_query_context.cache_values = {
        "datasource": {"type": "table", "id": 20},
        "queries": [
            {
                "columns": ["region"],
                "metrics": ["sales"],
                "row_limit": 1000,
                "post_processing": [{"operation": "contribution", "options": {}}],
            },
            {
                "columns": [],
                "metrics": ["sales"],
                "row_limit": 1000,
                "post_processing": [],
            },
        ],
        "result_type": "full",
        "result_format": "json",
    }

    # Create processor
    processor = QueryContextProcessor(mock_query_context)
    processor._qc_datasource = mock_datasource

    # Mock the query execution result (totals query execution)
    mock_query_result = MagicMock()
    mock_df = pd.DataFrame({"sales": [1000.0]})
    mock_query_result.df = mock_df

    # Patch methods to isolate the test
    with patch.object(
        mock_query_context, "get_query_result", return_value=mock_query_result
    ):
        # Mock cache management to prevent actual caching
        with patch(
            "superset.common.query_context_processor.QueryCacheManager"
        ) as mock_cache_manager:
            mock_cache = MagicMock()
            mock_cache.is_loaded = True
            mock_cache.df = pd.DataFrame({"region": ["North"], "sales": [100]})
            mock_cache.query = "SELECT region, SUM(sales) FROM table GROUP BY region"
            mock_cache.error_message = None
            mock_cache.status = "success"
            mock_cache_manager.get.return_value = mock_cache

            # Mock the query results
            with patch(
                "superset.common.query_context_processor.get_query_results"
            ) as mock_get_query_results:
                mock_query_results_response = [
                    {
                        "data": [{"region": "North", "sales": 100}],
                        "query": "SELECT region, SUM(sales) FROM table GROUP BY region",
                    }
                ]
                mock_get_query_results.return_value = mock_query_results_response

                # Call get_payload - this internally calls
                # ensure_totals_available() and then should update cache_values
                processor.get_payload(cache_query_context=False)

    # Verify the fix: cache_values should now reflect the modifications
    updated_cache_queries = mock_query_context.cache_values["queries"]
    updated_totals_row_limit = updated_cache_queries[1]["row_limit"]

    # Before the fix: row_limit would remain 1000 in cache_values.
    # After the fix: row_limit should be None (modified by
    # ensure_totals_available).
    assert updated_totals_row_limit is None, (
        "Expected row_limit to be None after ensure_totals_available, "
        f"but got: {updated_totals_row_limit}"
    )

    # Verify that contribution_totals was added to the main query
    assert (
        "contribution_totals"
        in updated_cache_queries[0]["post_processing"][0]["options"]
    )

    # Verify that the main query row_limit is still 1000 (only the totals
    # query should be modified)
    assert updated_cache_queries[0]["row_limit"] == 1000
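
These regression tests can be run in isolation with pytest's keyword filter; a typical invocation (the file path shown is illustrative):

pytest tests/unit_tests/common/test_query_context_processor.py \
    -k "cache_values or validates_before"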