mirror of
https://github.com/apache/superset.git
synced 2026-04-29 21:14:22 +00:00
Compare commits
2 Commits
playwright
...
memory_lea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
793a075915 | ||
|
|
64d25c85f7 |
@@ -467,13 +467,24 @@ class QueryContextProcessor:
|
||||
This method handles both relative time offsets (e.g., "1 week ago") and
|
||||
absolute date range offsets (e.g., "2015-01-03 : 2015-01-04").
|
||||
"""
|
||||
import gc
|
||||
|
||||
query_context = self._query_context
|
||||
# ensure query_object is immutable
|
||||
query_object_clone = copy.copy(query_object)
|
||||
queries: list[str] = []
|
||||
cache_keys: list[str | None] = []
|
||||
offset_dfs: dict[str, pd.DataFrame] = {}
|
||||
|
||||
# Track memory usage for monitoring
|
||||
initial_memory = None
|
||||
try:
|
||||
import psutil
|
||||
|
||||
process = psutil.Process()
|
||||
initial_memory = process.memory_info().rss / 1024 / 1024
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
outer_from_dttm, outer_to_dttm = get_since_until_from_query_object(query_object)
|
||||
if not outer_from_dttm or not outer_to_dttm:
|
||||
raise QueryObjectValidationError(
|
||||
@@ -729,6 +740,21 @@ class QueryContextProcessor:
|
||||
join_keys,
|
||||
)
|
||||
|
||||
# Memory cleanup: clear DataFrame references and force garbage collection
|
||||
if initial_memory is not None:
|
||||
try:
|
||||
final_memory = process.memory_info().rss / 1024 / 1024
|
||||
memory_growth = final_memory - initial_memory
|
||||
logger.info(
|
||||
f"Time offset processing: {memory_growth:+.1f}MB, "
|
||||
f"{len(offset_dfs)} offsets"
|
||||
)
|
||||
except Exception: # noqa: S110
|
||||
pass
|
||||
|
||||
offset_dfs.clear()
|
||||
gc.collect()
|
||||
|
||||
return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
|
||||
|
||||
def _get_temporal_column_for_filter( # noqa: C901
|
||||
@@ -855,10 +881,20 @@ class QueryContextProcessor:
|
||||
return offset_df, join_keys
|
||||
|
||||
def _perform_join(
|
||||
self, df: pd.DataFrame, offset_df: pd.DataFrame, actual_join_keys: list[str]
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
offset_df: pd.DataFrame,
|
||||
actual_join_keys: list[str],
|
||||
offset_name: str = "unknown",
|
||||
) -> pd.DataFrame:
|
||||
"""Perform the appropriate join operation."""
|
||||
"""Perform join with memory safety validation."""
|
||||
if actual_join_keys:
|
||||
# Validate join keys to prevent cartesian products (if feature enabled)
|
||||
if feature_flag_manager.is_feature_enabled("MEMORY_LEAK_JOIN_VALIDATION"):
|
||||
self._validate_join_keys_for_memory_safety(
|
||||
df, offset_df, actual_join_keys, offset_name
|
||||
)
|
||||
|
||||
return dataframe_utils.left_join_df(
|
||||
left_df=df,
|
||||
right_df=offset_df,
|
||||
@@ -884,6 +920,34 @@ class QueryContextProcessor:
|
||||
)
|
||||
return result_df
|
||||
|
||||
def _validate_join_keys_for_memory_safety(
|
||||
self,
|
||||
left_df: pd.DataFrame,
|
||||
right_df: pd.DataFrame,
|
||||
join_keys: list[str],
|
||||
offset_name: str,
|
||||
) -> None:
|
||||
"""
|
||||
Prevent memory explosions by ensuring unique join keys.
|
||||
|
||||
Time offset joins should have 1:1 key relationships to avoid cartesian products.
|
||||
"""
|
||||
if not join_keys:
|
||||
return
|
||||
|
||||
left_duplicates = left_df[join_keys].duplicated().sum()
|
||||
right_duplicates = right_df[join_keys].duplicated().sum()
|
||||
|
||||
if left_duplicates > 0 or right_duplicates > 0:
|
||||
raise QueryObjectValidationError(
|
||||
_(
|
||||
f"Time offset join failed: duplicate keys detected in "
|
||||
f"'{offset_name}'. Left: {left_duplicates}, "
|
||||
f"Right: {right_duplicates} "
|
||||
f"duplicates. This would cause memory explosion."
|
||||
)
|
||||
)
|
||||
|
||||
def join_offset_dfs(
|
||||
self,
|
||||
df: pd.DataFrame,
|
||||
@@ -925,7 +989,7 @@ class QueryContextProcessor:
|
||||
join_column_producer,
|
||||
)
|
||||
|
||||
df = self._perform_join(df, offset_df, actual_join_keys)
|
||||
df = self._perform_join(df, offset_df, actual_join_keys, offset)
|
||||
df = self._apply_cleanup_logic(
|
||||
df, offset, time_grain, join_keys, is_date_range_offset
|
||||
)
|
||||
|
||||
@@ -46,10 +46,103 @@ class QueryCacheManager:
|
||||
Class for manage query-cache getting and setting
|
||||
"""
|
||||
|
||||
# Maximum cache memory usage in MB to prevent OOM
|
||||
DEFAULT_MAX_CACHE_MEMORY_MB = 1024
|
||||
|
||||
@property
|
||||
def stats_logger(self) -> BaseStatsLogger:
|
||||
return current_app.config["STATS_LOGGER"]
|
||||
|
||||
@staticmethod
|
||||
def _get_dataframe_memory_mb(df: DataFrame) -> float:
|
||||
"""Get DataFrame memory usage in MB"""
|
||||
try:
|
||||
return df.memory_usage(deep=True).sum() / 1024 / 1024
|
||||
except Exception:
|
||||
return 0.0
|
||||
|
||||
@staticmethod
|
||||
def _get_cache_memory_usage(region: CacheRegion = CacheRegion.DATA) -> float:
|
||||
"""
|
||||
MEMORY LEAK FIX: Calculate total cache memory usage for DataFrames.
|
||||
|
||||
This helps prevent cache from consuming all available memory.
|
||||
"""
|
||||
try:
|
||||
cache = _cache[region]
|
||||
total_memory = 0.0
|
||||
|
||||
# Iterate through cache and sum DataFrame memory usage
|
||||
for key in cache.cache._cache.keys(): # Access underlying cache dict
|
||||
try:
|
||||
value = cache.get(key)
|
||||
if value and isinstance(value, dict) and "df" in value:
|
||||
df = value["df"]
|
||||
if isinstance(df, DataFrame):
|
||||
total_memory += QueryCacheManager._get_dataframe_memory_mb(
|
||||
df
|
||||
)
|
||||
except Exception: # noqa: S112
|
||||
continue
|
||||
|
||||
return total_memory
|
||||
except Exception as ex:
|
||||
logger.warning(f"Error calculating cache memory usage: {ex}")
|
||||
return 0.0
|
||||
|
||||
@staticmethod
|
||||
def _evict_largest_cache_entries(
|
||||
region: CacheRegion = CacheRegion.DATA, target_reduction_mb: float = 200
|
||||
) -> int:
|
||||
"""
|
||||
MEMORY LEAK FIX: Evict largest cache entries to free memory.
|
||||
|
||||
Returns number of entries evicted.
|
||||
"""
|
||||
try:
|
||||
cache = _cache[region]
|
||||
entries_with_sizes = []
|
||||
|
||||
# Get all cache entries with their DataFrame sizes
|
||||
for key in list(cache.cache._cache.keys()):
|
||||
try:
|
||||
value = cache.get(key)
|
||||
if value and isinstance(value, dict) and "df" in value:
|
||||
df = value["df"]
|
||||
if isinstance(df, DataFrame):
|
||||
size_mb = QueryCacheManager._get_dataframe_memory_mb(df)
|
||||
entries_with_sizes.append((key, size_mb))
|
||||
except Exception: # noqa: S112
|
||||
continue
|
||||
|
||||
# Sort by size descending (largest first)
|
||||
entries_with_sizes.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# Evict largest entries until we hit target reduction
|
||||
evicted = 0
|
||||
total_freed = 0.0
|
||||
|
||||
for key, size_mb in entries_with_sizes:
|
||||
if total_freed >= target_reduction_mb:
|
||||
break
|
||||
|
||||
try:
|
||||
cache.delete(key)
|
||||
evicted += 1
|
||||
total_freed += size_mb
|
||||
logger.debug(f"Evicted cache entry {key}: {size_mb:.2f}MB")
|
||||
except Exception: # noqa: S112
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f"Cache eviction: removed {evicted} entries, freed {total_freed:.2f}MB"
|
||||
)
|
||||
return evicted
|
||||
|
||||
except Exception as ex:
|
||||
logger.error(f"Error during cache eviction: {ex}")
|
||||
return 0
|
||||
|
||||
# pylint: disable=too-many-instance-attributes,too-many-arguments
|
||||
def __init__(
|
||||
self,
|
||||
@@ -98,6 +191,24 @@ class QueryCacheManager:
|
||||
"""
|
||||
Set dataframe of query-result to specific cache region
|
||||
"""
|
||||
# Check cache size before adding new entries
|
||||
max_cache_memory = current_app.config.get(
|
||||
"QUERY_CACHE_MAX_MEMORY_MB", self.DEFAULT_MAX_CACHE_MEMORY_MB
|
||||
)
|
||||
|
||||
if key and query_result.df is not None and not query_result.df.empty:
|
||||
new_df_size = self._get_dataframe_memory_mb(query_result.df)
|
||||
current_cache_size = self._get_cache_memory_usage(region)
|
||||
|
||||
if current_cache_size + new_df_size > max_cache_memory:
|
||||
logger.info(
|
||||
f"Cache limit exceeded ({current_cache_size:.0f}MB), "
|
||||
f"evicting entries"
|
||||
)
|
||||
self._evict_largest_cache_entries(
|
||||
region, target_reduction_mb=new_df_size + 100
|
||||
)
|
||||
|
||||
try:
|
||||
self.status = query_result.status
|
||||
self.query = query_result.query
|
||||
|
||||
@@ -626,6 +626,9 @@ DEFAULT_FEATURE_FLAGS: dict[str, bool] = {
|
||||
"DATE_RANGE_TIMESHIFTS_ENABLED": False,
|
||||
# Enable Matrixify feature for matrix-style chart layouts
|
||||
"MATRIXIFY": False,
|
||||
# Memory leak prevention: validate join keys to prevent cartesian product explosions
|
||||
# Disable if time series queries with legitimate duplicate keys are failing
|
||||
"MEMORY_LEAK_JOIN_VALIDATION": True,
|
||||
}
|
||||
|
||||
# ------------------------------
|
||||
@@ -900,6 +903,10 @@ CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "NullCache"}
|
||||
# Cache for datasource metadata and query results
|
||||
DATA_CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "NullCache"}
|
||||
|
||||
# Maximum memory usage (in MB) for DataFrame cache to prevent OOM
|
||||
# Set based on available worker memory (recommended: 25-50% of worker memory)
|
||||
QUERY_CACHE_MAX_MEMORY_MB = 1024 # 1GB default
|
||||
|
||||
# Cache for dashboard filter state. `CACHE_TYPE` defaults to `SupersetMetastoreCache`
|
||||
# that stores the values in the key-value table in the Superset metastore, as it's
|
||||
# required for Superset to operate correctly, but can be replaced by any
|
||||
|
||||
@@ -24,6 +24,7 @@ import pytest
|
||||
from superset.common.chart_data import ChartDataResultFormat
|
||||
from superset.common.query_context_processor import QueryContextProcessor
|
||||
from superset.utils.core import GenericDataType
|
||||
from tests.integration_tests.conftest import with_feature_flags
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -624,3 +625,160 @@ def test_processing_time_offsets_date_range_enabled(processor):
|
||||
assert isinstance(result["df"], pd.DataFrame)
|
||||
assert isinstance(result["queries"], list)
|
||||
assert isinstance(result["cache_keys"], list)
|
||||
|
||||
|
||||
class TestMemoryLeakFixes:
|
||||
"""Test the memory leak fixes in QueryContextProcessor."""
|
||||
|
||||
def test_validate_join_keys_for_memory_safety_with_unique_keys(self):
|
||||
"""Test that validation passes with unique join keys."""
|
||||
processor = QueryContextProcessor(MagicMock())
|
||||
|
||||
# Create DataFrames with unique join keys
|
||||
left_df = pd.DataFrame({"category": ["A", "B", "C"], "value": [1, 2, 3]})
|
||||
right_df = pd.DataFrame(
|
||||
{"category": ["A", "B", "C"], "other_value": [10, 20, 30]}
|
||||
)
|
||||
|
||||
# Should not raise any exception
|
||||
processor._validate_join_keys_for_memory_safety(
|
||||
left_df, right_df, ["category"], "test_offset"
|
||||
)
|
||||
|
||||
def test_validate_join_keys_for_memory_safety_with_duplicate_keys(self):
|
||||
"""Test that validation fails with duplicate join keys."""
|
||||
from superset.exceptions import QueryObjectValidationError
|
||||
|
||||
processor = QueryContextProcessor(MagicMock())
|
||||
|
||||
# Create DataFrames with duplicate join keys (cartesian product risk)
|
||||
left_df = pd.DataFrame(
|
||||
{
|
||||
"category": ["A", "A", "B", "B"], # Duplicates
|
||||
"value": [1, 2, 3, 4],
|
||||
}
|
||||
)
|
||||
right_df = pd.DataFrame(
|
||||
{
|
||||
"category": ["A", "A", "B"], # Duplicates
|
||||
"other_value": [10, 20, 30],
|
||||
}
|
||||
)
|
||||
|
||||
# Should raise QueryObjectValidationError
|
||||
with pytest.raises(QueryObjectValidationError) as exc_info:
|
||||
processor._validate_join_keys_for_memory_safety(
|
||||
left_df, right_df, ["category"], "test_offset"
|
||||
)
|
||||
|
||||
assert "duplicate keys detected" in str(exc_info.value)
|
||||
assert "test_offset" in str(exc_info.value)
|
||||
|
||||
def test_validate_join_keys_for_memory_safety_with_empty_join_keys(self):
|
||||
"""Test that validation passes when no join keys specified."""
|
||||
processor = QueryContextProcessor(MagicMock())
|
||||
|
||||
left_df = pd.DataFrame({"value": [1, 2, 3]})
|
||||
right_df = pd.DataFrame({"other_value": [10, 20, 30]})
|
||||
|
||||
# Should not raise any exception with empty join keys
|
||||
processor._validate_join_keys_for_memory_safety(
|
||||
left_df, right_df, [], "test_offset"
|
||||
)
|
||||
|
||||
def test_validate_join_keys_for_memory_safety_with_no_duplicates_message(self):
|
||||
"""Test error message format for duplicate keys."""
|
||||
from superset.exceptions import QueryObjectValidationError
|
||||
|
||||
processor = QueryContextProcessor(MagicMock())
|
||||
|
||||
left_df = pd.DataFrame(
|
||||
{
|
||||
"category": ["A", "A"], # 1 duplicate
|
||||
"value": [1, 2],
|
||||
}
|
||||
)
|
||||
right_df = pd.DataFrame(
|
||||
{
|
||||
"category": ["B", "B", "B"], # 2 duplicates
|
||||
"other_value": [10, 20, 30],
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(QueryObjectValidationError) as exc_info:
|
||||
processor._validate_join_keys_for_memory_safety(
|
||||
left_df, right_df, ["category"], "weekly_offset"
|
||||
)
|
||||
|
||||
error_msg = str(exc_info.value)
|
||||
assert "weekly_offset" in error_msg
|
||||
assert "Left: 1" in error_msg # 1 duplicate
|
||||
assert "Right: 2" in error_msg # 2 duplicates
|
||||
assert "memory explosion" in error_msg
|
||||
|
||||
@patch("gc.collect")
|
||||
def test_processing_time_offsets_calls_garbage_collection(self, mock_gc_collect):
|
||||
"""Test that garbage collection is called after processing time offsets."""
|
||||
# Create a minimal mock setup
|
||||
mock_query_context = MagicMock()
|
||||
mock_query_context.datasource = MagicMock()
|
||||
|
||||
processor = QueryContextProcessor(mock_query_context)
|
||||
|
||||
# Mock the query object
|
||||
query_object = MagicMock()
|
||||
query_object.time_offsets = [] # Empty to avoid complex mocking
|
||||
|
||||
# Mock external dependencies to focus on GC testing
|
||||
with patch(
|
||||
"superset.common.query_context_processor.get_since_until_from_query_object"
|
||||
) as mock_get_since_until:
|
||||
mock_get_since_until.return_value = ("2024-01-01", "2024-02-01")
|
||||
|
||||
with patch.object(processor, "get_time_grain", return_value="P1D"):
|
||||
with patch(
|
||||
"superset.common.query_context_processor.get_metric_names",
|
||||
return_value=["count"],
|
||||
):
|
||||
# Create test DataFrame
|
||||
test_df = pd.DataFrame({"count": [1, 2, 3]})
|
||||
|
||||
# Call the method
|
||||
result = processor.processing_time_offsets(test_df, query_object)
|
||||
|
||||
# Verify garbage collection was called
|
||||
mock_gc_collect.assert_called_once()
|
||||
|
||||
# Verify result is returned
|
||||
assert result is not None
|
||||
|
||||
@with_feature_flags(MEMORY_LEAK_JOIN_VALIDATION=True)
|
||||
def test_join_validation_with_feature_flag_enabled(self):
|
||||
"""Test that join validation runs when feature flag is enabled."""
|
||||
processor = QueryContextProcessor(MagicMock())
|
||||
|
||||
# Create DataFrames with duplicate keys that should trigger validation
|
||||
left_df = pd.DataFrame({"category": ["A", "A"], "value": [1, 2]})
|
||||
right_df = pd.DataFrame({"category": ["A", "A"], "other_value": [10, 20]})
|
||||
|
||||
# Should call validation and raise error due to duplicates
|
||||
from superset.exceptions import QueryObjectValidationError
|
||||
|
||||
with pytest.raises(QueryObjectValidationError):
|
||||
processor._perform_join(left_df, right_df, ["category"], "test_offset")
|
||||
|
||||
@with_feature_flags(MEMORY_LEAK_JOIN_VALIDATION=False)
|
||||
def test_join_validation_with_feature_flag_disabled(self):
|
||||
"""Test that join validation is skipped when feature flag is disabled."""
|
||||
processor = QueryContextProcessor(MagicMock())
|
||||
|
||||
# Create DataFrames with duplicate keys that would normally trigger validation
|
||||
left_df = pd.DataFrame({"category": ["A", "A"], "value": [1, 2]})
|
||||
right_df = pd.DataFrame({"category": ["A", "A"], "other_value": [10, 20]})
|
||||
|
||||
# Should NOT raise error because validation is disabled
|
||||
# (This will create a cartesian product, but validation is bypassed)
|
||||
result = processor._perform_join(left_df, right_df, ["category"], "test_offset")
|
||||
|
||||
# Verify the join was performed (cartesian product: 2x2 = 4 rows)
|
||||
assert len(result) == 4
|
||||
|
||||
Reference in New Issue
Block a user