Mirror of https://github.com/apache/superset.git (synced 2026-04-19 08:04:53 +00:00)
Add hive to superset + monkey patch the pyhive (#2134)
* Initial hive implementation
* Fix select star query for hive.
* Exclude generated code.
* Address code coverage and linting.
* Exclude generated code from coveralls.
* Fix lint errors
* Move TCLIService to its own repo.
* Address comments
* Implement special postgres case,
superset/sql_lab.py
@@ -2,6 +2,7 @@ import celery
 from datetime import datetime
 import json
 import logging
+import numpy as np
 import pandas as pd
 import sqlalchemy
 import uuid
@@ -56,6 +57,7 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):
     query = session.query(models.Query).filter_by(id=query_id).one()
     database = query.database
     db_engine_spec = database.db_engine_spec
+    db_engine_spec.patch()

     def handle_error(msg):
         """Local method handling error while processing the SQL"""
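The new db_engine_spec.patch() call is the "monkey patch the pyhive" part of the commit title: every engine spec gets a hook that may rewrite driver internals at runtime, and only the Hive spec does real work there. A minimal sketch of the idea, assuming the patched fetch_logs lives in a superset.db_engines.hive module (class and module layout here are illustrative, not a verbatim copy of the commit):

class BaseEngineSpec(object):
    @classmethod
    def patch(cls):
        """Most databases need no runtime patching; this is a no-op hook."""
        pass


class HiveEngineSpec(BaseEngineSpec):
    @classmethod
    def patch(cls):
        from pyhive import hive
        # Assumed location of the patched function: swap pyhive's log
        # fetching for a version Superset can use to report progress
        # while a Hive job runs.
        from superset.db_engines import hive as patched_hive
        hive.Cursor.fetch_logs = patched_hive.fetch_logs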
@@ -91,7 +93,6 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):
             db_engine_spec.limit_method == LimitMethod.WRAP_SQL):
         executed_sql = database.wrap_sql_limit(executed_sql, query.limit)
         query.limit_used = True
-    engine = database.get_sqla_engine(schema=query.schema)
     try:
         template_processor = get_template_processor(
             database=database, query=query)
@@ -104,34 +105,42 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):

     query.executed_sql = executed_sql
     logging.info("Running query: \n{}".format(executed_sql))
+    engine = database.get_sqla_engine(schema=query.schema)
+    conn = engine.raw_connection()
+    cursor = conn.cursor()
     try:
-        result_proxy = engine.execute(query.executed_sql, schema=query.schema)
+        cursor.execute(
+            query.executed_sql, **db_engine_spec.cursor_execute_kwargs)
     except Exception as e:
         logging.exception(e)
+        conn.close()
         handle_error(db_engine_spec.extract_error_message(e))

-    cursor = result_proxy.cursor
     query.status = QueryStatus.RUNNING
     session.flush()
-    db_engine_spec.handle_cursor(cursor, query, session)
+    try:
+        logging.info("Handling cursor")
+        db_engine_spec.handle_cursor(cursor, query, session)
+        logging.info("Fetching data: {}".format(query.to_dict()))
+        data = db_engine_spec.fetch_data(cursor, query.limit)
+    except Exception as e:
+        logging.exception(e)
+        conn.close()
+        handle_error(db_engine_spec.extract_error_message(e))

-    cdf = None
-    if result_proxy.cursor:
-        column_names = [col[0] for col in result_proxy.cursor.description]
-        column_names = dedup(column_names)
-        if db_engine_spec.limit_method == LimitMethod.FETCH_MANY:
-            data = result_proxy.fetchmany(query.limit)
-        else:
-            data = result_proxy.fetchall()
-        cdf = dataframe.SupersetDataFrame(
-            pd.DataFrame(data, columns=column_names))
+    conn.commit()
+    conn.close()

-    query.rows = result_proxy.rowcount
+    column_names = (
+        [col[0] for col in cursor.description] if cursor.description else [])
+    column_names = dedup(column_names)
+    df_data = np.array(data) if data else []
+    cdf = dataframe.SupersetDataFrame(pd.DataFrame(
+        df_data, columns=column_names))
+
+    query.rows = cdf.size
     query.progress = 100
     query.status = QueryStatus.SUCCESS
-    if query.rows == -1 and cdf:
-        # Presto doesn't provide result_proxy.row_count
-        query.rows = cdf.size
     if query.select_as_cta:
         query.select_sql = '{}'.format(database.select_star(
             query.tmp_table_name,
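This hunk is the heart of the change: execution now goes through a raw DB-API connection instead of SQLAlchemy's engine.execute(), so each engine spec can inject driver-specific behavior at three points: extra cursor.execute() kwargs, a handle_cursor() hook that runs while the query is in flight, and fetch_data(). A sketch of what those hooks plausibly look like, with pyhive's async flag as the Hive-specific piece; anything below that does not appear in the diff, such as the polling loop body, is an assumption:

import time

from TCLIService import ttypes  # generated Thrift code this commit vendors


class BaseEngineSpec(object):
    # Extra keyword arguments forwarded to cursor.execute(); empty for
    # drivers that block until the query finishes.
    cursor_execute_kwargs = {}

    @classmethod
    def handle_cursor(cls, cursor, query, session):
        """Runs between execute() and fetch; a no-op by default."""
        pass

    @classmethod
    def fetch_data(cls, cursor, limit):
        return cursor.fetchmany(limit) if limit else cursor.fetchall()


class HiveEngineSpec(BaseEngineSpec):
    # pyhive's Hive cursor accepts async=True (renamed async_ in later
    # releases), so execute() returns immediately and the job can be polled.
    cursor_execute_kwargs = {'async': True}

    @classmethod
    def handle_cursor(cls, cursor, query, session):
        # Poll until the Hive operation leaves its running states; a real
        # implementation would also parse cursor.fetch_logs() to update
        # query.progress before each commit.
        while cursor.poll().operationState in (
                ttypes.TOperationState.INITIALIZED_STATE,
                ttypes.TOperationState.RUNNING_STATE):
            session.commit()
            time.sleep(5)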
@@ -144,11 +153,10 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):
     payload = {
         'query_id': query.id,
         'status': query.status,
-        'data': [],
+        'data': cdf.data if cdf.data else [],
+        'columns': cdf.columns_dict if cdf.columns_dict else {},
+        'query': query.to_dict(),
     }
-    payload['data'] = cdf.data if cdf else []
-    payload['columns'] = cdf.columns_dict if cdf else []
-    payload['query'] = query.to_dict()
     payload = json.dumps(payload, default=utils.json_iso_dttm_ser)

     if store_results:
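The payload is now built as a single dict and serialized with a custom default handler so values that json.dumps cannot handle natively, such as datetimes, do not raise. The real handler is Superset's utils.json_iso_dttm_ser; a self-contained sketch of the pattern, with illustrative payload values:

import json
from datetime import datetime


def json_iso_dttm_ser(obj):
    """Fallback for json.dumps: render datetimes as ISO 8601 strings."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError('{} is not JSON serializable'.format(type(obj)))


payload = {'query_id': 1, 'status': 'success',
           'start_time': datetime(2017, 2, 8, 12, 30)}
print(json.dumps(payload, default=json_iso_dttm_ser))
# {"query_id": 1, "status": "success", "start_time": "2017-02-08T12:30:00"}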