diff --git a/superset/commands/database/uploaders/csv_reader.py b/superset/commands/database/uploaders/csv_reader.py index 0072a7d5cfe..c8c43a25f1c 100644 --- a/superset/commands/database/uploaders/csv_reader.py +++ b/superset/commands/database/uploaders/csv_reader.py @@ -15,12 +15,15 @@ # specific language governing permissions and limitations # under the License. import logging +from importlib import util from typing import Any, Optional import pandas as pd +from flask import current_app from flask_babel import lazy_gettext as _ from werkzeug.datastructures import FileStorage +from superset import is_feature_enabled from superset.commands.database.exceptions import DatabaseUploadFailed from superset.commands.database.uploaders.base import ( BaseDataReader, @@ -30,8 +33,9 @@ from superset.commands.database.uploaders.base import ( logger = logging.getLogger(__name__) -READ_CSV_CHUNK_SIZE = 1000 -ROWS_TO_READ_METADATA = 2 +ROWS_TO_READ_METADATA = 100 +DEFAULT_ENCODING = "utf-8" +ENCODING_FALLBACKS = ["utf-8", "latin-1", "cp1252", "iso-8859-1"] class CSVReaderOptions(ReaderOptions, total=False): @@ -61,23 +65,153 @@ class CSVReader(BaseDataReader): ) @staticmethod - def _read_csv(file: FileStorage, kwargs: dict[str, Any]) -> pd.DataFrame: + def _detect_encoding(file: FileStorage) -> str: + """Detect file encoding with progressive sampling""" + # Try progressively larger samples to improve detection reliability + sample_sizes = [1024, 8192, 32768, 65536] + + for sample_size in sample_sizes: + file.seek(0) + sample = file.read(sample_size) + if not sample: # Empty file or reached end + break + + for encoding in ENCODING_FALLBACKS: + try: + sample.decode(encoding) + file.seek(0) + return encoding + except UnicodeDecodeError: + continue + + file.seek(0) + return DEFAULT_ENCODING + + @staticmethod + def _select_optimal_engine() -> str: + """Select the best available CSV parsing engine""" + try: + # Check if pyarrow is available as a separate package + pyarrow_spec = util.find_spec("pyarrow") + if not pyarrow_spec: + return "c" + + # Import pyarrow to verify it works properly + import pyarrow as pa # noqa: F401 + + # Check if pandas has built-in pyarrow support + pandas_version = str(pd.__version__) + has_builtin_pyarrow = "pyarrow" in pandas_version + + if has_builtin_pyarrow: + # Pandas has built-in pyarrow, safer to use c engine + logger.info("Pandas has built-in pyarrow support, using 'c' engine") + return "c" + else: + # External pyarrow available, can safely use it + logger.info("Using 'pyarrow' engine for CSV parsing") + return "pyarrow" + + except ImportError: + # PyArrow import failed, fall back to c engine + logger.info("PyArrow not properly installed, falling back to 'c' engine") + return "c" + except Exception as ex: + # Any other error, fall back to c engine + logger.warning( + f"Error selecting CSV engine: {ex}, falling back to 'c' engine" + ) + return "c" + + @staticmethod + def _read_csv( # noqa: C901 + file: FileStorage, + kwargs: dict[str, Any], + ) -> pd.DataFrame: + encoding = kwargs.get("encoding", DEFAULT_ENCODING) + + # PyArrow engine doesn't support iterator/chunksize/nrows + # It also has known issues with date parsing and missing values + # Default to "c" engine for stability + has_unsupported_options = ( + "chunksize" in kwargs + or "iterator" in kwargs + or kwargs.get("nrows") is not None + or kwargs.get("parse_dates") # Has bugs with multiple date columns + or kwargs.get("na_values") # Has bugs with missing value handling + ) + + # Use PyArrow engine if feature flag is enabled and options are compatible + if ( + is_feature_enabled("CSV_UPLOAD_PYARROW_ENGINE") + and not has_unsupported_options + ): + kwargs["engine"] = CSVReader._select_optimal_engine() + else: + # Default to c engine for reliability + kwargs["engine"] = "c" + + kwargs["low_memory"] = False + try: if "chunksize" in kwargs: - return pd.concat( - pd.read_csv( - filepath_or_buffer=file.stream, - **kwargs, - ) + chunks = [] + total_rows = 0 + max_rows = kwargs.get("nrows") + chunk_iterator = pd.read_csv( + filepath_or_buffer=file.stream, + **kwargs, ) + + for chunk in chunk_iterator: + # Check if adding this chunk would exceed the row limit + if max_rows is not None and total_rows + len(chunk) > max_rows: + # Only take the needed rows from this chunk + remaining_rows = max_rows - total_rows + chunk = chunk.iloc[:remaining_rows] + chunks.append(chunk) + break + + chunks.append(chunk) + total_rows += len(chunk) + + # Break if we've reached the desired number of rows + if max_rows is not None and total_rows >= max_rows: + break + + if chunks: + result = pd.concat(chunks, ignore_index=False) + # When using chunking, we need to reset and rebuild the index + if kwargs.get("index_col") is not None: + # The index was already set by pandas during read_csv + # Just need to ensure it's properly named after concatenation + index_col = kwargs.get("index_col") + if isinstance(index_col, str): + result.index.name = index_col + return result + return pd.DataFrame() + return pd.read_csv( filepath_or_buffer=file.stream, **kwargs, ) + except UnicodeDecodeError as ex: + if encoding != DEFAULT_ENCODING: + raise DatabaseUploadFailed( + message=_("Parsing error: %(error)s", error=str(ex)) + ) from ex + + file.seek(0) + detected_encoding = CSVReader._detect_encoding(file) + if detected_encoding != encoding: + kwargs["encoding"] = detected_encoding + return CSVReader._read_csv(file, kwargs) + raise DatabaseUploadFailed( + message=_("Parsing error: %(error)s", error=str(ex)) + ) from ex except ( pd.errors.ParserError, pd.errors.EmptyDataError, - UnicodeDecodeError, ValueError, ) as ex: raise DatabaseUploadFailed( @@ -93,31 +227,46 @@ class CSVReader(BaseDataReader): :return: pandas DataFrame :throws DatabaseUploadFailed: if there is an error reading the file """ + rows_to_read = self._options.get("rows_to_read") + chunk_size = current_app.config.get("READ_CSV_CHUNK_SIZE", 1000) + + use_chunking = rows_to_read is None or rows_to_read > chunk_size * 2 + kwargs = { - "chunksize": READ_CSV_CHUNK_SIZE, - "encoding": "utf-8", + "encoding": self._options.get("encoding", DEFAULT_ENCODING), "header": self._options.get("header_row", 0), "decimal": self._options.get("decimal_character", "."), "index_col": self._options.get("index_column"), "dayfirst": self._options.get("day_first", False), - "iterator": True, "keep_default_na": not self._options.get("null_values"), - "usecols": self._options.get("columns_read") - if self._options.get("columns_read") # None if an empty list - else None, - "na_values": self._options.get("null_values") - if self._options.get("null_values") # None if an empty list - else None, - "nrows": self._options.get("rows_to_read"), + "usecols": ( + self._options.get("columns_read") + if self._options.get("columns_read") # None if an empty list + else None + ), + "na_values": ( + self._options.get("null_values") + if self._options.get("null_values") # None if an empty list + else None + ), + "nrows": rows_to_read, "parse_dates": self._options.get("column_dates"), "sep": self._options.get("delimiter", ","), "skip_blank_lines": self._options.get("skip_blank_lines", False), "skipinitialspace": self._options.get("skip_initial_space", False), "skiprows": self._options.get("skip_rows", 0), - "dtype": self._options.get("column_data_types") - if self._options.get("column_data_types") - else None, + "dtype": ( + self._options.get("column_data_types") + if self._options.get("column_data_types") + else None + ), + "cache_dates": True, } + + if use_chunking: + kwargs["chunksize"] = chunk_size + kwargs["iterator"] = True + return self._read_csv(file, kwargs) def file_metadata(self, file: FileStorage) -> FileMetadata: @@ -131,6 +280,8 @@ class CSVReader(BaseDataReader): "nrows": ROWS_TO_READ_METADATA, "header": self._options.get("header_row", 0), "sep": self._options.get("delimiter", ","), + "encoding": self._options.get("encoding", DEFAULT_ENCODING), + "low_memory": False, } df = self._read_csv(file, kwargs) return { diff --git a/superset/config.py b/superset/config.py index 579ad02139d..48ae8c24cd3 100644 --- a/superset/config.py +++ b/superset/config.py @@ -508,6 +508,8 @@ DEFAULT_FEATURE_FLAGS: dict[str, bool] = { # geospatial ones) by inputting javascript in controls. This exposes # an XSS security vulnerability "ENABLE_JAVASCRIPT_CONTROLS": False, # deprecated + # Experimental PyArrow engine for CSV parsing (may have issues with dates/nulls) + "CSV_UPLOAD_PYARROW_ENGINE": False, # When this feature is enabled, nested types in Presto will be # expanded into extra columns and/or arrays. This is experimental, # and doesn't work with all nested types. @@ -1322,6 +1324,10 @@ ALLOWED_USER_CSV_SCHEMA_FUNC = allowed_schemas_for_csv_upload # Values that should be treated as nulls for the csv uploads. CSV_DEFAULT_NA_NAMES = list(STR_NA_VALUES) +# Chunk size for reading CSV files during uploads +# Smaller values use less memory but may be slower for large files +READ_CSV_CHUNK_SIZE = 1000 + # A dictionary of items that gets merged into the Jinja context for # SQL Lab. The existing context gets updated with this dictionary, # meaning values for existing keys get overwritten by the content of this diff --git a/superset/db_engine_specs/clickhouse.py b/superset/db_engine_specs/clickhouse.py index ada2f9f5e40..d743e09be37 100644 --- a/superset/db_engine_specs/clickhouse.py +++ b/superset/db_engine_specs/clickhouse.py @@ -54,6 +54,7 @@ class ClickHouseBaseEngineSpec(BaseEngineSpec): """Shared engine spec for ClickHouse.""" time_groupby_inline = True + supports_multivalues_insert = True _time_grain_expressions = { None: "{col}", diff --git a/superset/db_engine_specs/db2.py b/superset/db_engine_specs/db2.py index 8dd7b00a6b9..8a09f5af3c1 100644 --- a/superset/db_engine_specs/db2.py +++ b/superset/db_engine_specs/db2.py @@ -36,6 +36,7 @@ class Db2EngineSpec(BaseEngineSpec): max_column_name_length = 30 supports_dynamic_schema = True + supports_multivalues_insert = True _time_grain_expressions = { None: "{col}", diff --git a/superset/db_engine_specs/duckdb.py b/superset/db_engine_specs/duckdb.py index dbaa5f9383d..b213f56013d 100644 --- a/superset/db_engine_specs/duckdb.py +++ b/superset/db_engine_specs/duckdb.py @@ -196,6 +196,7 @@ class DuckDBEngineSpec(DuckDBParametersMixin, BaseEngineSpec): default_driver = "duckdb_engine" sqlalchemy_uri_placeholder = "duckdb:////path/to/duck.db" + supports_multivalues_insert = True # DuckDB-specific column type mappings to ensure float/double types are recognized column_type_mappings = ( diff --git a/superset/db_engine_specs/mysql.py b/superset/db_engine_specs/mysql.py index fc0606697be..8c071f8dab6 100644 --- a/superset/db_engine_specs/mysql.py +++ b/superset/db_engine_specs/mysql.py @@ -74,6 +74,7 @@ class MySQLEngineSpec(BasicParametersMixin, BaseEngineSpec): encryption_parameters = {"ssl": "1"} supports_dynamic_schema = True + supports_multivalues_insert = True column_type_mappings = ( ( diff --git a/superset/db_engine_specs/oracle.py b/superset/db_engine_specs/oracle.py index 37c779b8aa9..e40223730b1 100644 --- a/superset/db_engine_specs/oracle.py +++ b/superset/db_engine_specs/oracle.py @@ -28,6 +28,7 @@ class OracleEngineSpec(BaseEngineSpec): engine_name = "Oracle" force_column_alias_quotes = True max_column_name_length = 128 + supports_multivalues_insert = True _time_grain_expressions = { None: "{col}", diff --git a/superset/db_engine_specs/postgres.py b/superset/db_engine_specs/postgres.py index b86172cb3a3..3e95ecc7901 100644 --- a/superset/db_engine_specs/postgres.py +++ b/superset/db_engine_specs/postgres.py @@ -99,6 +99,7 @@ class PostgresBaseEngineSpec(BaseEngineSpec): engine = "" engine_name = "PostgreSQL" + supports_multivalues_insert = True _time_grain_expressions = { None: "{col}", diff --git a/superset/db_engine_specs/sqlite.py b/superset/db_engine_specs/sqlite.py index a7e9a96e337..f8604bcfead 100644 --- a/superset/db_engine_specs/sqlite.py +++ b/superset/db_engine_specs/sqlite.py @@ -42,6 +42,7 @@ class SqliteEngineSpec(BaseEngineSpec): engine_name = "SQLite" disable_ssh_tunneling = True + supports_multivalues_insert = True _time_grain_expressions = { None: "{col}", diff --git a/tests/unit_tests/commands/databases/csv_reader_test.py b/tests/unit_tests/commands/databases/csv_reader_test.py index 2fb6bc5c6ce..effbad56398 100644 --- a/tests/unit_tests/commands/databases/csv_reader_test.py +++ b/tests/unit_tests/commands/databases/csv_reader_test.py @@ -18,6 +18,7 @@ import io from datetime import datetime import numpy as np +import pandas as pd import pytest from werkzeug.datastructures import FileStorage @@ -321,16 +322,63 @@ def test_csv_reader_invalid_file(): def test_csv_reader_invalid_encoding(): + """Test that encoding detection automatically handles problematic encoding.""" csv_reader = CSVReader( options=CSVReaderOptions(), ) binary_data = b"col1,col2,col3\nv1,v2,\xba\nv3,v4,v5\n" + # The new encoding detection should automatically handle this + df = csv_reader.file_to_dataframe(FileStorage(io.BytesIO(binary_data))) + assert df.columns.tolist() == ["col1", "col2", "col3"] + assert len(df) == 2 # Should have 2 data rows + + +def test_csv_reader_encoding_detection_latin1(): + """Test automatic encoding detection for Latin-1 encoded files.""" + csv_reader = CSVReader( + options=CSVReaderOptions(), + ) + # Create a Latin-1 encoded file with special characters + binary_data = "col1,col2,col3\nCafé,Résumé,naïve\n".encode("latin-1") + df = csv_reader.file_to_dataframe(FileStorage(io.BytesIO(binary_data))) + assert df.columns.tolist() == ["col1", "col2", "col3"] + assert df.values.tolist() == [["Café", "Résumé", "naïve"]] + + +def test_csv_reader_encoding_detection_iso88591(): + """Test automatic encoding detection for ISO-8859-1 encoded files.""" + csv_reader = CSVReader( + options=CSVReaderOptions(), + ) + # Create an ISO-8859-1 encoded file with special characters + binary_data = "col1,col2\nCafé,naïve\n".encode("iso-8859-1") + df = csv_reader.file_to_dataframe(FileStorage(io.BytesIO(binary_data))) + assert df.columns.tolist() == ["col1", "col2"] + assert df.values.tolist() == [["Café", "naïve"]] + + +def test_csv_reader_explicit_encoding(): + """Test that explicit encoding is respected.""" + csv_reader = CSVReader( + options=CSVReaderOptions(encoding="latin-1"), + ) + # Create a Latin-1 encoded file + binary_data = "col1,col2\nCafé,naïve\n".encode("latin-1") + df = csv_reader.file_to_dataframe(FileStorage(io.BytesIO(binary_data))) + assert df.columns.tolist() == ["col1", "col2"] + assert df.values.tolist() == [["Café", "naïve"]] + + +def test_csv_reader_encoding_detection_failure(): + """Test that undecodable files raise appropriate error.""" + csv_reader = CSVReader( + options=CSVReaderOptions(encoding="ascii"), # Force ASCII encoding + ) + # Create data that can't be decoded as ASCII + binary_data = b"col1,col2\n\xff\xfe,test\n" with pytest.raises(DatabaseUploadFailed) as ex: csv_reader.file_to_dataframe(FileStorage(io.BytesIO(binary_data))) - assert str(ex.value) == ( - "Parsing error: 'utf-8' codec can't decode byte 0xba in" - " position 21: invalid start byte" - ) + assert "Parsing error" in str(ex.value) def test_csv_reader_file_metadata(): @@ -371,3 +419,354 @@ def test_csv_reader_file_metadata_invalid_file(): "Parsing error: Error tokenizing data. C error:" " Expected 3 fields in line 3, saw 7\n" ) + + +def test_csv_reader_chunking_large_file(): + """Test that chunking is used for large files.""" + # Create a large CSV with more than 100k rows + large_data = [["col1", "col2", "col3"]] + for i in range(100001): + large_data.append([f"val{i}", str(i), f"data{i}"]) + + csv_reader = CSVReader( + options=CSVReaderOptions(), + ) + df = csv_reader.file_to_dataframe(create_csv_file(large_data)) + assert len(df) == 100001 + assert df.columns.tolist() == ["col1", "col2", "col3"] + assert df.iloc[0].tolist() == ["val0", 0, "data0"] + assert df.iloc[-1].tolist() == ["val100000", 100000, "data100000"] + + +def test_csv_reader_chunking_with_rows_limit(): + """Test that chunking respects rows_to_read limit.""" + # Create a CSV with more than the chunk size + large_data = [["col1", "col2"]] + for i in range(60000): # More than chunk size of 50000 + large_data.append([f"val{i}", str(i)]) + + csv_reader = CSVReader( + options=CSVReaderOptions(rows_to_read=55000), + ) + df = csv_reader.file_to_dataframe(create_csv_file(large_data)) + assert len(df) == 55000 + assert df.columns.tolist() == ["col1", "col2"] + + +def test_csv_reader_no_chunking_small_file(): + """Test that chunking is not used for small files.""" + # Create a small CSV (less than 2 * chunk size) + small_data = [["col1", "col2"]] + for i in range(1000): # Much less than chunk size + small_data.append([f"val{i}", str(i)]) + + csv_reader = CSVReader( + options=CSVReaderOptions(rows_to_read=1000), + ) + df = csv_reader.file_to_dataframe(create_csv_file(small_data)) + assert len(df) == 1000 + assert df.columns.tolist() == ["col1", "col2"] + + +def test_csv_reader_engine_selection(): + """Test engine selection based on feature flag.""" + from unittest.mock import MagicMock, patch + + csv_reader = CSVReader( + options=CSVReaderOptions(), + ) + + # Test 1: Feature flag disabled (default) - should use c engine + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + with patch( + "superset.commands.database.uploaders.csv_reader.is_feature_enabled" + ) as mock_flag: + mock_flag.return_value = False + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock(return_value=pd.DataFrame({"col1": [1, 2, 3]})) + mock_pd.DataFrame = pd.DataFrame + + file = create_csv_file([["col1"], ["1"], ["2"], ["3"]]) + csv_reader.file_to_dataframe(file) + + # Check that c engine is selected when feature flag is disabled + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("engine") == "c" + + # Test 2: Feature flag enabled - pyarrow would be used but chunking prevents it + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + with patch( + "superset.commands.database.uploaders.csv_reader.is_feature_enabled" + ) as mock_flag: + with patch("importlib.util") as mock_util: + mock_flag.return_value = True + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock( + return_value=pd.DataFrame({"col1": [1, 2, 3]}) + ) + mock_pd.DataFrame = pd.DataFrame + mock_pd.concat = MagicMock( + return_value=pd.DataFrame({"col1": [1, 2, 3]}) + ) + mock_util.find_spec = MagicMock(return_value=True) + + file = create_csv_file([["col1"], ["1"], ["2"], ["3"]]) + csv_reader.file_to_dataframe(file) + + # Check that c engine is selected due to chunking (default behavior) + # Even with feature flag enabled, chunking prevents pyarrow usage + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("engine") == "c" + + # Test 3: Feature flag enabled but unsupported options - should use c engine + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + with patch( + "superset.commands.database.uploaders.csv_reader.is_feature_enabled" + ) as mock_flag: + mock_flag.return_value = True + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock(return_value=pd.DataFrame({"col1": [1, 2, 3]})) + mock_pd.DataFrame = pd.DataFrame + + # Create reader with date parsing (unsupported by pyarrow) + csv_reader_with_dates = CSVReader( + options=CSVReaderOptions(column_dates=["date_col"]), + ) + file = create_csv_file([["date_col"], ["2023-01-01"]]) + csv_reader_with_dates.file_to_dataframe(file) + + # Check that c engine is selected due to unsupported options + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("engine") == "c" + + +def test_csv_reader_low_memory_setting(): + """Test that low_memory is set to False.""" + from unittest.mock import MagicMock, patch + + csv_reader = CSVReader( + options=CSVReaderOptions(), + ) + + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock(return_value=pd.DataFrame({"col1": [1, 2, 3]})) + mock_pd.DataFrame = pd.DataFrame + + file = create_csv_file([["col1"], ["1"], ["2"], ["3"]]) + csv_reader.file_to_dataframe(file) + + # Check that low_memory=False was set + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("low_memory") is False + + +def test_csv_reader_cache_dates_setting(): + """Test that cache_dates is set to True for performance.""" + from unittest.mock import MagicMock, patch + + csv_reader = CSVReader( + options=CSVReaderOptions(column_dates=["date_col"]), + ) + + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock( + return_value=pd.DataFrame({"date_col": ["2023-01-01"]}) + ) + mock_pd.DataFrame = pd.DataFrame + + file = create_csv_file([["date_col"], ["2023-01-01"]]) + csv_reader.file_to_dataframe(file) + + # Check that cache_dates=True was set + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("cache_dates") is True + + +def test_csv_reader_pyarrow_feature_flag(): + """ + Test that the CSV_UPLOAD_PYARROW_ENGINE feature flag controls engine selection. + """ + import io + from unittest.mock import MagicMock, patch + + from werkzeug.datastructures import FileStorage + + # Test _read_csv directly to avoid the file_to_dataframe chunking logic + with patch( + "superset.commands.database.uploaders.csv_reader.is_feature_enabled" + ) as mock_flag: + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + with patch.object( + CSVReader, "_select_optimal_engine" + ) as mock_engine_select: + # Test 1: FF enabled, pyarrow available, no unsupported options + mock_flag.return_value = True + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock(return_value=pd.DataFrame({"col1": [1]})) + mock_engine_select.return_value = "pyarrow" + + # Create clean kwargs without any problematic options + clean_kwargs = { + "encoding": "utf-8", + "low_memory": False, + # No chunksize, iterator, nrows, parse_dates, or na_values + } + + file = FileStorage(io.StringIO("col1\nval1")) + CSVReader._read_csv(file, clean_kwargs) + + # Verify feature flag was checked + mock_flag.assert_called_with("CSV_UPLOAD_PYARROW_ENGINE") + + # Verify engine selection method was called + mock_engine_select.assert_called_once() + + # Verify pyarrow engine was selected + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("engine") == "pyarrow" + + # Test 2: Feature flag disabled + with patch( + "superset.commands.database.uploaders.csv_reader.is_feature_enabled" + ) as mock_flag: + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + mock_flag.return_value = False + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock(return_value=pd.DataFrame({"col1": [1]})) + + clean_kwargs = { + "encoding": "utf-8", + "low_memory": False, + } + + file = FileStorage(io.StringIO("col1\nval1")) + CSVReader._read_csv(file, clean_kwargs) + + # Verify feature flag was checked + mock_flag.assert_called_with("CSV_UPLOAD_PYARROW_ENGINE") + + # Verify c engine was selected when flag is disabled + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("engine") == "c" + + # Test 3: Feature flag enabled but unsupported options present + with patch( + "superset.commands.database.uploaders.csv_reader.is_feature_enabled" + ) as mock_flag: + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + mock_flag.return_value = True + mock_pd.__version__ = "2.0.0" + mock_pd.read_csv = MagicMock(return_value=pd.DataFrame({"col1": [1]})) + + # Include unsupported options + unsupported_kwargs = { + "encoding": "utf-8", + "low_memory": False, + "nrows": 100, # Unsupported by pyarrow + } + + file = FileStorage(io.StringIO("col1\nval1")) + CSVReader._read_csv(file, unsupported_kwargs) + + # Verify c engine was selected due to unsupported options + call_kwargs = mock_pd.read_csv.call_args[1] + assert call_kwargs.get("engine") == "c" + + +def test_csv_reader_select_optimal_engine(): + """Test the _select_optimal_engine method with different scenarios.""" + from unittest.mock import MagicMock, patch + + # Test 1: PyArrow available, no built-in support + with patch("superset.commands.database.uploaders.csv_reader.util") as mock_util: + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + with patch("superset.commands.database.uploaders.csv_reader.logger"): + mock_util.find_spec = MagicMock( + return_value=MagicMock() + ) # PyArrow found + mock_pd.__version__ = "2.0.0" # No pyarrow in version + + # Mock successful pyarrow import + with patch.dict("sys.modules", {"pyarrow": MagicMock()}): + result = CSVReader._select_optimal_engine() + assert result == "pyarrow" + + # Test 2: PyArrow not available + with patch("superset.commands.database.uploaders.csv_reader.util") as mock_util: + with patch("superset.commands.database.uploaders.csv_reader.logger"): + mock_util.find_spec = MagicMock(return_value=None) # PyArrow not found + + result = CSVReader._select_optimal_engine() + assert result == "c" + + # Test 3: Pandas with built-in pyarrow + with patch("superset.commands.database.uploaders.csv_reader.util") as mock_util: + with patch("superset.commands.database.uploaders.csv_reader.pd") as mock_pd: + with patch("superset.commands.database.uploaders.csv_reader.logger"): + mock_util.find_spec = MagicMock( + return_value=MagicMock() + ) # PyArrow found + mock_pd.__version__ = "2.0.0+pyarrow" # Has pyarrow in version + + # Mock successful pyarrow import + with patch.dict("sys.modules", {"pyarrow": MagicMock()}): + result = CSVReader._select_optimal_engine() + assert result == "c" + + # Test 4: PyArrow import fails + with patch("superset.commands.database.uploaders.csv_reader.util") as mock_util: + with patch("superset.commands.database.uploaders.csv_reader.logger"): + mock_util.find_spec = MagicMock(return_value=MagicMock()) # PyArrow found + + # Mock import error + with patch( + "builtins.__import__", side_effect=ImportError("PyArrow import failed") + ): + result = CSVReader._select_optimal_engine() + assert result == "c" + + +def test_csv_reader_progressive_encoding_detection(): + """Test that progressive encoding detection uses multiple sample sizes.""" + import io + + from werkzeug.datastructures import FileStorage + + # Create a file with latin-1 encoding that will require detection + content = "col1,col2,col3\n" + "café,résumé,naïve\n" + binary_data = content.encode("latin-1") + + file = FileStorage(io.BytesIO(binary_data)) + + # Track read calls to verify progressive sampling + original_read = file.read + read_calls = [] + read_sizes = [] + + def track_read(size): + read_calls.append(size) + read_sizes.append(size) + file.seek(0) # Reset position for consistent reading + result = original_read(size) + file.seek(0) # Reset again + return result + + file.read = track_read + + # Call encoding detection + detected_encoding = CSVReader._detect_encoding(file) + + # Should detect the correct encoding + assert detected_encoding in [ + "latin-1", + "utf-8", + ], f"Should detect valid encoding, got {detected_encoding}" + + # Should have made multiple read attempts with different sizes + # (The method tries multiple sample sizes until it finds a working encoding) + assert len(read_calls) >= 1, f"Should have made read calls, got {read_calls}" + + # Test that the method handles the sample sizes properly + assert all(size > 0 for size in read_sizes), "All sample sizes should be positive"