mirror of
https://github.com/apache/superset.git
synced 2026-05-05 16:04:19 +00:00
Compare commits
2 Commits
docs/testi ... memory_lea

| Author | SHA1 | Date |
|---|---|---|
|  | 793a075915 |  |
|  | 64d25c85f7 |  |
@@ -467,13 +467,24 @@ class QueryContextProcessor:
         This method handles both relative time offsets (e.g., "1 week ago") and
         absolute date range offsets (e.g., "2015-01-03 : 2015-01-04").
         """
+        import gc
+
         query_context = self._query_context
-        # ensure query_object is immutable
         query_object_clone = copy.copy(query_object)
         queries: list[str] = []
         cache_keys: list[str | None] = []
         offset_dfs: dict[str, pd.DataFrame] = {}
 
+        # Track memory usage for monitoring
+        initial_memory = None
+        try:
+            import psutil
+
+            process = psutil.Process()
+            initial_memory = process.memory_info().rss / 1024 / 1024
+        except ImportError:
+            pass
+
         outer_from_dttm, outer_to_dttm = get_since_until_from_query_object(query_object)
         if not outer_from_dttm or not outer_to_dttm:
             raise QueryObjectValidationError(
@@ -729,6 +740,21 @@ class QueryContextProcessor:
             join_keys,
         )
 
+        # Memory cleanup: clear DataFrame references and force garbage collection
+        if initial_memory is not None:
+            try:
+                final_memory = process.memory_info().rss / 1024 / 1024
+                memory_growth = final_memory - initial_memory
+                logger.info(
+                    f"Time offset processing: {memory_growth:+.1f}MB, "
+                    f"{len(offset_dfs)} offsets"
+                )
+            except Exception:  # noqa: S110
+                pass
+
+        offset_dfs.clear()
+        gc.collect()
+
         return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
 
     def _get_temporal_column_for_filter(  # noqa: C901
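For context, the instrumentation added above samples resident set size (RSS) before and after offset processing and logs only the delta; `psutil` is imported inside a `try`/`except ImportError`, so the sampling silently degrades to a no-op when it is not installed. A minimal standalone sketch of the same pattern (illustrative only, not Superset code; the helper name is hypothetical):

```python
import gc
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def run_with_memory_log(label: str, fn: Callable[[], Any]) -> Any:
    """Run fn() and log the RSS growth in MB, if psutil is available."""
    initial_mb = None
    try:
        import psutil  # optional dependency, mirrored from the diff's try/except

        process = psutil.Process()
        initial_mb = process.memory_info().rss / 1024 / 1024
    except ImportError:
        pass

    result = fn()

    if initial_mb is not None:
        final_mb = process.memory_info().rss / 1024 / 1024
        logger.info("%s: %+.1fMB RSS growth", label, final_mb - initial_mb)

    gc.collect()  # explicit collection, as done after offset_dfs.clear()
    return result
```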
@@ -855,10 +881,20 @@ class QueryContextProcessor:
         return offset_df, join_keys
 
     def _perform_join(
-        self, df: pd.DataFrame, offset_df: pd.DataFrame, actual_join_keys: list[str]
+        self,
+        df: pd.DataFrame,
+        offset_df: pd.DataFrame,
+        actual_join_keys: list[str],
+        offset_name: str = "unknown",
     ) -> pd.DataFrame:
-        """Perform the appropriate join operation."""
+        """Perform join with memory safety validation."""
         if actual_join_keys:
+            # Validate join keys to prevent cartesian products (if feature enabled)
+            if feature_flag_manager.is_feature_enabled("MEMORY_LEAK_JOIN_VALIDATION"):
+                self._validate_join_keys_for_memory_safety(
+                    df, offset_df, actual_join_keys, offset_name
+                )
+
             return dataframe_utils.left_join_df(
                 left_df=df,
                 right_df=offset_df,
@@ -884,6 +920,34 @@ class QueryContextProcessor:
             )
         return result_df
 
+    def _validate_join_keys_for_memory_safety(
+        self,
+        left_df: pd.DataFrame,
+        right_df: pd.DataFrame,
+        join_keys: list[str],
+        offset_name: str,
+    ) -> None:
+        """
+        Prevent memory explosions by ensuring unique join keys.
+
+        Time offset joins should have 1:1 key relationships to avoid cartesian products.
+        """
+        if not join_keys:
+            return
+
+        left_duplicates = left_df[join_keys].duplicated().sum()
+        right_duplicates = right_df[join_keys].duplicated().sum()
+
+        if left_duplicates > 0 or right_duplicates > 0:
+            raise QueryObjectValidationError(
+                _(
+                    f"Time offset join failed: duplicate keys detected in "
+                    f"'{offset_name}'. Left: {left_duplicates}, "
+                    f"Right: {right_duplicates} "
+                    f"duplicates. This would cause memory explosion."
+                )
+            )
+
     def join_offset_dfs(
         self,
         df: pd.DataFrame,
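The rationale for `_validate_join_keys_for_memory_safety`: when both sides of a left join carry duplicate key values, pandas emits one output row per matching pair, so row counts multiply instead of staying 1:1. A small self-contained illustration in plain pandas with hypothetical data:

```python
import pandas as pd

left = pd.DataFrame({"category": ["A", "A", "B"], "value": [1, 2, 3]})
right = pd.DataFrame({"category": ["A", "A", "B"], "offset_value": [10, 20, 30]})

# The same duplicate check the validator performs.
print(left[["category"]].duplicated().sum())   # 1 duplicate on the left
print(right[["category"]].duplicated().sum())  # 1 duplicate on the right

# Left join on a non-unique key: the two "A" rows on each side pair up,
# yielding 2 x 2 = 4 "A" rows plus 1 "B" row = 5 rows instead of 3.
joined = left.merge(right, on="category", how="left")
print(len(joined))  # 5
```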
@@ -925,7 +989,7 @@ class QueryContextProcessor:
                 join_column_producer,
             )
 
-            df = self._perform_join(df, offset_df, actual_join_keys)
+            df = self._perform_join(df, offset_df, actual_join_keys, offset)
             df = self._apply_cleanup_logic(
                 df, offset, time_grain, join_keys, is_date_range_offset
            )
@@ -46,10 +46,103 @@ class QueryCacheManager:
     Class for manage query-cache getting and setting
     """
 
+    # Maximum cache memory usage in MB to prevent OOM
+    DEFAULT_MAX_CACHE_MEMORY_MB = 1024
+
     @property
     def stats_logger(self) -> BaseStatsLogger:
         return current_app.config["STATS_LOGGER"]
 
+    @staticmethod
+    def _get_dataframe_memory_mb(df: DataFrame) -> float:
+        """Get DataFrame memory usage in MB"""
+        try:
+            return df.memory_usage(deep=True).sum() / 1024 / 1024
+        except Exception:
+            return 0.0
+
+    @staticmethod
+    def _get_cache_memory_usage(region: CacheRegion = CacheRegion.DATA) -> float:
+        """
+        MEMORY LEAK FIX: Calculate total cache memory usage for DataFrames.
+
+        This helps prevent cache from consuming all available memory.
+        """
+        try:
+            cache = _cache[region]
+            total_memory = 0.0
+
+            # Iterate through cache and sum DataFrame memory usage
+            for key in cache.cache._cache.keys():  # Access underlying cache dict
+                try:
+                    value = cache.get(key)
+                    if value and isinstance(value, dict) and "df" in value:
+                        df = value["df"]
+                        if isinstance(df, DataFrame):
+                            total_memory += QueryCacheManager._get_dataframe_memory_mb(
+                                df
+                            )
+                except Exception:  # noqa: S112
+                    continue
+
+            return total_memory
+        except Exception as ex:
+            logger.warning(f"Error calculating cache memory usage: {ex}")
+            return 0.0
+
+    @staticmethod
+    def _evict_largest_cache_entries(
+        region: CacheRegion = CacheRegion.DATA, target_reduction_mb: float = 200
+    ) -> int:
+        """
+        MEMORY LEAK FIX: Evict largest cache entries to free memory.
+
+        Returns number of entries evicted.
+        """
+        try:
+            cache = _cache[region]
+            entries_with_sizes = []
+
+            # Get all cache entries with their DataFrame sizes
+            for key in list(cache.cache._cache.keys()):
+                try:
+                    value = cache.get(key)
+                    if value and isinstance(value, dict) and "df" in value:
+                        df = value["df"]
+                        if isinstance(df, DataFrame):
+                            size_mb = QueryCacheManager._get_dataframe_memory_mb(df)
+                            entries_with_sizes.append((key, size_mb))
+                except Exception:  # noqa: S112
+                    continue
+
+            # Sort by size descending (largest first)
+            entries_with_sizes.sort(key=lambda x: x[1], reverse=True)
+
+            # Evict largest entries until we hit target reduction
+            evicted = 0
+            total_freed = 0.0
+
+            for key, size_mb in entries_with_sizes:
+                if total_freed >= target_reduction_mb:
+                    break
+
+                try:
+                    cache.delete(key)
+                    evicted += 1
+                    total_freed += size_mb
+                    logger.debug(f"Evicted cache entry {key}: {size_mb:.2f}MB")
+                except Exception:  # noqa: S112
+                    continue
+
+            logger.info(
+                f"Cache eviction: removed {evicted} entries, freed {total_freed:.2f}MB"
+            )
+            return evicted
+
+        except Exception as ex:
+            logger.error(f"Error during cache eviction: {ex}")
+            return 0
+
     # pylint: disable=too-many-instance-attributes,too-many-arguments
     def __init__(
         self,
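The two helpers above size cached DataFrames with `memory_usage(deep=True)` and evict the largest entries first until a target amount has been freed. The sketch below reproduces the same greedy strategy against a plain dict standing in for the cache backend (the real code reaches into the Flask-Caching region via `_cache[region]` and its private `cache._cache` store, which this illustration does not attempt to model):

```python
import pandas as pd


def dataframe_mb(df: pd.DataFrame) -> float:
    """Deep memory footprint of a DataFrame in MB."""
    return df.memory_usage(deep=True).sum() / 1024 / 1024


def evict_largest(cache: dict[str, pd.DataFrame], target_mb: float) -> int:
    """Delete the biggest entries until roughly target_mb has been freed."""
    sizes = sorted(
        ((key, dataframe_mb(df)) for key, df in cache.items()),
        key=lambda item: item[1],
        reverse=True,  # largest first, as in _evict_largest_cache_entries
    )
    freed, evicted = 0.0, 0
    for key, size_mb in sizes:
        if freed >= target_mb:
            break
        del cache[key]
        freed += size_mb
        evicted += 1
    return evicted


# Usage: three cached frames of very different sizes.
cache = {
    "small": pd.DataFrame({"x": range(10)}),
    "large": pd.DataFrame({"x": range(1_000_000)}),
    "medium": pd.DataFrame({"x": range(100_000)}),
}
evict_largest(cache, target_mb=5)  # drops "large" (~8MB) and stops
print(sorted(cache))  # ['medium', 'small']
```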
@@ -98,6 +191,24 @@ class QueryCacheManager:
         """
         Set dataframe of query-result to specific cache region
         """
+        # Check cache size before adding new entries
+        max_cache_memory = current_app.config.get(
+            "QUERY_CACHE_MAX_MEMORY_MB", self.DEFAULT_MAX_CACHE_MEMORY_MB
+        )
+
+        if key and query_result.df is not None and not query_result.df.empty:
+            new_df_size = self._get_dataframe_memory_mb(query_result.df)
+            current_cache_size = self._get_cache_memory_usage(region)
+
+            if current_cache_size + new_df_size > max_cache_memory:
+                logger.info(
+                    f"Cache limit exceeded ({current_cache_size:.0f}MB), "
+                    f"evicting entries"
+                )
+                self._evict_largest_cache_entries(
+                    region, target_reduction_mb=new_df_size + 100
+                )
+
         try:
             self.status = query_result.status
             self.query = query_result.query
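The guard added to `set()` is admission control: before a new result is cached, the projected total (current usage plus the new DataFrame) is compared against `QUERY_CACHE_MAX_MEMORY_MB`, and on overshoot the eviction target is the new entry's size plus 100 MB of headroom. Reduced to its arithmetic, with hypothetical numbers:

```python
max_cache_memory_mb = 1024   # QUERY_CACHE_MAX_MEMORY_MB
current_cache_mb = 980.0     # from _get_cache_memory_usage(region)
new_df_mb = 60.0             # from _get_dataframe_memory_mb(query_result.df)

if current_cache_mb + new_df_mb > max_cache_memory_mb:   # 1040 > 1024
    # request the new entry's size plus 100 MB of headroom
    target_reduction_mb = new_df_mb + 100                 # 160 MB
    print(f"evict until ~{target_reduction_mb:.0f}MB freed")
```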
@@ -626,6 +626,9 @@ DEFAULT_FEATURE_FLAGS: dict[str, bool] = {
     "DATE_RANGE_TIMESHIFTS_ENABLED": False,
     # Enable Matrixify feature for matrix-style chart layouts
     "MATRIXIFY": False,
+    # Memory leak prevention: validate join keys to prevent cartesian product explosions
+    # Disable if time series queries with legitimate duplicate keys are failing
+    "MEMORY_LEAK_JOIN_VALIDATION": True,
 }
 
 # ------------------------------
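The flag defaults to on; deployments whose time-series queries legitimately carry duplicate join keys can switch it off through the usual `FEATURE_FLAGS` override in `superset_config.py` (sketch, assuming a standard config layout):

```python
# superset_config.py
FEATURE_FLAGS = {
    # Skip the duplicate-key check in _perform_join if it rejects valid queries
    "MEMORY_LEAK_JOIN_VALIDATION": False,
}
```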
@@ -900,6 +903,10 @@ CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "NullCache"}
 # Cache for datasource metadata and query results
 DATA_CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "NullCache"}
 
+# Maximum memory usage (in MB) for DataFrame cache to prevent OOM
+# Set based on available worker memory (recommended: 25-50% of worker memory)
+QUERY_CACHE_MAX_MEMORY_MB = 1024  # 1GB default
+
 # Cache for dashboard filter state. `CACHE_TYPE` defaults to `SupersetMetastoreCache`
 # that stores the values in the key-value table in the Superset metastore, as it's
 # required for Superset to operate correctly, but can be replaced by any
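Because `QueryCacheManager.set()` reads this value with `current_app.config.get(...)` and falls back to `DEFAULT_MAX_CACHE_MEMORY_MB` (1024 MB), the cap can be tuned per deployment in the same override file. For example, on a worker with 8 GB of memory, the 25-50% guidance above suggests roughly 2-4 GB (sketch):

```python
# superset_config.py
# 8 GB worker, ~25% reserved for the DataFrame cache
QUERY_CACHE_MAX_MEMORY_MB = 2048
```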
@@ -24,6 +24,7 @@ import pytest
 from superset.common.chart_data import ChartDataResultFormat
 from superset.common.query_context_processor import QueryContextProcessor
 from superset.utils.core import GenericDataType
+from tests.integration_tests.conftest import with_feature_flags
 
 
 @pytest.fixture
@@ -624,3 +625,160 @@ def test_processing_time_offsets_date_range_enabled(processor):
     assert isinstance(result["df"], pd.DataFrame)
     assert isinstance(result["queries"], list)
     assert isinstance(result["cache_keys"], list)
+
+
+class TestMemoryLeakFixes:
+    """Test the memory leak fixes in QueryContextProcessor."""
+
+    def test_validate_join_keys_for_memory_safety_with_unique_keys(self):
+        """Test that validation passes with unique join keys."""
+        processor = QueryContextProcessor(MagicMock())
+
+        # Create DataFrames with unique join keys
+        left_df = pd.DataFrame({"category": ["A", "B", "C"], "value": [1, 2, 3]})
+        right_df = pd.DataFrame(
+            {"category": ["A", "B", "C"], "other_value": [10, 20, 30]}
+        )
+
+        # Should not raise any exception
+        processor._validate_join_keys_for_memory_safety(
+            left_df, right_df, ["category"], "test_offset"
+        )
+
+    def test_validate_join_keys_for_memory_safety_with_duplicate_keys(self):
+        """Test that validation fails with duplicate join keys."""
+        from superset.exceptions import QueryObjectValidationError
+
+        processor = QueryContextProcessor(MagicMock())
+
+        # Create DataFrames with duplicate join keys (cartesian product risk)
+        left_df = pd.DataFrame(
+            {
+                "category": ["A", "A", "B", "B"],  # Duplicates
+                "value": [1, 2, 3, 4],
+            }
+        )
+        right_df = pd.DataFrame(
+            {
+                "category": ["A", "A", "B"],  # Duplicates
+                "other_value": [10, 20, 30],
+            }
+        )
+
+        # Should raise QueryObjectValidationError
+        with pytest.raises(QueryObjectValidationError) as exc_info:
+            processor._validate_join_keys_for_memory_safety(
+                left_df, right_df, ["category"], "test_offset"
+            )
+
+        assert "duplicate keys detected" in str(exc_info.value)
+        assert "test_offset" in str(exc_info.value)
+
+    def test_validate_join_keys_for_memory_safety_with_empty_join_keys(self):
+        """Test that validation passes when no join keys specified."""
+        processor = QueryContextProcessor(MagicMock())
+
+        left_df = pd.DataFrame({"value": [1, 2, 3]})
+        right_df = pd.DataFrame({"other_value": [10, 20, 30]})
+
+        # Should not raise any exception with empty join keys
+        processor._validate_join_keys_for_memory_safety(
+            left_df, right_df, [], "test_offset"
+        )
+
+    def test_validate_join_keys_for_memory_safety_with_no_duplicates_message(self):
+        """Test error message format for duplicate keys."""
+        from superset.exceptions import QueryObjectValidationError
+
+        processor = QueryContextProcessor(MagicMock())
+
+        left_df = pd.DataFrame(
+            {
+                "category": ["A", "A"],  # 1 duplicate
+                "value": [1, 2],
+            }
+        )
+        right_df = pd.DataFrame(
+            {
+                "category": ["B", "B", "B"],  # 2 duplicates
+                "other_value": [10, 20, 30],
+            }
+        )
+
+        with pytest.raises(QueryObjectValidationError) as exc_info:
+            processor._validate_join_keys_for_memory_safety(
+                left_df, right_df, ["category"], "weekly_offset"
+            )
+
+        error_msg = str(exc_info.value)
+        assert "weekly_offset" in error_msg
+        assert "Left: 1" in error_msg  # 1 duplicate
+        assert "Right: 2" in error_msg  # 2 duplicates
+        assert "memory explosion" in error_msg
+
+    @patch("gc.collect")
+    def test_processing_time_offsets_calls_garbage_collection(self, mock_gc_collect):
+        """Test that garbage collection is called after processing time offsets."""
+        # Create a minimal mock setup
+        mock_query_context = MagicMock()
+        mock_query_context.datasource = MagicMock()
+
+        processor = QueryContextProcessor(mock_query_context)
+
+        # Mock the query object
+        query_object = MagicMock()
+        query_object.time_offsets = []  # Empty to avoid complex mocking
+
+        # Mock external dependencies to focus on GC testing
+        with patch(
+            "superset.common.query_context_processor.get_since_until_from_query_object"
+        ) as mock_get_since_until:
+            mock_get_since_until.return_value = ("2024-01-01", "2024-02-01")
+
+            with patch.object(processor, "get_time_grain", return_value="P1D"):
+                with patch(
+                    "superset.common.query_context_processor.get_metric_names",
+                    return_value=["count"],
+                ):
+                    # Create test DataFrame
+                    test_df = pd.DataFrame({"count": [1, 2, 3]})
+
+                    # Call the method
+                    result = processor.processing_time_offsets(test_df, query_object)
+
+                    # Verify garbage collection was called
+                    mock_gc_collect.assert_called_once()
+
+                    # Verify result is returned
+                    assert result is not None
+
+    @with_feature_flags(MEMORY_LEAK_JOIN_VALIDATION=True)
+    def test_join_validation_with_feature_flag_enabled(self):
+        """Test that join validation runs when feature flag is enabled."""
+        processor = QueryContextProcessor(MagicMock())
+
+        # Create DataFrames with duplicate keys that should trigger validation
+        left_df = pd.DataFrame({"category": ["A", "A"], "value": [1, 2]})
+        right_df = pd.DataFrame({"category": ["A", "A"], "other_value": [10, 20]})
+
+        # Should call validation and raise error due to duplicates
+        from superset.exceptions import QueryObjectValidationError
+
+        with pytest.raises(QueryObjectValidationError):
+            processor._perform_join(left_df, right_df, ["category"], "test_offset")
+
+    @with_feature_flags(MEMORY_LEAK_JOIN_VALIDATION=False)
+    def test_join_validation_with_feature_flag_disabled(self):
+        """Test that join validation is skipped when feature flag is disabled."""
+        processor = QueryContextProcessor(MagicMock())
+
+        # Create DataFrames with duplicate keys that would normally trigger validation
+        left_df = pd.DataFrame({"category": ["A", "A"], "value": [1, 2]})
+        right_df = pd.DataFrame({"category": ["A", "A"], "other_value": [10, 20]})
+
+        # Should NOT raise error because validation is disabled
+        # (This will create a cartesian product, but validation is bypassed)
+        result = processor._perform_join(left_df, right_df, ["category"], "test_offset")
+
+        # Verify the join was performed (cartesian product: 2x2 = 4 rows)
+        assert len(result) == 4