diff --git a/superset-frontend/packages/superset-ui-core/src/query/types/QueryResponse.ts b/superset-frontend/packages/superset-ui-core/src/query/types/QueryResponse.ts index 210685f3480..23c13d3c7c1 100644 --- a/superset-frontend/packages/superset-ui-core/src/query/types/QueryResponse.ts +++ b/superset-frontend/packages/superset-ui-core/src/query/types/QueryResponse.ts @@ -77,6 +77,7 @@ export interface ChartDataResponseResult { // TODO(hainenber): define proper type for below attributes rejected_filters?: any[]; applied_filters?: any[]; + warning?: string | null; /** * Detected ISO 4217 currency code when AUTO mode is used. * Returns the currency code if all filtered data contains a single currency, diff --git a/superset-frontend/src/components/Chart/chartAction.ts b/superset-frontend/src/components/Chart/chartAction.ts index 40d8ed79b16..039c3f14068 100644 --- a/superset-frontend/src/components/Chart/chartAction.ts +++ b/superset-frontend/src/components/Chart/chartAction.ts @@ -42,7 +42,10 @@ import { getQuerySettings, getChartDataUri, } from 'src/explore/exploreUtils'; -import { addDangerToast } from 'src/components/MessageToasts/actions'; +import { + addDangerToast, + addWarningToast, +} from 'src/components/MessageToasts/actions'; import { logEvent } from 'src/logger/actions'; import { Logger, LOG_ACTIONS_LOAD_CHART } from 'src/logger/LogUtils'; import { allowCrossDomain as domainShardingEnabled } from 'src/utils/hostNamesConfig'; @@ -805,6 +808,11 @@ export function exploreJSON( }), ), ); + (queriesResponse as QueryData[]).forEach(response => { + if (response.warning) { + dispatch(addWarningToast(response.warning)); + } + }); return dispatch( chartUpdateSucceeded(queriesResponse as QueryData[], key as number), ); diff --git a/superset/common/query_context_processor.py b/superset/common/query_context_processor.py index 52fc6d24f28..0f05c3649d9 100644 --- a/superset/common/query_context_processor.py +++ b/superset/common/query_context_processor.py @@ -21,7 +21,7 @@ import re from typing import Any, cast, ClassVar, Sequence, TYPE_CHECKING import pandas as pd -from flask import current_app +from flask import current_app, g from flask_babel import gettext as _ from superset.common.chart_data import ChartDataResultFormat @@ -190,6 +190,16 @@ class QueryContextProcessor: ) cache.df.columns = [unescape_separator(col) for col in cache.df.columns.values] + warning: str | None = None + if getattr(g, "bq_memory_limited", False): + row_count = getattr(g, "bq_memory_limited_row_count", len(cache.df)) + chart_id = (self._query_context.form_data or {}).get("slice_id", "") + prefix = f"Chart {chart_id}: " if chart_id else "" + warning = ( + f"{prefix}Results truncated to {row_count:,} rows" + " due to memory constraints." + ) + return { "cache_key": cache_key, "cached_dttm": cache.cache_dttm, @@ -210,6 +220,7 @@ class QueryContextProcessor: "from_dttm": query_obj.from_dttm, "to_dttm": query_obj.to_dttm, "label_map": label_map, + "warning": warning, } def query_cache_key(self, query_obj: QueryObject, **kwargs: Any) -> str | None: diff --git a/superset/config.py b/superset/config.py index 2216c325b37..0e0ef531428 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1442,6 +1442,9 @@ DEFAULT_DB_ID = None # Timeout duration for SQL Lab synchronous queries SQLLAB_TIMEOUT = int(timedelta(seconds=30).total_seconds()) +# BigQuery max fetch size in MB (limits memory usage when fetching large results) +BQ_FETCH_MAX_MB = 200 + # Timeout duration for SQL Lab query validation SQLLAB_VALIDATION_TIMEOUT = int(timedelta(seconds=10).total_seconds()) diff --git a/superset/db_engine_specs/bigquery.py b/superset/db_engine_specs/bigquery.py index de13b1f2666..1bf91eb205c 100644 --- a/superset/db_engine_specs/bigquery.py +++ b/superset/db_engine_specs/bigquery.py @@ -19,6 +19,7 @@ from __future__ import annotations import logging import re +import sys import urllib from datetime import datetime from re import Pattern @@ -27,6 +28,7 @@ from typing import Any, TYPE_CHECKING, TypedDict import pandas as pd from apispec import APISpec from apispec.ext.marshmallow import MarshmallowPlugin +from flask import current_app, g from flask_babel import gettext as __ from marshmallow import fields, Schema from marshmallow.exceptions import ValidationError @@ -304,12 +306,72 @@ class BigQueryEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-met @classmethod def fetch_data(cls, cursor: Any, limit: int | None = None) -> list[tuple[Any, ...]]: - data = super().fetch_data(cursor, limit) - # Support type BigQuery Row, introduced here PR #4071 - # google.cloud.bigquery.table.Row - if data and type(data[0]).__name__ == "Row": - data = [r.values() for r in data] # type: ignore - return data + """ + Progressive fetch for BigQuery to prevent browser memory overload. + + Samples a first batch to estimate row size, then extrapolates the + total number of rows that fit within ``BQ_FETCH_MAX_MB``. + Falls back to the parent implementation on any error. + """ + max_mb: int = ( + current_app.config.get("BQ_FETCH_MAX_MB", 200) if current_app else 200 + ) + max_bytes = max_mb * 1024 * 1024 + + try: + initial_batch_size = min(1000, limit) if limit else 1000 + first_batch: list[Any] = cursor.fetchmany(initial_batch_size) + + if not first_batch: + g.bq_memory_limited = False + g.bq_memory_limited_row_count = 0 + return [] + + # Support BigQuery Row objects (PR #4071) + if type(first_batch[0]).__name__ == "Row": + first_batch = [r.values() for r in first_batch] + + # Estimate how many rows fit in the memory budget + first_batch_bytes = sys.getsizeof(str(first_batch)) + rows_fetched = len(first_batch) + avg_bytes_per_row = first_batch_bytes / rows_fetched + total_rows_for_target = int(max_bytes / avg_bytes_per_row) + + if limit: + total_rows_for_target = min(limit, total_rows_for_target) + + remaining_rows = total_rows_for_target - rows_fetched + + # First batch already covers the budget or the result set + if rows_fetched < initial_batch_size or remaining_rows <= 0: + memory_limited = ( + remaining_rows <= 0 and rows_fetched == initial_batch_size + ) + g.bq_memory_limited = memory_limited + g.bq_memory_limited_row_count = len(first_batch) + return first_batch + + # Fetch the rest up to the budget + second_batch: list[Any] = cursor.fetchmany(remaining_rows) or [] + if second_batch and type(second_batch[0]).__name__ == "Row": + second_batch = [r.values() for r in second_batch] + + data = first_batch + second_batch + + # If we received exactly what we asked for, more rows may exist + memory_limited = len(second_batch) == remaining_rows + g.bq_memory_limited = memory_limited + g.bq_memory_limited_row_count = len(data) + return data + + except Exception: # pylint: disable=broad-except + # Fallback to parent implementation + data = super().fetch_data(cursor, limit) + if data and type(data[0]).__name__ == "Row": + data = [r.values() for r in data] # type: ignore + g.bq_memory_limited = False + g.bq_memory_limited_row_count = len(data) if data else 0 + return data @staticmethod def _mutate_label(label: str) -> str: diff --git a/tests/unit_tests/common/test_query_context_processor.py b/tests/unit_tests/common/test_query_context_processor.py index 3bf558072b1..88e7fc83867 100644 --- a/tests/unit_tests/common/test_query_context_processor.py +++ b/tests/unit_tests/common/test_query_context_processor.py @@ -1379,3 +1379,125 @@ def test_get_df_payload_invalidates_cache_missing_applied_filter_columns(): assert mock_cache.is_loaded is False, ( "Cache should be inv when no applied_filter_columns and query has filters" ) + + +def test_get_df_payload_bq_memory_limited_warning(): + """ + Test that get_df_payload includes a warning when BigQuery results are + truncated due to the memory limit (g.bq_memory_limited is set). + """ + from superset.common.query_object import QueryObject + + mock_query_context = MagicMock() + mock_query_context.force = False + mock_query_context.form_data = {"slice_id": 42} + + mock_datasource = MagicMock() + mock_datasource.column_names = ["col1"] + mock_datasource.uid = "test_ds" + mock_datasource.cache_timeout = None + mock_datasource.changed_on = None + mock_datasource.get_extra_cache_keys.return_value = [] + mock_datasource.data = MagicMock() + mock_datasource.data.get.return_value = {} + + processor = QueryContextProcessor(mock_query_context) + processor._qc_datasource = mock_datasource + + query_obj = QueryObject( + datasource=mock_datasource, + columns=["col1"], + ) + + with patch( + "superset.common.query_context_processor.QueryCacheManager" + ) as mock_cache_manager: + mock_cache = MagicMock() + mock_cache.is_loaded = True + mock_cache.df = pd.DataFrame({"col1": [1, 2, 3]}) + mock_cache.query = "SELECT col1 FROM table" + mock_cache.error_message = None + mock_cache.status = "success" + mock_cache.applied_filter_columns = ["col1"] + mock_cache.applied_template_filters = [] + mock_cache.rejected_filter_columns = [] + mock_cache.annotation_data = {} + mock_cache.is_cached = True + mock_cache.sql_rowcount = 3 + mock_cache.cache_dttm = "2024-01-01T00:00:00" + mock_cache.queried_dttm = "2024-01-01T00:00:00" + mock_cache_manager.get.return_value = mock_cache + + with patch.object(query_obj, "validate", return_value=None): + with patch.object(processor, "query_cache_key", return_value="key"): + with patch.object(processor, "get_cache_timeout", return_value=3600): + # Simulate BigQuery memory-limited flag being set on Flask g + with patch("superset.common.query_context_processor.g") as mock_g: + mock_g.bq_memory_limited = True + mock_g.bq_memory_limited_row_count = 5000 + + result = processor.get_df_payload(query_obj, force_cached=False) + + assert result["warning"] is not None + assert "Chart 42" in result["warning"] + assert "5,000 rows" in result["warning"] + assert "memory constraints" in result["warning"] + + +def test_get_df_payload_no_warning_when_not_memory_limited(): + """ + Test that get_df_payload does not include a warning when BigQuery + results were not truncated. + """ + from superset.common.query_object import QueryObject + + mock_query_context = MagicMock() + mock_query_context.force = False + mock_query_context.form_data = {} + + mock_datasource = MagicMock() + mock_datasource.column_names = ["col1"] + mock_datasource.uid = "test_ds" + mock_datasource.cache_timeout = None + mock_datasource.changed_on = None + mock_datasource.get_extra_cache_keys.return_value = [] + mock_datasource.data = MagicMock() + mock_datasource.data.get.return_value = {} + + processor = QueryContextProcessor(mock_query_context) + processor._qc_datasource = mock_datasource + + query_obj = QueryObject( + datasource=mock_datasource, + columns=["col1"], + ) + + with patch( + "superset.common.query_context_processor.QueryCacheManager" + ) as mock_cache_manager: + mock_cache = MagicMock() + mock_cache.is_loaded = True + mock_cache.df = pd.DataFrame({"col1": [1, 2]}) + mock_cache.query = "SELECT col1 FROM table" + mock_cache.error_message = None + mock_cache.status = "success" + mock_cache.applied_filter_columns = ["col1"] + mock_cache.applied_template_filters = [] + mock_cache.rejected_filter_columns = [] + mock_cache.annotation_data = {} + mock_cache.is_cached = True + mock_cache.sql_rowcount = 2 + mock_cache.cache_dttm = "2024-01-01T00:00:00" + mock_cache.queried_dttm = "2024-01-01T00:00:00" + mock_cache_manager.get.return_value = mock_cache + + with patch.object(query_obj, "validate", return_value=None): + with patch.object(processor, "query_cache_key", return_value="key"): + with patch.object(processor, "get_cache_timeout", return_value=3600): + # g.bq_memory_limited is not set (default) + with patch("superset.common.query_context_processor.g") as mock_g: + mock_g.bq_memory_limited = False + + result = processor.get_df_payload(query_obj, force_cached=False) + + assert result["warning"] is None diff --git a/tests/unit_tests/db_engine_specs/test_bigquery.py b/tests/unit_tests/db_engine_specs/test_bigquery.py index a7231508e8e..a8695008282 100644 --- a/tests/unit_tests/db_engine_specs/test_bigquery.py +++ b/tests/unit_tests/db_engine_specs/test_bigquery.py @@ -18,7 +18,7 @@ # pylint: disable=line-too-long, import-outside-toplevel, protected-access, invalid-name from datetime import datetime -from typing import Optional +from typing import Any, Optional from unittest import mock import pytest @@ -528,3 +528,133 @@ def test_get_view_names_excludes_materialized_views() -> None: assert "table_type = 'VIEW'" in executed_query # Ensure it's not querying for materialized views assert "MATERIALIZED VIEW" not in executed_query + + +def _patch_bq_fetch_deps( + mocker: MockerFixture, max_mb: int = 200 +) -> tuple[mock.MagicMock, mock.MagicMock]: + """Helper to patch Flask g and current_app for BigQuery fetch_data tests.""" + flask_g = mocker.patch("superset.db_engine_specs.bigquery.g") + app = mocker.patch("superset.db_engine_specs.bigquery.current_app") + # Make current_app truthy and .config.get() return a plain int + app.__bool__ = mock.Mock(return_value=True) + app.config = mock.MagicMock() + app.config.get = mock.Mock(return_value=max_mb) + return flask_g, app + + +def test_fetch_data_within_memory_limit(mocker: MockerFixture) -> None: + """ + Test that fetch_data returns all rows when the result fits within the + configured memory limit. + """ + from superset.db_engine_specs.bigquery import BigQueryEngineSpec + + rows = [(1, "a"), (2, "b"), (3, "c")] + + cursor = mock.MagicMock() + # First fetchmany returns all rows; the result set is smaller than limit + cursor.fetchmany.return_value = rows + + flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200) + + result = BigQueryEngineSpec.fetch_data(cursor, limit=100) + + assert result == rows + assert flask_g.bq_memory_limited is False + assert flask_g.bq_memory_limited_row_count == 3 + + +def test_fetch_data_truncated_by_memory_limit(mocker: MockerFixture) -> None: + """ + Test that fetch_data truncates results and sets the memory_limited flag + when the memory budget is exceeded. + + We use a very small budget (1 MB) so that after the first batch the + method computes ``remaining_rows <= 0``, hitting the truncation path. + """ + from superset.db_engine_specs.bigquery import BigQueryEngineSpec + + # 1000 rows of ~10KB each --> first batch ~10 MB >> 1 MB budget + first_batch = [(i, "x" * 10_000) for i in range(1000)] + + cursor = mock.MagicMock() + cursor.fetchmany.return_value = first_batch + + # 1 MB budget: first batch exceeds it, so remaining_rows <= 0 + flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=1) + + result = BigQueryEngineSpec.fetch_data(cursor, limit=None) + + assert result == first_batch + assert flask_g.bq_memory_limited is True + assert flask_g.bq_memory_limited_row_count == len(first_batch) + + +def test_fetch_data_empty_result(mocker: MockerFixture) -> None: + """ + Test that fetch_data handles an empty result set gracefully. + """ + from superset.db_engine_specs.bigquery import BigQueryEngineSpec + + cursor = mock.MagicMock() + cursor.fetchmany.return_value = [] + + flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200) + + result = BigQueryEngineSpec.fetch_data(cursor, limit=100) + + assert result == [] + assert flask_g.bq_memory_limited is False + assert flask_g.bq_memory_limited_row_count == 0 + + +def test_fetch_data_fallback_on_exception(mocker: MockerFixture) -> None: + """ + Test that fetch_data falls back to the parent implementation when the + progressive fetch raises an exception. + """ + from superset.db_engine_specs.bigquery import BigQueryEngineSpec + + cursor = mock.MagicMock() + cursor.fetchmany.side_effect = RuntimeError("cursor error") + cursor.fetchall.return_value = [(1, "a"), (2, "b")] + cursor.description = [("col1", None), ("col2", None)] + + flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200) + + result = BigQueryEngineSpec.fetch_data(cursor, limit=None) + + assert result == [(1, "a"), (2, "b")] + assert flask_g.bq_memory_limited is False + assert flask_g.bq_memory_limited_row_count == 2 + + +def test_fetch_data_converts_bigquery_row_objects(mocker: MockerFixture) -> None: + """ + Test that BigQuery Row objects are converted to plain values. + """ + from superset.db_engine_specs.bigquery import BigQueryEngineSpec + + class FakeRow: + """Mimics google.cloud.bigquery.table.Row""" + + def __init__(self, vals: tuple[Any, ...]) -> None: + self._vals = vals + + def values(self) -> tuple[Any, ...]: + return self._vals + + FakeRow.__name__ = "Row" + + rows = [FakeRow((1, "a")), FakeRow((2, "b"))] + + cursor = mock.MagicMock() + cursor.fetchmany.return_value = rows + + flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200) + + result = BigQueryEngineSpec.fetch_data(cursor, limit=100) + + assert result == [(1, "a"), (2, "b")] + assert flask_g.bq_memory_limited is False