Mirror of https://github.com/apache/superset.git (synced 2026-05-14 20:35:23 +00:00)

Compare commits: fix/mcp-ex...adopt-pr-3 (7 commits)

- 3601013a78
- 05a9b47f1a
- 889f23e412
- bbbd1490ef
- 1894b77b44
- ffe7addcd2
- 1773531807
@@ -77,6 +77,7 @@ export interface ChartDataResponseResult {
   // TODO(hainenber): define proper type for below attributes
   rejected_filters?: any[];
   applied_filters?: any[];
+  warning?: string | null;
   /**
    * Detected ISO 4217 currency code when AUTO mode is used.
    * Returns the currency code if all filtered data contains a single currency,
@@ -42,7 +42,10 @@ import {
   getQuerySettings,
   getChartDataUri,
 } from 'src/explore/exploreUtils';
-import { addDangerToast } from 'src/components/MessageToasts/actions';
+import {
+  addDangerToast,
+  addWarningToast,
+} from 'src/components/MessageToasts/actions';
 import { logEvent } from 'src/logger/actions';
 import { Logger, LOG_ACTIONS_LOAD_CHART } from 'src/logger/LogUtils';
 import { allowCrossDomain as domainShardingEnabled } from 'src/utils/hostNamesConfig';
@@ -805,6 +808,11 @@ export function exploreJSON(
         }),
       ),
     );
+    (queriesResponse as QueryData[]).forEach(response => {
+      if (response.warning) {
+        dispatch(addWarningToast(response.warning, { noDuplicate: true }));
+      }
+    });
     return dispatch(
       chartUpdateSucceeded(queriesResponse as QueryData[], key as number),
     );
@@ -31,6 +31,7 @@ import {
   AnnotationSourceType,
   AnnotationStyle,
 } from '@superset-ui/core';
+import * as toastActions from 'src/components/MessageToasts/actions';
 import { LOG_EVENT } from 'src/logger/actions';
 import * as exploreUtils from 'src/explore/exploreUtils';
 import * as actions from 'src/components/Chart/chartAction';
@@ -340,6 +341,56 @@ describe('chart actions', () => {
     );
     expect(result).toEqual([1, 2, 3]);
   });
+
+  test('dispatches addWarningToast when a query response includes a warning', async () => {
+    const warningMessage =
+      'Results truncated to 1,000 rows due to memory constraints.';
+    fetchMock.removeRoute(MOCK_URL);
+    fetchMock.post(
+      `glob:*${MOCK_URL}*`,
+      { result: [{ warning: warningMessage }] },
+      { name: MOCK_URL },
+    );
+    const addWarningToastSpy = jest.spyOn(toastActions, 'addWarningToast');
+
+    const actionThunk = actions.postChartFormData(
+      { viz_type: 'my_viz' } as QueryFormData,
+      false,
+      undefined,
+      undefined,
+    );
+    await actionThunk(
+      dispatch as unknown as actions.ChartThunkDispatch,
+      mockGetState as unknown as () => actions.RootState,
+      undefined,
+    );
+
+    expect(addWarningToastSpy).toHaveBeenCalledWith(warningMessage, {
+      noDuplicate: true,
+    });
+    addWarningToastSpy.mockRestore();
+    fetchMock.removeRoute(MOCK_URL);
+    setupDefaultFetchMock();
+  });
+
+  test('does not dispatch addWarningToast when no query response has a warning', async () => {
+    const addWarningToastSpy = jest.spyOn(toastActions, 'addWarningToast');
+
+    const actionThunk = actions.postChartFormData(
+      { viz_type: 'my_viz' } as QueryFormData,
+      false,
+      undefined,
+      undefined,
+    );
+    await actionThunk(
+      dispatch as unknown as actions.ChartThunkDispatch,
+      mockGetState as unknown as () => actions.RootState,
+      undefined,
+    );
+
+    expect(addWarningToastSpy).not.toHaveBeenCalled();
+    addWarningToastSpy.mockRestore();
+  });
 });

 // eslint-disable-next-line no-restricted-globals -- TODO: Migrate from describe blocks
@@ -1561,6 +1561,10 @@ class ChartDataResponseResult(Schema):
         required=False,
         allow_none=True,
     )
+    warning = fields.String(
+        metadata={"description": "Warning message when results were truncated"},
+        allow_none=True,
+    )


 class DashboardFilterInfoSchema(Schema):
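Since the new `warning` field is optional and `allow_none=True`, it serializes cleanly whether or not the backend supplies a message. A minimal sketch with a trimmed-down stand-in schema (not the full `ChartDataResponseResult`):

```python
from marshmallow import Schema, fields


class ResultSketch(Schema):
    # Trimmed-down stand-in for ChartDataResponseResult's new field
    warning = fields.String(
        metadata={"description": "Warning message when results were truncated"},
        allow_none=True,
    )


print(ResultSketch().dump({"warning": "Results truncated to 5,000 rows"}))
# -> {'warning': 'Results truncated to 5,000 rows'}
print(ResultSketch().dump({"warning": None}))
# -> {'warning': None}
```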
@@ -190,6 +190,18 @@ class QueryContextProcessor:
         )
         cache.df.columns = [unescape_separator(col) for col in cache.df.columns.values]

+        warning: str | None = None
+        if cache.bq_memory_limited:
+            row_count = cache.bq_memory_limited_row_count
+            chart_id = (self._query_context.form_data or {}).get("slice_id", "")
+            prefix = f"Chart {chart_id}: " if chart_id else ""
+            warning = _(
+                "%(prefix)sResults truncated to %(row_count)s rows"
+                " due to memory constraints.",
+                prefix=prefix,
+                row_count=f"{row_count:,}",
+            )
+
         return {
             "cache_key": cache_key,
             "cached_dttm": cache.cache_dttm,
@@ -210,6 +222,7 @@ class QueryContextProcessor:
             "from_dttm": query_obj.from_dttm,
             "to_dttm": query_obj.to_dttm,
             "label_map": label_map,
+            "warning": warning,
         }

     def query_cache_key(self, query_obj: QueryObject, **kwargs: Any) -> str | None:
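The gettext call keeps the sentence translatable while still interpolating the chart prefix and a comma-grouped row count. A standalone sketch of the same formatting, with plain `%` interpolation standing in for flask-babel's `_`:

```python
# Hypothetical inputs mirroring the locals above
row_count = 5000
chart_id = 42

prefix = f"Chart {chart_id}: " if chart_id else ""
warning = (
    "%(prefix)sResults truncated to %(row_count)s rows"
    " due to memory constraints."
) % {"prefix": prefix, "row_count": f"{row_count:,}"}

assert warning == (
    "Chart 42: Results truncated to 5,000 rows due to memory constraints."
)
```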
@@ -20,7 +20,7 @@ import logging
 from datetime import datetime, timezone
 from typing import Any

-from flask import current_app
+from flask import current_app, g, has_request_context
 from flask_caching import Cache
 from pandas import DataFrame

@@ -86,6 +86,8 @@ class QueryCacheManager:
         self.cache_value = cache_value
         self.sql_rowcount = sql_rowcount
         self.queried_dttm = queried_dttm
+        self.bq_memory_limited: bool = False
+        self.bq_memory_limited_row_count: int = 0

     # pylint: disable=too-many-arguments
     def set_query_result(
@@ -123,6 +125,15 @@
             )
             self.is_loaded = True

+            # Capture BigQuery memory-limit flag so it survives cache hits
+            if has_request_context():
+                self.bq_memory_limited = getattr(g, "bq_memory_limited", False)
+                self.bq_memory_limited_row_count = getattr(
+                    g, "bq_memory_limited_row_count", 0
+                )
+                g.bq_memory_limited = False
+                g.bq_memory_limited_row_count = 0
+
             value = {
                 "df": self.df,
                 "query": self.query,
@@ -133,6 +144,8 @@
                 "sql_rowcount": self.sql_rowcount,
                 "queried_dttm": self.queried_dttm,
                 "dttm": self.queried_dttm,  # Backwards compatibility
+                "bq_memory_limited": self.bq_memory_limited,
+                "bq_memory_limited_row_count": self.bq_memory_limited_row_count,
             }
             if self.is_loaded and key and self.status != QueryStatus.FAILED:
                 self.set(
@@ -193,6 +206,12 @@
                     "queried_dttm", cache_value.get("dttm")
                 )
                 query_cache.cache_value = cache_value
+                query_cache.bq_memory_limited = cache_value.get(
+                    "bq_memory_limited", False
+                )
+                query_cache.bq_memory_limited_row_count = cache_value.get(
+                    "bq_memory_limited_row_count", 0
+                )
                 current_app.config["STATS_LOGGER"].incr("loaded_from_cache")
             except KeyError as ex:
                 logger.exception(ex)
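The request-scoped `g` flags are copied onto the cache manager and then written into the cached payload dict, so a later cache hit can restore them with `.get(...)` defaults even outside a request context. A minimal round-trip sketch, using a plain dict in place of the real cache backend:

```python
# Build the payload roughly as set_query_result does (df/query elided)
payload = {
    "bq_memory_limited": True,
    "bq_memory_limited_row_count": 5000,
}
fake_cache = {"cache-key": payload}  # stand-in for the Flask-Caching store

# Later, on a cache hit, restore with the same defaults used in the diff
cached = fake_cache["cache-key"]
bq_memory_limited = cached.get("bq_memory_limited", False)
bq_memory_limited_row_count = cached.get("bq_memory_limited_row_count", 0)

assert bq_memory_limited is True
assert bq_memory_limited_row_count == 5000
```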
@@ -1447,6 +1447,9 @@ DEFAULT_DB_ID = None
 # Timeout duration for SQL Lab synchronous queries
 SQLLAB_TIMEOUT = int(timedelta(seconds=30).total_seconds())

+# BigQuery max fetch size in MB (limits memory usage when fetching large results)
+BQ_FETCH_MAX_MB = 200
+
 # Timeout duration for SQL Lab query validation
 SQLLAB_VALIDATION_TIMEOUT = int(timedelta(seconds=10).total_seconds())

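Like the other constants around it, `BQ_FETCH_MAX_MB` can be overridden per deployment; `fetch_data` reads it through `current_app.config.get(...)` with a 200 MB default. A sketch of an override in `superset_config.py` (500 is an arbitrary example value, not a recommendation):

```python
# superset_config.py -- hypothetical deployment override
# Allow larger BigQuery result sets on a memory-rich worker;
# lower the value instead to truncate large ad-hoc results sooner.
BQ_FETCH_MAX_MB = 500
```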
@@ -19,6 +19,7 @@ from __future__ import annotations

 import logging
 import re
+import sys
 import urllib
 from datetime import datetime
 from re import Pattern
@@ -27,6 +28,7 @@ from typing import Any, TYPE_CHECKING, TypedDict
 import pandas as pd
 from apispec import APISpec
 from apispec.ext.marshmallow import MarshmallowPlugin
+from flask import current_app, g, has_app_context, has_request_context
 from flask_babel import gettext as __
 from marshmallow import fields, Schema
 from marshmallow.exceptions import ValidationError
@@ -303,13 +305,86 @@ class BigQueryEngineSpec(BaseEngineSpec):  # pylint: disable=too-many-public-met
         return None

     @classmethod
-    def fetch_data(cls, cursor: Any, limit: int | None = None) -> list[tuple[Any, ...]]:
-        data = super().fetch_data(cursor, limit)
-        # Support type BigQuery Row, introduced here PR #4071
-        # google.cloud.bigquery.table.Row
-        if data and type(data[0]).__name__ == "Row":
-            data = [r.values() for r in data]  # type: ignore
-        return data
+    def fetch_data(cls, cursor: Any, limit: int | None = None) -> list[tuple[Any, ...]]:  # noqa: C901
+        """
+        Progressive fetch for BigQuery to prevent browser memory overload.
+
+        Samples a first batch to estimate row size, then extrapolates the
+        total number of rows that fit within ``BQ_FETCH_MAX_MB``.
+        Falls back to the parent implementation on any error.
+        """
+        max_mb: int = (
+            current_app.config.get("BQ_FETCH_MAX_MB", 200) if has_app_context() else 200
+        )
+        max_bytes = max_mb * 1024 * 1024
+
+        try:
+            initial_batch_size = min(1000, limit) if limit else 1000
+            first_batch: list[Any] = cursor.fetchmany(initial_batch_size)
+
+            if not first_batch:
+                if has_request_context():
+                    g.bq_memory_limited = False
+                    g.bq_memory_limited_row_count = 0
+                return []
+
+            # Support BigQuery Row objects (PR #4071)
+            if type(first_batch[0]).__name__ == "Row":
+                first_batch = [r.values() for r in first_batch]
+
+            # Estimate how many rows fit in the memory budget.
+            # Sum container + element sizes (one level deep) for a better
+            # estimate. Most BigQuery cell values are primitives (str, int,
+            # float, date), so one level captures the dominant allocation.
+            first_batch_bytes = sum(
+                sys.getsizeof(row) + sum(sys.getsizeof(v) for v in row)
+                for row in first_batch
+            )
+            rows_fetched = len(first_batch)
+            avg_bytes_per_row = first_batch_bytes / rows_fetched
+            total_rows_for_target = int(max_bytes / avg_bytes_per_row)
+
+            if limit:
+                total_rows_for_target = min(limit, total_rows_for_target)
+
+            remaining_rows = total_rows_for_target - rows_fetched
+
+            # First batch already covers the budget or the result set
+            if rows_fetched < initial_batch_size or remaining_rows <= 0:
+                memory_limited = (
+                    remaining_rows <= 0 and rows_fetched == initial_batch_size
+                )
+                if has_request_context():
+                    g.bq_memory_limited = memory_limited
+                    g.bq_memory_limited_row_count = len(first_batch)
+                return first_batch
+
+            # Fetch one extra row to confirm truncation without false positives
+            second_batch: list[Any] = cursor.fetchmany(remaining_rows + 1) or []
+            if second_batch and type(second_batch[0]).__name__ == "Row":
+                second_batch = [r.values() for r in second_batch]
+
+            # Truncation is confirmed only when more rows exist beyond the budget
+            memory_limited = len(second_batch) > remaining_rows
+            if memory_limited:
+                second_batch = second_batch[:remaining_rows]
+
+            data = first_batch + second_batch
+
+            if has_request_context():
+                g.bq_memory_limited = memory_limited
+                g.bq_memory_limited_row_count = len(data)
+            return data
+
+        except Exception:  # pylint: disable=broad-except
+            # Fallback to parent implementation
+            data = super().fetch_data(cursor, limit)
+            if data and type(data[0]).__name__ == "Row":
+                data = [r.values() for r in data]  # type: ignore
+            if has_request_context():
+                g.bq_memory_limited = False
+                g.bq_memory_limited_row_count = len(data) if data else 0
+            return data

     @staticmethod
     def _mutate_label(label: str) -> str:
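The extrapolation step is plain arithmetic: measure the sampled batch one level deep with `sys.getsizeof`, derive an average row size, and divide the byte budget by it. A self-contained sketch of that estimate with made-up rows (not the shipped code, which also handles limits, Row objects, and the confirmation fetch):

```python
import sys

# Hypothetical sampled batch: 1,000 rows of (int, 40-char str)
first_batch = [(i, "x" * 40) for i in range(1000)]

# Container size plus element sizes, one level deep, as in fetch_data
first_batch_bytes = sum(
    sys.getsizeof(row) + sum(sys.getsizeof(v) for v in row)
    for row in first_batch
)
avg_bytes_per_row = first_batch_bytes / len(first_batch)

max_bytes = 200 * 1024 * 1024  # the BQ_FETCH_MAX_MB default of 200 MB
total_rows_for_target = int(max_bytes / avg_bytes_per_row)
remaining_rows = total_rows_for_target - len(first_batch)

# With rows of roughly 200 bytes this budgets on the order of a million rows
print(avg_bytes_per_row, total_rows_for_target, remaining_rows)
```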
@@ -928,6 +928,7 @@ def test_get_df_payload_validates_before_cache_key_generation():
     mock_cache.query = "SELECT * FROM table"
     mock_cache.error_message = None
     mock_cache.status = "success"
+    mock_cache.bq_memory_limited = False
     mock_cache_manager.get.return_value = mock_cache

     # Call get_df_payload
@@ -1303,6 +1304,7 @@ def test_force_cached_normalizes_totals_query_row_limit():
         cache.is_cached = True
         cache.sql_rowcount = len(df)
         cache.cache_dttm = "2024-01-01T00:00:00"
+        cache.bq_memory_limited = False
         return cache

     mock_cache_manager.get.side_effect = cache_get
@@ -1359,6 +1361,8 @@ def test_get_df_payload_invalidates_cache_missing_applied_filter_columns():
             self.applied_template_filters = []
             self.rejected_filter_columns = []
             self.annotation_data = {}
+            self.bq_memory_limited = False
+            self.bq_memory_limited_row_count = 0
             self.set_query_result = MagicMock()

     mock_cache = MockCache()
@@ -1379,3 +1383,119 @@ def test_get_df_payload_invalidates_cache_missing_applied_filter_columns():
     assert mock_cache.is_loaded is False, (
         "Cache should be inv when no applied_filter_columns and query has filters"
     )
+
+
+def test_get_df_payload_bq_memory_limited_warning():
+    """
+    Test that get_df_payload includes a warning when BigQuery results are
+    truncated due to the memory limit (g.bq_memory_limited is set).
+    """
+    from superset.common.query_object import QueryObject
+
+    mock_query_context = MagicMock()
+    mock_query_context.force = False
+    mock_query_context.form_data = {"slice_id": 42}
+
+    mock_datasource = MagicMock()
+    mock_datasource.column_names = ["col1"]
+    mock_datasource.uid = "test_ds"
+    mock_datasource.cache_timeout = None
+    mock_datasource.changed_on = None
+    mock_datasource.get_extra_cache_keys.return_value = []
+    mock_datasource.data = MagicMock()
+    mock_datasource.data.get.return_value = {}
+
+    processor = QueryContextProcessor(mock_query_context)
+    processor._qc_datasource = mock_datasource
+
+    query_obj = QueryObject(
+        datasource=mock_datasource,
+        columns=["col1"],
+    )
+
+    with patch(
+        "superset.common.query_context_processor.QueryCacheManager"
+    ) as mock_cache_manager:
+        mock_cache = MagicMock()
+        mock_cache.is_loaded = True
+        mock_cache.df = pd.DataFrame({"col1": [1, 2, 3]})
+        mock_cache.query = "SELECT col1 FROM table"
+        mock_cache.error_message = None
+        mock_cache.status = "success"
+        mock_cache.applied_filter_columns = ["col1"]
+        mock_cache.applied_template_filters = []
+        mock_cache.rejected_filter_columns = []
+        mock_cache.annotation_data = {}
+        mock_cache.is_cached = True
+        mock_cache.sql_rowcount = 3
+        mock_cache.cache_dttm = "2024-01-01T00:00:00"
+        mock_cache.queried_dttm = "2024-01-01T00:00:00"
+        mock_cache.bq_memory_limited = True
+        mock_cache.bq_memory_limited_row_count = 5000
+        mock_cache_manager.get.return_value = mock_cache
+
+        with patch.object(query_obj, "validate", return_value=None):
+            with patch.object(processor, "query_cache_key", return_value="key"):
+                with patch.object(processor, "get_cache_timeout", return_value=3600):
+                    result = processor.get_df_payload(query_obj, force_cached=False)
+
+        assert result["warning"] is not None
+        assert "Chart 42" in result["warning"]
+        assert "5,000 rows" in result["warning"]
+        assert "memory constraints" in result["warning"]
+
+
+def test_get_df_payload_no_warning_when_not_memory_limited():
+    """
+    Test that get_df_payload does not include a warning when BigQuery
+    results were not truncated.
+    """
+    from superset.common.query_object import QueryObject
+
+    mock_query_context = MagicMock()
+    mock_query_context.force = False
+    mock_query_context.form_data = {}
+
+    mock_datasource = MagicMock()
+    mock_datasource.column_names = ["col1"]
+    mock_datasource.uid = "test_ds"
+    mock_datasource.cache_timeout = None
+    mock_datasource.changed_on = None
+    mock_datasource.get_extra_cache_keys.return_value = []
+    mock_datasource.data = MagicMock()
+    mock_datasource.data.get.return_value = {}
+
+    processor = QueryContextProcessor(mock_query_context)
+    processor._qc_datasource = mock_datasource
+
+    query_obj = QueryObject(
+        datasource=mock_datasource,
+        columns=["col1"],
+    )
+
+    with patch(
+        "superset.common.query_context_processor.QueryCacheManager"
+    ) as mock_cache_manager:
+        mock_cache = MagicMock()
+        mock_cache.is_loaded = True
+        mock_cache.df = pd.DataFrame({"col1": [1, 2]})
+        mock_cache.query = "SELECT col1 FROM table"
+        mock_cache.error_message = None
+        mock_cache.status = "success"
+        mock_cache.applied_filter_columns = ["col1"]
+        mock_cache.applied_template_filters = []
+        mock_cache.rejected_filter_columns = []
+        mock_cache.annotation_data = {}
+        mock_cache.is_cached = True
+        mock_cache.sql_rowcount = 2
+        mock_cache.cache_dttm = "2024-01-01T00:00:00"
+        mock_cache.queried_dttm = "2024-01-01T00:00:00"
+        mock_cache.bq_memory_limited = False
+        mock_cache_manager.get.return_value = mock_cache
+
+        with patch.object(query_obj, "validate", return_value=None):
+            with patch.object(processor, "query_cache_key", return_value="key"):
+                with patch.object(processor, "get_cache_timeout", return_value=3600):
+                    result = processor.get_df_payload(query_obj, force_cached=False)
+
+        assert result["warning"] is None
@@ -18,7 +18,7 @@
 # pylint: disable=line-too-long, import-outside-toplevel, protected-access, invalid-name

 from datetime import datetime
-from typing import Optional
+from typing import Any, Optional
 from unittest import mock

 import pytest
@@ -528,3 +528,133 @@ def test_get_view_names_excludes_materialized_views() -> None:
     assert "table_type = 'VIEW'" in executed_query
     # Ensure it's not querying for materialized views
     assert "MATERIALIZED VIEW" not in executed_query
+
+
+def _patch_bq_fetch_deps(
+    mocker: MockerFixture, max_mb: int = 200
+) -> tuple[mock.MagicMock, mock.MagicMock]:
+    """Helper to patch Flask g and current_app for BigQuery fetch_data tests."""
+    flask_g = mocker.patch("superset.db_engine_specs.bigquery.g")
+    app = mocker.patch("superset.db_engine_specs.bigquery.current_app")
+    # Make current_app truthy and .config.get() return a plain int
+    app.__bool__ = mock.Mock(return_value=True)
+    app.config = mock.MagicMock()
+    app.config.get = mock.Mock(return_value=max_mb)
+    return flask_g, app
+
+
+def test_fetch_data_within_memory_limit(mocker: MockerFixture) -> None:
+    """
+    Test that fetch_data returns all rows when the result fits within the
+    configured memory limit.
+    """
+    from superset.db_engine_specs.bigquery import BigQueryEngineSpec
+
+    rows = [(1, "a"), (2, "b"), (3, "c")]
+
+    cursor = mock.MagicMock()
+    # First fetchmany returns all rows; the result set is smaller than limit
+    cursor.fetchmany.return_value = rows
+
+    flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200)
+
+    result = BigQueryEngineSpec.fetch_data(cursor, limit=100)
+
+    assert result == rows
+    assert flask_g.bq_memory_limited is False
+    assert flask_g.bq_memory_limited_row_count == 3
+
+
+def test_fetch_data_truncated_by_memory_limit(mocker: MockerFixture) -> None:
+    """
+    Test that fetch_data truncates results and sets the memory_limited flag
+    when the memory budget is exceeded.
+
+    We use a very small budget (1 MB) so that after the first batch the
+    method computes ``remaining_rows <= 0``, hitting the truncation path.
+    """
+    from superset.db_engine_specs.bigquery import BigQueryEngineSpec
+
+    # 1000 rows of ~10KB each --> first batch ~10 MB >> 1 MB budget
+    first_batch = [(i, "x" * 10_000) for i in range(1000)]
+
+    cursor = mock.MagicMock()
+    cursor.fetchmany.return_value = first_batch
+
+    # 1 MB budget: first batch exceeds it, so remaining_rows <= 0
+    flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=1)
+
+    result = BigQueryEngineSpec.fetch_data(cursor, limit=None)
+
+    assert result == first_batch
+    assert flask_g.bq_memory_limited is True
+    assert flask_g.bq_memory_limited_row_count == len(first_batch)
+
+
+def test_fetch_data_empty_result(mocker: MockerFixture) -> None:
+    """
+    Test that fetch_data handles an empty result set gracefully.
+    """
+    from superset.db_engine_specs.bigquery import BigQueryEngineSpec
+
+    cursor = mock.MagicMock()
+    cursor.fetchmany.return_value = []
+
+    flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200)
+
+    result = BigQueryEngineSpec.fetch_data(cursor, limit=100)
+
+    assert result == []
+    assert flask_g.bq_memory_limited is False
+    assert flask_g.bq_memory_limited_row_count == 0
+
+
+def test_fetch_data_fallback_on_exception(mocker: MockerFixture) -> None:
+    """
+    Test that fetch_data falls back to the parent implementation when the
+    progressive fetch raises an exception.
+    """
+    from superset.db_engine_specs.bigquery import BigQueryEngineSpec
+
+    cursor = mock.MagicMock()
+    cursor.fetchmany.side_effect = RuntimeError("cursor error")
+    cursor.fetchall.return_value = [(1, "a"), (2, "b")]
+    cursor.description = [("col1", None), ("col2", None)]
+
+    flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200)
+
+    result = BigQueryEngineSpec.fetch_data(cursor, limit=None)
+
+    assert result == [(1, "a"), (2, "b")]
+    assert flask_g.bq_memory_limited is False
+    assert flask_g.bq_memory_limited_row_count == 2
+
+
+def test_fetch_data_converts_bigquery_row_objects(mocker: MockerFixture) -> None:
+    """
+    Test that BigQuery Row objects are converted to plain values.
+    """
+    from superset.db_engine_specs.bigquery import BigQueryEngineSpec
+
+    class FakeRow:
+        """Mimics google.cloud.bigquery.table.Row"""
+
+        def __init__(self, vals: tuple[Any, ...]) -> None:
+            self._vals = vals
+
+        def values(self) -> tuple[Any, ...]:
+            return self._vals
+
+    FakeRow.__name__ = "Row"
+
+    rows = [FakeRow((1, "a")), FakeRow((2, "b"))]
+
+    cursor = mock.MagicMock()
+    cursor.fetchmany.return_value = rows
+
+    flask_g, _ = _patch_bq_fetch_deps(mocker, max_mb=200)
+
+    result = BigQueryEngineSpec.fetch_data(cursor, limit=100)
+
+    assert result == [(1, "a"), (2, "b")]
+    assert flask_g.bq_memory_limited is False