mirror of
https://github.com/apache/superset.git
synced 2026-04-18 15:44:57 +00:00
add more precise types to hive table from csv (#5267)
This commit is contained in:
@@ -34,6 +34,7 @@ six==1.11.0
|
||||
sqlalchemy==1.2.2
|
||||
sqlalchemy-utils==0.32.21
|
||||
sqlparse==0.2.4
|
||||
tableschema==1.1.0
|
||||
thrift==0.11.0
|
||||
thrift-sasl==0.3.0
|
||||
unicodecsv==0.14.1
|
||||
|
||||
1
setup.py
1
setup.py
@@ -90,6 +90,7 @@ setup(
|
||||
'sqlalchemy',
|
||||
'sqlalchemy-utils',
|
||||
'sqlparse',
|
||||
'tableschema',
|
||||
'thrift>=0.9.3',
|
||||
'thrift-sasl>=0.2.1',
|
||||
'unicodecsv',
|
||||
|
||||
@@ -37,7 +37,7 @@ from sqlalchemy.engine.url import make_url
|
||||
from sqlalchemy.sql import text
|
||||
from sqlalchemy.sql.expression import TextAsFrom
|
||||
import sqlparse
|
||||
import unicodecsv
|
||||
from tableschema import Table
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from superset import app, cache_util, conf, db, utils
|
||||
@@ -134,7 +134,7 @@ class BaseEngineSpec(object):
|
||||
@staticmethod
|
||||
def csv_to_df(**kwargs):
|
||||
kwargs['filepath_or_buffer'] = \
|
||||
app.config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
|
||||
config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
|
||||
kwargs['encoding'] = 'utf-8'
|
||||
kwargs['iterator'] = True
|
||||
chunks = pandas.read_csv(**kwargs)
|
||||
@@ -156,7 +156,7 @@ class BaseEngineSpec(object):
|
||||
def _allowed_file(filename):
|
||||
# Only allow specific file extensions as specified in the config
|
||||
extension = os.path.splitext(filename)[1]
|
||||
return extension and extension[1:] in app.config['ALLOWED_EXTENSIONS']
|
||||
return extension and extension[1:] in config['ALLOWED_EXTENSIONS']
|
||||
|
||||
filename = secure_filename(form.csv_file.data.filename)
|
||||
if not _allowed_file(filename):
|
||||
@@ -973,9 +973,15 @@ class HiveEngineSpec(PrestoEngineSpec):
|
||||
@staticmethod
|
||||
def create_table_from_csv(form, table):
|
||||
"""Uploads a csv file and creates a superset datasource in Hive."""
|
||||
def get_column_names(filepath):
|
||||
with open(filepath, 'rb') as f:
|
||||
return next(unicodecsv.reader(f, encoding='utf-8-sig'))
|
||||
def convert_to_hive_type(col_type):
|
||||
"""maps tableschema's types to hive types"""
|
||||
tableschema_to_hive_types = {
|
||||
'boolean': 'BOOLEAN',
|
||||
'integer': 'INT',
|
||||
'number': 'DOUBLE',
|
||||
'string': 'STRING',
|
||||
}
|
||||
return tableschema_to_hive_types.get(col_type, 'STRING')
|
||||
|
||||
table_name = form.name.data
|
||||
if config.get('UPLOADED_CSV_HIVE_NAMESPACE'):
|
||||
@@ -988,21 +994,27 @@ class HiveEngineSpec(PrestoEngineSpec):
|
||||
config.get('UPLOADED_CSV_HIVE_NAMESPACE'), table_name)
|
||||
filename = form.csv_file.data.filename
|
||||
|
||||
bucket_path = app.config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']
|
||||
bucket_path = config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']
|
||||
|
||||
if not bucket_path:
|
||||
logging.info('No upload bucket specified')
|
||||
raise Exception(
|
||||
'No upload bucket specified. You can specify one in the config file.')
|
||||
|
||||
upload_prefix = app.config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
|
||||
dest_path = os.path.join(table_name, filename)
|
||||
table_name = form.name.data
|
||||
filename = form.csv_file.data.filename
|
||||
upload_prefix = config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
|
||||
|
||||
upload_path = app.config['UPLOAD_FOLDER'] + \
|
||||
upload_path = config['UPLOAD_FOLDER'] + \
|
||||
secure_filename(form.csv_file.data.filename)
|
||||
column_names = get_column_names(upload_path)
|
||||
schema_definition = ', '.join(
|
||||
[s + ' STRING ' for s in column_names])
|
||||
|
||||
hive_table_schema = Table(upload_path).infer()
|
||||
column_name_and_type = []
|
||||
for column_info in hive_table_schema['fields']:
|
||||
column_name_and_type.append(
|
||||
'{} {}'.format(
|
||||
column_info['name'], convert_to_hive_type(column_info['type'])))
|
||||
schema_definition = ', '.join(column_name_and_type)
|
||||
|
||||
s3 = boto3.client('s3')
|
||||
location = os.path.join('s3a://', bucket_path, upload_prefix, table_name)
|
||||
|
||||
Reference in New Issue
Block a user