mirror of
https://github.com/apache/superset.git
synced 2026-04-19 16:14:52 +00:00
Import CSV (#3643)
* add upload csv button to sources dropdown * upload csv to non-hive datasources * upload csv to hive datasource * update FAQ page * add tests * fix linting errors and merge conflicts * Update .travis.yml * Update tox.ini
This commit is contained in:
committed by
Maxime Beauchemin
parent
c5ddf57124
commit
268edcfedd
@@ -45,6 +45,13 @@ visualizations.
|
||||
https://github.com/airbnb/superset/issues?q=label%3Aexample+is%3Aclosed
|
||||
|
||||
|
||||
Can I upload and visualize csv data?
|
||||
-------------------------------------
|
||||
|
||||
Yes, using the ``Upload a CSV`` button under the ``Sources``
|
||||
menu item. This brings up a form that allows you to specify required information. After creating the table from CSV, it can then be loaded like any other on the ``Sources -> Tables`` page.
|
||||
|
||||
|
||||
Why are my queries timing out?
|
||||
------------------------------
|
||||
|
||||
|
||||
@@ -58,9 +58,9 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
SECRET_KEY = '\2\1thisismyscretkey\1\2\e\y\y\h' # noqa
|
||||
|
||||
# The SQLAlchemy connection string.
|
||||
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
|
||||
# SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
|
||||
# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
|
||||
# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
|
||||
SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
|
||||
|
||||
# In order to hook up a custom password store for all SQLACHEMY connections
|
||||
# implement a function that takes a single argument of type 'sqla.engine.url',
|
||||
@@ -188,6 +188,10 @@ TABLE_NAMES_CACHE_CONFIG = {'CACHE_TYPE': 'null'}
|
||||
ENABLE_CORS = False
|
||||
CORS_OPTIONS = {}
|
||||
|
||||
# Allowed format types for upload on Database view
|
||||
# TODO: Add processing of other spreadsheet formats (xls, xlsx etc)
|
||||
ALLOWED_EXTENSIONS = set(['csv'])
|
||||
|
||||
# CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv method
|
||||
# note: index option should not be overridden
|
||||
CSV_EXPORT = {
|
||||
@@ -298,6 +302,14 @@ SQLLAB_ASYNC_TIME_LIMIT_SEC = 60 * 60 * 6
|
||||
# in SQL Lab by using the "Run Async" button/feature
|
||||
RESULTS_BACKEND = None
|
||||
|
||||
# The S3 bucket where you want to store your external hive tables created
|
||||
# from CSV files. For example, 'companyname-superset'
|
||||
CSV_TO_HIVE_UPLOAD_S3_BUCKET = None
|
||||
|
||||
# The directory within the bucket specified above that will
|
||||
# contain all the external tables
|
||||
CSV_TO_HIVE_UPLOAD_DIRECTORY = 'EXTERNAL_HIVE_TABLES/'
|
||||
|
||||
# A dictionary of items that gets merged into the Jinja context for
|
||||
# SQL Lab. The existing context gets updated with this dictionary,
|
||||
# meaning values for existing keys get overwritten by the content of this
|
||||
|
||||
@@ -17,21 +17,30 @@ from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from collections import defaultdict, namedtuple
|
||||
import csv
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import textwrap
|
||||
import time
|
||||
|
||||
import boto3
|
||||
from flask import g
|
||||
from flask_babel import lazy_gettext as _
|
||||
import pandas
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.engine import create_engine
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from sqlalchemy.sql import text
|
||||
import sqlparse
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from superset import cache_util, conf, utils
|
||||
from superset import app, cache_util, conf, db, utils
|
||||
from superset.utils import QueryStatus, SupersetTemplateException
|
||||
|
||||
config = app.config
|
||||
|
||||
tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')
|
||||
|
||||
Grain = namedtuple('Grain', 'name label function')
|
||||
@@ -73,6 +82,65 @@ class BaseEngineSpec(object):
|
||||
"""Returns engine-specific table metadata"""
|
||||
return {}
|
||||
|
||||
@staticmethod
def csv_to_df(**kwargs):
    """Read an uploaded CSV file into a single pandas DataFrame.

    :param kwargs: forwarded to :func:`pandas.read_csv`;
        ``filepath_or_buffer`` must be a filename relative to the
        configured ``UPLOAD_FOLDER`` (the caller is expected to supply
        ``chunksize`` so large files are read incrementally).
    :returns: one DataFrame holding the whole file.
    """
    kwargs['filepath_or_buffer'] = \
        app.config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
    kwargs['encoding'] = 'utf-8'
    kwargs['iterator'] = True
    chunks = pandas.read_csv(**kwargs)
    # pandas.concat accepts the TextFileReader iterator directly; the
    # original code also built an unused empty DataFrame first — that
    # dead assignment has been removed.
    return pandas.concat(chunks)
|
||||
|
||||
@staticmethod
def df_to_db(df, table, **kwargs):
    """Write *df* to the database and register *table* as a datasource.

    :param df: pandas DataFrame holding the uploaded CSV data.
    :param table: Superset table model to register once the data is in.
    :param kwargs: forwarded verbatim to ``DataFrame.to_sql`` — the
        callers pass ``name``, ``con``, ``schema`` (also read below),
        ``if_exists``, ``index``, ``index_label`` and ``chunksize``.
    """
    df.to_sql(**kwargs)
    # Stamp ownership and location on the model, then pull column
    # metadata from the freshly created physical table before committing.
    table.user_id = g.user.id
    table.schema = kwargs['schema']
    table.fetch_metadata()
    db.session.add(table)
    db.session.commit()
|
||||
|
||||
@staticmethod
def create_table_from_csv(form, table):
    """Create a database table from an uploaded CSV file.

    Reads the CSV described by *form* into a DataFrame, writes it to the
    target database and registers *table* as a Superset datasource.
    Raises ``Exception`` when the file extension is not whitelisted.
    """
    def _allowed_file(filename):
        # Only allow specific file extensions as specified in the config
        extension = os.path.splitext(filename)[1]
        return extension and extension[1:] in app.config['ALLOWED_EXTENSIONS']

    filename = secure_filename(form.csv_file.data.filename)
    if not _allowed_file(filename):
        raise Exception('Invalid file type selected')

    read_csv_kwargs = {
        'filepath_or_buffer': filename,
        'sep': form.sep.data,
        'header': form.header.data if form.header.data else 0,
        'index_col': form.index_col.data,
        'mangle_dupe_cols': form.mangle_dupe_cols.data,
        'skipinitialspace': form.skipinitialspace.data,
        'skiprows': form.skiprows.data,
        'nrows': form.nrows.data,
        'skip_blank_lines': form.skip_blank_lines.data,
        'parse_dates': form.parse_dates.data,
        'infer_datetime_format': form.infer_datetime_format.data,
        'chunksize': 10000,
    }
    df = BaseEngineSpec.csv_to_df(**read_csv_kwargs)

    # Hand the frame off to to_sql() and register the datasource.
    BaseEngineSpec.df_to_db(
        df=df,
        table=table,
        name=form.name.data,
        con=create_engine(form.con.data, echo=False),
        schema=form.schema.data,
        if_exists=form.if_exists.data,
        index=form.index.data,
        index_label=form.index_label.data,
        chunksize=10000,
    )
|
||||
|
||||
@classmethod
|
||||
def escape_sql(cls, sql):
|
||||
"""Escapes the raw SQL"""
|
||||
@@ -721,6 +789,45 @@ class HiveEngineSpec(PrestoEngineSpec):
|
||||
return BaseEngineSpec.fetch_result_sets(
|
||||
db, datasource_type, force=force)
|
||||
|
||||
@staticmethod
def create_table_from_csv(form, table):
    """Uploads a csv file and creates a superset datasource in Hive.

    The file is pushed to the configured S3 bucket and exposed as an
    external Hive table whose columns are all typed STRING.
    Raises ``Exception`` when no upload bucket is configured.
    """
    def get_column_names(filepath):
        # Text mode (not 'rb'): the Python 3 csv module requires str
        # rows, and the next() builtin works on both Python 2 and 3
        # (the old csv.reader(f).next() was Python-2-only).
        with open(filepath, 'r') as f:
            return next(csv.reader(f))

    table_name = form.name.data
    filename = form.csv_file.data.filename

    # BUG FIX: the config defines CSV_TO_HIVE_UPLOAD_S3_BUCKET; the old
    # CSV_TO_HIVE_UPLOAD_BUCKET key does not exist, so this always
    # raised even when a bucket was configured.
    bucket_path = app.config['CSV_TO_HIVE_UPLOAD_S3_BUCKET']

    if not bucket_path:
        logging.info('No upload bucket specified')
        raise Exception(
            'No upload bucket specified. You can specify one in the config file.')

    upload_prefix = app.config['CSV_TO_HIVE_UPLOAD_DIRECTORY']

    upload_path = app.config['UPLOAD_FOLDER'] + \
        secure_filename(form.csv_file.data.filename)
    column_names = get_column_names(upload_path)
    schema_definition = ', '.join(
        [s + ' STRING ' for s in column_names])

    s3 = boto3.client('s3')
    location = os.path.join('s3a://', bucket_path, upload_prefix, table_name)
    # BUG FIX: upload to the configured bucket instead of the
    # hard-coded 'airbnb-superset' bucket.
    s3.upload_file(
        upload_path, bucket_path,
        os.path.join(upload_prefix, table_name, filename))
    # NOTE(review): table_name comes straight from user input and is
    # interpolated into DDL — consider validating/quoting it.
    sql = """CREATE EXTERNAL TABLE {table_name} ( {schema_definition} )
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
        TEXTFILE LOCATION '{location}'""".format(**locals())

    logging.info(form.con.data)
    engine = create_engine(form.con.data)
    engine.execute(sql)
|
||||
|
||||
@classmethod
|
||||
def convert_dttm(cls, target_type, dttm):
|
||||
tt = target_type.upper()
|
||||
|
||||
123
superset/forms.py
Normal file
123
superset/forms.py
Normal file
@@ -0,0 +1,123 @@
|
||||
"""Contains the logic to create cohesive forms on the explore view"""
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
|
||||
from flask_appbuilder.forms import DynamicForm
|
||||
from flask_babel import lazy_gettext as _
|
||||
from flask_wtf.file import FileAllowed, FileField, FileRequired
|
||||
from wtforms import (
|
||||
BooleanField, IntegerField, SelectField, StringField)
|
||||
from wtforms.validators import DataRequired, NumberRange, Optional
|
||||
|
||||
from superset import app
|
||||
|
||||
config = app.config
|
||||
|
||||
|
||||
class CsvToDatabaseForm(DynamicForm):
    """Form backing the 'Upload a CSV' view.

    Field names deliberately mirror the keyword arguments of
    ``pandas.read_csv`` / ``DataFrame.to_sql`` so the view can forward
    them mostly unchanged.
    """

    # --- target table -----------------------------------------------------
    name = StringField(
        _('Table Name'),
        description=_('Name of table to be created from csv data.'),
        validators=[DataRequired()],
        widget=BS3TextFieldWidget())
    csv_file = FileField(
        _('CSV File'),
        description=_('Select a CSV file to be uploaded to a database.'),
        validators=[
            FileRequired(), FileAllowed(['csv'], _('CSV Files Only!'))])

    # Choices are populated at request time from the Database model.
    con = SelectField(
        _('Database'),
        description=_('database in which to add above table.'),
        validators=[DataRequired()],
        choices=[])
    sep = StringField(
        _('Delimiter'),
        description=_('Delimiter used by CSV file (for whitespace use \s+).'),
        validators=[DataRequired()],
        widget=BS3TextFieldWidget())
    if_exists = SelectField(
        _('Table Exists'),
        description=_(
            'If table exists do one of the following: '
            'Fail (do nothing), Replace (drop and recreate table) '
            'or Append (insert data).'),
        choices=[
            ('fail', _('Fail')), ('replace', _('Replace')),
            ('append', _('Append'))],
        validators=[DataRequired()])

    schema = StringField(
        _('Schema'),
        description=_('Specify a schema (if database flavour supports this).'),
        validators=[Optional()],
        widget=BS3TextFieldWidget(),
        filters=[lambda x: x or None])

    # --- pandas.read_csv options ------------------------------------------
    # The "x or None" filters turn empty form inputs into None so pandas
    # falls back to its own defaults.
    header = IntegerField(
        _('Header Row'),
        description=_(
            'Row containing the headers to use as '
            'column names (0 is first line of data). '
            'Leave empty if there is no header row.'),
        validators=[Optional()],
        widget=BS3TextFieldWidget(),
        filters=[lambda x: x or None])
    index_col = IntegerField(
        _('Index Column'),
        description=_(
            'Column to use as the row labels of the '
            'dataframe. Leave empty if no index column.'),
        validators=[Optional(), NumberRange(0, 1E+20)],
        widget=BS3TextFieldWidget(),
        filters=[lambda x: x or None])
    mangle_dupe_cols = BooleanField(
        _('Mangle Duplicate Columns'),
        description=_('Specify duplicate columns as "X.0, X.1".'))
    skipinitialspace = BooleanField(
        _('Skip Initial Space'),
        description=_('Skip spaces after delimiter.'))
    skiprows = IntegerField(
        _('Skip Rows'),
        description=_('Number of rows to skip at start of file.'),
        validators=[Optional(), NumberRange(0, 1E+20)],
        widget=BS3TextFieldWidget(),
        filters=[lambda x: x or None])
    nrows = IntegerField(
        _('Rows to Read'),
        description=_('Number of rows of file to read.'),
        validators=[Optional(), NumberRange(0, 1E+20)],
        widget=BS3TextFieldWidget(),
        filters=[lambda x: x or None])
    skip_blank_lines = BooleanField(
        _('Skip Blank Lines'),
        description=_(
            'Skip blank lines rather than interpreting them '
            'as NaN values.'))
    parse_dates = BooleanField(
        _('Parse Dates'),
        description=_('Parse date values.'))
    infer_datetime_format = BooleanField(
        _('Infer Datetime Format'),
        description=_(
            'Use Pandas to interpret the datetime format '
            'automatically.'))
    decimal = StringField(
        _('Decimal Character'),
        description=_('Character to interpret as decimal point.'),
        validators=[Optional()],
        widget=BS3TextFieldWidget(),
        filters=[lambda x: x or '.'])

    # --- DataFrame.to_sql options -----------------------------------------
    index = BooleanField(
        _('Dataframe Index'),
        description=_('Write dataframe index as a column.'))
    index_label = StringField(
        _('Column Label(s)'),
        description=_(
            'Column label for index column(s). If None is given '
            'and Dataframe Index is True, Index Names are used.'),
        validators=[Optional()],
        widget=BS3TextFieldWidget(),
        filters=[lambda x: x or None])
|
||||
@@ -7,6 +7,7 @@ from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pickle
|
||||
import re
|
||||
import time
|
||||
@@ -16,7 +17,7 @@ from urllib import parse
|
||||
from flask import (
|
||||
flash, g, Markup, redirect, render_template, request, Response, url_for,
|
||||
)
|
||||
from flask_appbuilder import expose
|
||||
from flask_appbuilder import expose, SimpleFormView
|
||||
from flask_appbuilder.actions import action
|
||||
from flask_appbuilder.models.sqla.interface import SQLAInterface
|
||||
from flask_appbuilder.security.decorators import has_access_api
|
||||
@@ -28,12 +29,15 @@ import sqlalchemy as sqla
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from werkzeug.routing import BaseConverter
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
from superset import (
|
||||
app, appbuilder, cache, db, results_backend, security, sm, sql_lab, utils,
|
||||
viz,
|
||||
)
|
||||
from superset.connectors.connector_registry import ConnectorRegistry
|
||||
from superset.connectors.sqla.models import SqlaTable
|
||||
from superset.forms import CsvToDatabaseForm
|
||||
from superset.legacy import cast_form_data
|
||||
import superset.models.core as models
|
||||
from superset.models.sql_lab import Query
|
||||
@@ -305,6 +309,71 @@ class DatabaseAsync(DatabaseView):
|
||||
appbuilder.add_view_no_menu(DatabaseAsync)
|
||||
|
||||
|
||||
class CsvToDatabaseView(SimpleFormView):
    """Form view that uploads a CSV file and creates a table from it."""

    form = CsvToDatabaseForm
    form_title = _('CSV to Database configuration')
    add_columns = ['database', 'schema', 'table_name']

    def form_get(self, form):
        """Pre-populate the form with sensible pandas defaults."""
        form.sep.data = ','
        form.header.data = 0
        form.mangle_dupe_cols.data = True
        form.skipinitialspace.data = False
        form.skip_blank_lines.data = True
        form.parse_dates.data = True
        form.infer_datetime_format.data = True
        form.decimal.data = '.'
        form.if_exists.data = 'append'
        all_datasources = (
            db.session.query(
                models.Database.sqlalchemy_uri,
                models.Database.database_name)
            .all()
        )
        form.con.choices += all_datasources

    def form_post(self, form):
        """Save the upload, build the table, then clean up the temp file."""
        def _upload_file(csv_file):
            # Save under a sanitized name and return the path actually
            # written, so cleanup removes the right file.
            if csv_file and csv_file.filename:
                filename = secure_filename(csv_file.filename)
                path = os.path.join(config['UPLOAD_FOLDER'], filename)
                csv_file.save(path)
                return path

        csv_file = form.csv_file.data
        # BUG FIX: the file is saved under secure_filename(...), but was
        # previously removed under the raw client filename; whenever the
        # two differed the cleanup os.remove() raised. Track the path
        # the file was really saved to instead.
        upload_path = _upload_file(csv_file)
        table = SqlaTable(table_name=form.name.data)
        database = (
            db.session.query(models.Database)
            .filter_by(sqlalchemy_uri=form.data.get('con'))
            .one()
        )
        table.database = database
        table.database_id = database.id
        try:
            database.db_engine_spec.create_table_from_csv(form, table)
        except Exception as e:
            if upload_path:
                os.remove(upload_path)
            # flash() expects a string, not the exception object.
            flash(str(e), 'error')
            return redirect('/tablemodelview/list/')

        if upload_path:
            os.remove(upload_path)
        # Go back to welcome page / splash screen
        db_name = (
            db.session.query(models.Database.database_name)
            .filter_by(sqlalchemy_uri=form.data.get('con'))
            .one()
        )
        # i18n FIX: translate the template first, then interpolate —
        # _('...'.format(...)) looked up the already-formatted string,
        # which can never match a msgid in the catalog.
        message = _(
            'CSV file "{0}" uploaded to table "{1}" in '
            'database "{2}"').format(
                form.csv_file.data.filename,
                form.name.data,
                db_name[0])
        flash(message, 'info')
        return redirect('/tablemodelview/list/')
|
||||
|
||||
|
||||
appbuilder.add_view_no_menu(CsvToDatabaseView)
|
||||
|
||||
|
||||
class DatabaseTablesAsync(DatabaseView):
|
||||
list_columns = ['id', 'all_table_names', 'all_schema_names']
|
||||
|
||||
@@ -2459,6 +2528,16 @@ appbuilder.add_link(
|
||||
category_label=__('SQL Lab'),
|
||||
)
|
||||
|
||||
appbuilder.add_link(
|
||||
'Upload a CSV',
|
||||
label=__('Upload a CSV'),
|
||||
href='/csvtodatabaseview/form',
|
||||
icon='fa-upload',
|
||||
category='Sources',
|
||||
category_label=__('Sources'),
|
||||
category_icon='fa-wrench',)
|
||||
appbuilder.add_separator('Sources')
|
||||
|
||||
|
||||
@app.after_request
|
||||
def apply_caching(response):
|
||||
|
||||
@@ -10,7 +10,9 @@ import doctest
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import unittest
|
||||
|
||||
from flask import escape
|
||||
@@ -789,6 +791,46 @@ class CoreTests(SupersetTestCase):
|
||||
{'name': ' NULL', 'sum__num': 0},
|
||||
)
|
||||
|
||||
def test_import_csv(self):
    """End-to-end check of the 'Upload a CSV' flow against the main db."""
    self.login(username='admin')
    filename = 'testCSV.csv'
    table_name = ''.join(
        random.choice(string.ascii_uppercase) for _ in range(5))

    # Write a tiny fixture; the context manager closes the handle
    # (the original leaked both the write and the read handles).
    with open(filename, 'w+') as test_file:
        test_file.write('a,b\n')
        test_file.write('john,1\n')
        test_file.write('paul,2\n')
    main_db_uri = db.session.query(
        models.Database.sqlalchemy_uri)\
        .filter_by(database_name='main').all()

    # The finally block now covers every assertion, so the fixture file
    # is removed even when an early assertion fails (the original only
    # protected the final POST).
    try:
        with open(filename, 'rb') as test_file:
            form_data = {
                'csv_file': test_file,
                'sep': ',',
                'name': table_name,
                'con': main_db_uri[0][0],
                'if_exists': 'append',
                'index_label': 'test_label',
                'mangle_dupe_cols': False}

            url = '/databaseview/list/'
            add_datasource_page = self.get_resp(url)
            assert 'Upload a CSV' in add_datasource_page

            url = '/csvtodatabaseview/form'
            form_get = self.get_resp(url)
            assert 'CSV to Database configuration' in form_get

            # ensure uploaded successfully
            form_post = self.get_resp(url, data=form_data)
            assert 'CSV file \"testCSV.csv\" uploaded to table' in form_post
    finally:
        os.remove(filename)
|
||||
|
||||
def test_dataframe_timezone(self):
|
||||
tz = psycopg2.tz.FixedOffsetTimezone(offset=60, name=None)
|
||||
data = [(datetime.datetime(2017, 11, 18, 21, 53, 0, 219225, tzinfo=tz),),
|
||||
|
||||
2
tox.ini
2
tox.ini
@@ -68,7 +68,7 @@ commands =
|
||||
[testenv:py27-mysql]
|
||||
basepython = python2.7
|
||||
setenv =
|
||||
SUPERSET__SQLALCHEMY_DATABASE_URI = mysql://mysqluser:mysqluserpassword@localhost/superset?charset=utf8
|
||||
SUPERSET__SQLALCHEMY_DATABASE_URI = mysql://root@localhost/superset?charset=utf8
|
||||
|
||||
[testenv:py34-mysql]
|
||||
basepython = python3.4
|
||||
|
||||
Reference in New Issue
Block a user