mirror of
https://github.com/apache/superset.git
synced 2026-04-18 23:55:00 +00:00
fix: Upload CSV as Dataset (#34763)
This commit is contained in:
@@ -33,6 +33,10 @@ from superset.commands.database.uploaders.base import (
|
||||
|
||||
logger = logging.getLogger(__name__)

# Upper bound on how many individual conversion errors are spelled out in an
# error message. A file might contain thousands of errors, and listing them
# all would produce huge payloads and poor UX.
MAX_DISPLAYED_ERRORS = 5

ROWS_TO_READ_METADATA = 100
DEFAULT_ENCODING = "utf-8"
# The default encoding comes first, followed by the alternative encodings.
ENCODING_FALLBACKS = [DEFAULT_ENCODING, "latin-1", "cp1252", "iso-8859-1"]
|
||||
@@ -123,6 +127,205 @@ class CSVReader(BaseDataReader):
|
||||
)
|
||||
return "c"
|
||||
|
||||
@staticmethod
def _find_invalid_values_numeric(df: pd.DataFrame, column: str) -> pd.Series:
    """
    Flag the values of a column that cannot be parsed as numbers.

    The column is run through ``pd.to_numeric`` with ``errors="coerce"``, which
    turns every unparseable value into NaN. A value is reported as invalid
    only when it was non-null before the coercion and null after it, so
    values that were already missing are never flagged.

    :param df: DataFrame containing the data
    :param column: Name of the column to inspect

    :return: Boolean Series, True where the value is present but cannot be
        converted to a numeric type
    """
    values = df[column]
    coerced = pd.to_numeric(values, errors="coerce")
    failed_to_parse = coerced.isna()
    was_present = values.notna()
    return failed_to_parse & was_present
|
||||
|
||||
@staticmethod
def _find_invalid_values_non_numeric(
    df: pd.DataFrame, column: str, dtype: str
) -> pd.Series:
    """
    Find invalid values for non-numeric type conversion.

    Every non-null value of the column is wrapped in a one-element Series and
    cast to ``dtype``; a value is invalid when that cast raises ``ValueError``
    or ``TypeError``. Null values are never reported.

    Flags are collected by row position and only then attached to
    ``df.index``, so a DataFrame with duplicate index labels gets exactly one
    flag per row instead of one bad value marking every row that shares its
    label.

    :param df: DataFrame containing the data
    :param column: Name of the column to check for invalid values
    :param dtype: Target data type for conversion (e.g., 'string', 'category')

    :return: Boolean Series (indexed like ``df``) indicating which rows have
        invalid values for the target type
    """

    def _fails_conversion(value: object) -> bool:
        # Missing values are not conversion errors.
        if not pd.notna(value):
            return False
        try:
            pd.Series([value]).astype(dtype)
        except (ValueError, TypeError):
            return True
        return False

    flags = [_fails_conversion(value) for value in df[column]]
    # Explicit dtype keeps the mask boolean even for an empty DataFrame.
    return pd.Series(flags, index=df.index, dtype=bool)
|
||||
|
||||
@staticmethod
def _get_error_details(
    df: pd.DataFrame,
    column: str,
    dtype: str,
    invalid_mask: pd.Series,
    kwargs: dict[str, Any],
) -> tuple[list[str], int]:
    """
    Get detailed error information for invalid values in type conversion.

    Builds one human-readable line per invalid value, capped at
    MAX_DISPLAYED_ERRORS entries, and reports the total number of invalid
    values alongside.

    Rows are addressed by position rather than by index label, so the result
    is the same whether the DataFrame has the default integer index, a string
    index, or duplicate labels. ``invalid_mask`` is therefore expected to have
    one entry per row of ``df``, in row order.

    Line numbers are computed as ``position + header + 2`` (one for the header
    row itself, one to make the count 1-based). When ``header`` is None the
    file has no header row, so the first data row is reported as line 1.

    :param df: DataFrame containing the data
    :param column: Name of the column with conversion errors
    :param dtype: Target data type that failed conversion
    :param invalid_mask: Boolean mask indicating which rows have invalid values
    :param kwargs: Additional parameters; only ``header`` is read (default 0)

    :return: Tuple containing:
        - List of formatted error detail strings (limited by MAX_DISPLAYED_ERRORS)
        - Total count of errors found
    """
    invalid_positions = [
        position
        for position, is_invalid in enumerate(invalid_mask.to_numpy())
        if is_invalid
    ]
    if not invalid_positions:
        return [], 0

    header = kwargs.get("header", 0)
    first_data_line = 1 if header is None else header + 2

    values = df[column]
    error_details = []
    for position in invalid_positions[:MAX_DISPLAYED_ERRORS]:
        invalid_value = values.iloc[position]
        line_number = position + first_data_line
        error_details.append(
            f" • Line {line_number}: '{invalid_value}' cannot be converted to "
            f"{dtype}"
        )

    return error_details, len(invalid_positions)
|
||||
|
||||
@staticmethod
def _create_error_message(
    df: pd.DataFrame,
    column: str,
    dtype: str,
    invalid_mask: pd.Series,
    kwargs: dict[str, Any],
    original_error: Exception,
) -> str:
    """
    Build the user-facing message for a failed column type conversion.

    When individual invalid values can be identified, the message names the
    column and target type, states the total error count, lists the first
    MAX_DISPLAYED_ERRORS offending values with their line numbers, and adds a
    one-line summary of how many further errors were left out. When no
    invalid value can be identified, the message falls back to the text of
    the original exception.

    :param df: DataFrame containing the data
    :param column: Name of the column that failed conversion
    :param dtype: Target data type that failed
    :param invalid_mask: Boolean mask indicating which rows have invalid values
    :param kwargs: Additional parameters including header information
    :param original_error: Exception raised by the failed conversion

    :return: Formatted error message string ready for display to user
    """
    details, error_count = CSVReader._get_error_details(
        df, column, dtype, invalid_mask, kwargs
    )

    # Nothing specific to report: surface the underlying exception text.
    if not details:
        return f"Cannot convert column '{column}' to {dtype}. {original_error!s}"

    headline = (
        f"Cannot convert column '{column}' to {dtype}. "
        f"Found {error_count} error(s):"
    )
    listing = "\n".join(details)

    hidden = error_count - MAX_DISPLAYED_ERRORS
    trailer = f"\n ... and {hidden} more error(s)" if hidden > 0 else ""
    return f"{headline}\n{listing}{trailer}"
|
||||
|
||||
@staticmethod
def _cast_single_column(
    df: pd.DataFrame, column: str, dtype: str, kwargs: dict[str, Any]
) -> None:
    """
    Convert one DataFrame column to ``dtype`` in place.

    For the numeric targets int64, int32, float64 and float32 the column is
    first parsed with ``pd.to_numeric`` and then cast; every other target is
    cast directly with ``astype``. If the conversion raises ``ValueError`` or
    ``TypeError``, the invalid values are located and a detailed message with
    values and line numbers is built. If building that message itself fails,
    a short message carrying the original exception text is used instead.

    :param df: DataFrame to modify (modified in-place)
    :param column: Name of the column to cast
    :param dtype: Target data type (e.g., 'int64', 'float64', 'string')
    :param kwargs: Additional parameters including header row information

    :raises DatabaseUploadFailed: If type conversion fails, chained to the
        original exception
    """
    numeric_dtypes = frozenset(("int64", "int32", "float64", "float32"))

    try:
        if dtype in numeric_dtypes:
            df[column] = pd.to_numeric(df[column], errors="raise")
        df[column] = df[column].astype(dtype)
    except (ValueError, TypeError) as ex:
        try:
            invalid_mask = (
                CSVReader._find_invalid_values_numeric(df, column)
                if dtype in numeric_dtypes
                else CSVReader._find_invalid_values_non_numeric(df, column, dtype)
            )
            error_msg = CSVReader._create_error_message(
                df, column, dtype, invalid_mask, kwargs, ex
            )
        except Exception:
            # Best effort: diagnostics must never mask the conversion error.
            error_msg = f"Cannot convert column '{column}' to {dtype}. {ex!s}"

        raise DatabaseUploadFailed(message=error_msg) from ex
|
||||
|
||||
@staticmethod
def _cast_column_types(
    df: pd.DataFrame, types: dict[str, str], kwargs: dict[str, Any]
) -> pd.DataFrame:
    """
    Apply the requested type to each listed column of the DataFrame.

    Entries of ``types`` whose column is not present in ``df`` are ignored.
    Each remaining column is converted in place by
    ``CSVReader._cast_single_column``.

    :param df: DataFrame to cast
    :param types: Dictionary mapping column names to target types
    :param kwargs: Original read_csv kwargs for line number calculation
    :return: The same DataFrame, with the listed columns cast
    :raises DatabaseUploadFailed: If type conversion fails with detailed error info
    """
    for column, dtype in types.items():
        if column in df.columns:
            CSVReader._cast_single_column(df, column, dtype, kwargs)
    return df
|
||||
|
||||
@staticmethod
|
||||
def _read_csv( # noqa: C901
|
||||
file: FileStorage,
|
||||
@@ -154,6 +357,7 @@ class CSVReader(BaseDataReader):
|
||||
kwargs["low_memory"] = False
|
||||
|
||||
try:
|
||||
types = kwargs.pop("dtype", None)
|
||||
if "chunksize" in kwargs:
|
||||
chunks = []
|
||||
total_rows = 0
|
||||
@@ -188,13 +392,19 @@ class CSVReader(BaseDataReader):
|
||||
index_col = kwargs.get("index_col")
|
||||
if isinstance(index_col, str):
|
||||
result.index.name = index_col
|
||||
return result
|
||||
return pd.DataFrame()
|
||||
df = result
|
||||
else:
|
||||
df = pd.read_csv(
|
||||
filepath_or_buffer=file.stream,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
return pd.read_csv(
|
||||
filepath_or_buffer=file.stream,
|
||||
**kwargs,
|
||||
)
|
||||
if types:
|
||||
df = CSVReader._cast_column_types(df, types, kwargs)
|
||||
|
||||
return df
|
||||
except DatabaseUploadFailed:
|
||||
raise
|
||||
except UnicodeDecodeError as ex:
|
||||
if encoding != DEFAULT_ENCODING:
|
||||
raise DatabaseUploadFailed(
|
||||
|
||||
Reference in New Issue
Block a user