Replace pandas.DataFrame with PyArrow.Table for nullable int typing (#8733)
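
A minimal sketch of the motivating behavior, assuming the pre-1.0 pandas of this era (no nullable extension dtypes): loading query results into a pandas.DataFrame silently promotes integer columns containing NULLs to float64, while a pyarrow.Table keeps the integer type and represents NULLs as nulls.

```python
import pandas as pd
import pyarrow as pa

rows = [(1,), (None,), (3,)]

# pandas: a NULL forces the column to float64 (1 -> 1.0, NULL -> NaN)
df = pd.DataFrame(rows, columns=["num"])
print(df["num"].dtype)  # float64

# pyarrow: the column stays int64 and the NULL stays a null
table = pa.Table.from_arrays([pa.array([r[0] for r in rows])], names=["num"])
print(table.schema)  # num: int64
```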

* Use PyArrow Table for query result serialization

* Cleanup dev comments

* Additional cleanup

* WIP: tests

* Remove explicit dtype logic from db_engine_specs

* Remove obsolete column property

* SupersetTable column types

* Port SupersetDataFrame methods to SupersetTable

* Add test for nullable boolean columns

* Support datetime values with timezone offsets

* Black formatting

* Pylint

* More linting/formatting

* Resolve issue with timezones not appearing in results

* Types

* Enable running of tests in tests/db_engine_specs

* Resolve application context errors

* Refactor and add tests for pyodbc.Row conversion

* Appease isort, regardless of isort:skip

* Re-enable RESULTS_BACKEND_USE_MSGPACK default based on benchmarks

* Dataframe typing and nits

* Renames to reduce ambiguity
Rob DiCiuccio authored on 2020-01-03 16:55:39 +00:00, committed by Maxime Beauchemin
parent 4f8bf2b04d, commit 6537d5ed8c
16 changed files with 438 additions and 513 deletions


@@ -19,7 +19,7 @@ import uuid
 from contextlib import closing
 from datetime import datetime
 from sys import getsizeof
-from typing import Optional, Tuple, Union
+from typing import Dict, List, Optional, Tuple, Union

 import backoff
 import msgpack
@@ -39,10 +39,11 @@ from superset import (
     results_backend_use_msgpack,
     security_manager,
 )
-from superset.dataframe import SupersetDataFrame
+from superset.dataframe import df_to_records
 from superset.db_engine_specs import BaseEngineSpec
 from superset.extensions import celery_app
 from superset.models.sql_lab import Query
+from superset.result_set import SupersetResultSet
 from superset.sql_parse import ParsedQuery
 from superset.utils.core import json_iso_dttm_ser, QueryStatus, sources, zlib_compress
 from superset.utils.dates import now_as_float
@@ -251,7 +252,7 @@ def execute_sql_statement(sql_statement, query, user_name, session, cursor, log_
         logger.debug(f"Query {query.id}: Fetching cursor description")
         cursor_description = cursor.description
-    return SupersetDataFrame(data, cursor_description, db_engine_spec)
+    return SupersetResultSet(data, cursor_description, db_engine_spec)


 def _serialize_payload(
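
For orientation, a rough sketch of the SupersetResultSet interface that the call sites in this diff rely on. The constructor arguments and the `columns`, `size`, `pa_table`, and `to_pandas_df()` members are taken from this diff; the bodies are illustrative only — the real superset/result_set.py also handles pyodbc.Row conversion, nullable booleans, and timezone offsets, and its column dicts carry type metadata.

```python
from typing import Any, Dict, List, Tuple

import pyarrow as pa


class SupersetResultSet:  # interface sketch, not the real implementation
    def __init__(self, data: List[Tuple[Any, ...]], cursor_description, db_engine_spec):
        # cursor_description follows DB-API 2.0: each entry starts with the column name
        column_names = [d[0] for d in cursor_description or []]
        self.pa_table = pa.Table.from_arrays(
            [pa.array([row[i] for row in data]) for i in range(len(column_names))],
            names=column_names,
        )

    @property
    def size(self) -> int:
        return self.pa_table.num_rows

    @property
    def columns(self) -> List[Dict]:
        # the real implementation attaches inferred generic types per column
        return [{"name": name} for name in self.pa_table.column_names]

    def to_pandas_df(self):
        return self.pa_table.to_pandas()
```
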
@@ -265,13 +266,13 @@ def _serialize_payload(


 def _serialize_and_expand_data(
-    cdf: SupersetDataFrame,
+    result_set: SupersetResultSet,
     db_engine_spec: BaseEngineSpec,
     use_msgpack: Optional[bool] = False,
     expand_data: bool = False,
 ) -> Tuple[Union[bytes, str], list, list, list]:
-    selected_columns: list = cdf.columns or []
-    expanded_columns: list
+    selected_columns: List[Dict] = result_set.columns
+    expanded_columns: List[Dict]

     if use_msgpack:
         with stats_timing(
@@ -279,14 +280,17 @@ def _serialize_and_expand_data(
         ):
             data = (
                 pa.default_serialization_context()
-                .serialize(cdf.raw_df)
+                .serialize(result_set.pa_table)
                 .to_buffer()
                 .to_pybytes()
             )
+
         # expand when loading data from results backend
         all_columns, expanded_columns = (selected_columns, [])
     else:
-        data = cdf.data or []
+        df = result_set.to_pandas_df()
+        data = df_to_records(df) or []
+
         if expand_data:
             all_columns, data, expanded_columns = db_engine_spec.expand_data(
                 selected_columns, data
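
The msgpack branch above serializes the Arrow table itself rather than a DataFrame; the reader side (not part of this hunk) presumably reverses it with the same serialization context. A round-trip sketch — note that `default_serialization_context()` was deprecated in later pyarrow releases in favor of the IPC stream API. The `df_to_records` call in the else branch presumably amounts to `df.to_dict(orient="records")`.

```python
import pyarrow as pa

table = pa.Table.from_pydict({"num": [1, None, 3]})

# writer side, as in the diff above
data = (
    pa.default_serialization_context()
    .serialize(table)
    .to_buffer()
    .to_pybytes()
)

# reader side (assumed): the same context restores the Table from the bytes
restored = pa.default_serialization_context().deserialize(data)
assert restored.equals(table)
```
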
@@ -356,7 +360,7 @@ def execute_sql_statements(
             query.set_extra_json_key("progress", msg)
             session.commit()
         try:
-            cdf = execute_sql_statement(
+            result_set = execute_sql_statement(
                 statement, query, user_name, session, cursor, log_params
             )
         except Exception as e:  # pylint: disable=broad-except
@@ -367,7 +371,7 @@ def execute_sql_statements(
             return payload

     # Success, updating the query entry in database
-    query.rows = cdf.size
+    query.rows = result_set.size
     query.progress = 100
     query.set_extra_json_key("progress", None)
     if query.select_as_cta:
@@ -381,9 +385,13 @@ def execute_sql_statements(
     query.end_time = now_as_float()

     data, selected_columns, all_columns, expanded_columns = _serialize_and_expand_data(
-        cdf, db_engine_spec, store_results and results_backend_use_msgpack, expand_data
+        result_set,
+        db_engine_spec,
+        store_results and results_backend_use_msgpack,
+        expand_data,
     )
+    # TODO: data should be saved separately from metadata (likely in Parquet)
     payload.update(
         {
             "status": QueryStatus.SUCCESS,