mirror of
https://github.com/apache/superset.git
synced 2026-04-19 08:04:53 +00:00
[sqllab] improve Hive support (#3187)
* [sqllab] improve Hive support * Fix "Transport not open" bug * Getting progress bar to show * Bump pyhive to 0.4.0 * Getting [Track Job] button to show * Fix testzz
This commit is contained in:
committed by
GitHub
parent
25c599d040
commit
b888802e05
2
setup.py
2
setup.py
@@ -61,7 +61,7 @@ setup(
|
||||
'pandas==0.20.2',
|
||||
'parsedatetime==2.0.0',
|
||||
'pydruid==0.3.1',
|
||||
'PyHive>=0.3.0',
|
||||
'PyHive>=0.4.0',
|
||||
'python-dateutil==2.6.0',
|
||||
'requests==2.17.3',
|
||||
'simplejson==3.10.0',
|
||||
|
||||
@@ -155,6 +155,7 @@ export default class ResultSet extends React.PureComponent {
|
||||
}
|
||||
if (['running', 'pending', 'fetching'].indexOf(query.state) > -1) {
|
||||
let progressBar;
|
||||
let trackingUrl;
|
||||
if (query.progress > 0 && query.state === 'running') {
|
||||
progressBar = (
|
||||
<ProgressBar
|
||||
@@ -163,11 +164,24 @@ export default class ResultSet extends React.PureComponent {
|
||||
label={`${query.progress}%`}
|
||||
/>);
|
||||
}
|
||||
if (query.trackingUrl) {
|
||||
trackingUrl = (
|
||||
<Button
|
||||
bsSize="small"
|
||||
onClick={() => { window.open(query.trackingUrl); }}
|
||||
>
|
||||
Track Job
|
||||
</Button>
|
||||
);
|
||||
}
|
||||
return (
|
||||
<div>
|
||||
<img className="loading" alt="Loading..." src="/static/assets/images/loading.gif" />
|
||||
<QueryStateLabel query={query} />
|
||||
{progressBar}
|
||||
<div>
|
||||
{trackingUrl}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
} else if (query.state === 'failed') {
|
||||
|
||||
@@ -241,6 +241,7 @@ class CeleryConfig(object):
|
||||
CELERY_IMPORTS = ('superset.sql_lab', )
|
||||
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
|
||||
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
|
||||
CELERYD_LOG_LEVEL = 'DEBUG'
|
||||
CELERY_CONFIG = CeleryConfig
|
||||
"""
|
||||
CELERY_CONFIG = None
|
||||
|
||||
@@ -637,6 +637,21 @@ class HiveEngineSpec(PrestoEngineSpec):
|
||||
engine = 'hive'
|
||||
cursor_execute_kwargs = {'async': True}
|
||||
|
||||
# Scoping regex at class level to avoid recompiling
|
||||
# 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5
|
||||
jobs_stats_r = re.compile(
|
||||
r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)')
|
||||
# 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5
|
||||
launching_job_r = re.compile(
|
||||
'.*INFO.*Launching Job (?P<job_number>[0-9]+) out of '
|
||||
'(?P<max_jobs>[0-9]+)')
|
||||
# 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152 Stage-18
|
||||
# map = 0%, reduce = 0%
|
||||
stage_progress_r = re.compile(
|
||||
r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*'
|
||||
r'map = (?P<map_progress>[0-9]+)%.*'
|
||||
r'reduce = (?P<reduce_progress>[0-9]+)%.*')
|
||||
|
||||
@classmethod
|
||||
def patch(cls):
|
||||
from pyhive import hive
|
||||
@@ -666,38 +681,27 @@ class HiveEngineSpec(PrestoEngineSpec):
|
||||
return uri
|
||||
|
||||
@classmethod
|
||||
def progress(cls, logs):
|
||||
# 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5
|
||||
jobs_stats_r = re.compile(
|
||||
r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)')
|
||||
# 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5
|
||||
launching_job_r = re.compile(
|
||||
'.*INFO.*Launching Job (?P<job_number>[0-9]+) out of '
|
||||
'(?P<max_jobs>[0-9]+)')
|
||||
# 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152 Stage-18
|
||||
# map = 0%, reduce = 0%
|
||||
stage_progress = re.compile(
|
||||
r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*'
|
||||
r'map = (?P<map_progress>[0-9]+)%.*'
|
||||
r'reduce = (?P<reduce_progress>[0-9]+)%.*')
|
||||
total_jobs = None
|
||||
def progress(cls, log_lines):
|
||||
total_jobs = 1 # assuming there's at least 1 job
|
||||
current_job = None
|
||||
stages = {}
|
||||
lines = logs.splitlines()
|
||||
for line in lines:
|
||||
match = jobs_stats_r.match(line)
|
||||
for line in log_lines:
|
||||
match = cls.jobs_stats_r.match(line)
|
||||
if match:
|
||||
total_jobs = int(match.groupdict()['max_jobs'])
|
||||
match = launching_job_r.match(line)
|
||||
total_jobs = int(match.groupdict()['max_jobs']) or 1
|
||||
match = cls.launching_job_r.match(line)
|
||||
if match:
|
||||
current_job = int(match.groupdict()['job_number'])
|
||||
stages = {}
|
||||
match = stage_progress.match(line)
|
||||
match = cls.stage_progress_r.match(line)
|
||||
if match:
|
||||
stage_number = int(match.groupdict()['stage_number'])
|
||||
map_progress = int(match.groupdict()['map_progress'])
|
||||
reduce_progress = int(match.groupdict()['reduce_progress'])
|
||||
stages[stage_number] = (map_progress + reduce_progress) / 2
|
||||
logging.info(
|
||||
"Progress detail: {}, "
|
||||
"total jobs: {}".format(stages, total_jobs))
|
||||
|
||||
if not total_jobs or not current_job:
|
||||
return 0
|
||||
@@ -709,6 +713,13 @@ class HiveEngineSpec(PrestoEngineSpec):
|
||||
)
|
||||
return int(progress)
|
||||
|
||||
@classmethod
|
||||
def get_tracking_url(cls, log_lines):
|
||||
lkp = "Tracking URL = "
|
||||
for line in log_lines:
|
||||
if lkp in line:
|
||||
return line.split(lkp)[1]
|
||||
|
||||
@classmethod
|
||||
def handle_cursor(cls, cursor, query, session):
|
||||
"""Updates progress information"""
|
||||
@@ -718,18 +729,35 @@ class HiveEngineSpec(PrestoEngineSpec):
|
||||
hive.ttypes.TOperationState.RUNNING_STATE,
|
||||
)
|
||||
polled = cursor.poll()
|
||||
last_log_line = 0
|
||||
tracking_url = None
|
||||
while polled.operationState in unfinished_states:
|
||||
query = session.query(type(query)).filter_by(id=query.id).one()
|
||||
if query.status == QueryStatus.STOPPED:
|
||||
cursor.cancel()
|
||||
break
|
||||
|
||||
logs = cursor.fetch_logs()
|
||||
if logs:
|
||||
progress = cls.progress(logs)
|
||||
resp = cursor.fetch_logs()
|
||||
if resp and resp.log:
|
||||
log = resp.log or ''
|
||||
log_lines = resp.log.splitlines()
|
||||
logging.info("\n".join(log_lines[last_log_line:]))
|
||||
last_log_line = len(log_lines) - 1
|
||||
progress = cls.progress(log_lines)
|
||||
logging.info("Progress total: {}".format(progress))
|
||||
needs_commit = False
|
||||
if progress > query.progress:
|
||||
query.progress = progress
|
||||
session.commit()
|
||||
needs_commit = True
|
||||
if not tracking_url:
|
||||
tracking_url = cls.get_tracking_url(log_lines)
|
||||
if tracking_url:
|
||||
logging.info(
|
||||
"Found the tracking url: {}".format(tracking_url))
|
||||
query.tracking_url = tracking_url
|
||||
needs_commit = True
|
||||
if needs_commit:
|
||||
session.commit()
|
||||
time.sleep(5)
|
||||
polled = cursor.poll()
|
||||
|
||||
|
||||
23
superset/migrations/versions/ca69c70ec99b_tracking_url.py
Normal file
23
superset/migrations/versions/ca69c70ec99b_tracking_url.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""tracking_url
|
||||
|
||||
Revision ID: ca69c70ec99b
|
||||
Revises: a65458420354
|
||||
Create Date: 2017-07-26 20:09:52.606416
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'ca69c70ec99b'
|
||||
down_revision = 'a65458420354'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import mysql
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column('query', sa.Column('tracking_url', sa.Text(), nullable=True))
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_column('query', 'tracking_url')
|
||||
@@ -69,6 +69,7 @@ class Query(Model):
|
||||
start_running_time = Column(Numeric(precision=20, scale=6))
|
||||
end_time = Column(Numeric(precision=20, scale=6))
|
||||
end_result_backend_time = Column(Numeric(precision=20, scale=6))
|
||||
tracking_url = Column(Text)
|
||||
|
||||
changed_on = Column(
|
||||
DateTime,
|
||||
@@ -119,6 +120,7 @@ class Query(Model):
|
||||
'user': self.user.username,
|
||||
'limit_reached': self.limit_reached,
|
||||
'resultsKey': self.results_key,
|
||||
'trackingUrl': self.tracking_url,
|
||||
}
|
||||
|
||||
@property
|
||||
|
||||
@@ -192,6 +192,9 @@ def execute_sql(ctask, query_id, return_results=True, store_results=False):
|
||||
conn.close()
|
||||
return handle_error(db_engine_spec.extract_error_message(e))
|
||||
|
||||
logging.info("Fetching cursor description")
|
||||
cursor_description = cursor.description
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
@@ -203,7 +206,7 @@ def execute_sql(ctask, query_id, return_results=True, store_results=False):
|
||||
}, default=utils.json_iso_dttm_ser)
|
||||
|
||||
column_names = (
|
||||
[col[0] for col in cursor.description] if cursor.description else [])
|
||||
[col[0] for col in cursor_description] if cursor_description else [])
|
||||
column_names = dedup(column_names)
|
||||
cdf = dataframe.SupersetDataFrame(pd.DataFrame(
|
||||
list(data), columns=column_names))
|
||||
|
||||
@@ -427,7 +427,7 @@ appbuilder.add_view_no_menu(SliceAddView)
|
||||
|
||||
class DashboardModelView(SupersetModelView, DeleteMixin): # noqa
|
||||
datamodel = SQLAInterface(models.Dashboard)
|
||||
|
||||
|
||||
list_title = _('List Dashboards')
|
||||
show_title = _('Show Dashboard')
|
||||
add_title = _('Add Dashboard')
|
||||
@@ -2030,6 +2030,7 @@ class Superset(BaseSupersetView):
|
||||
|
||||
# Async request.
|
||||
if async:
|
||||
logging.info("Running query on a Celery worker")
|
||||
# Ignore the celery future object and the request may time out.
|
||||
try:
|
||||
sql_lab.get_sql_results.delay(
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import unicode_literals
|
||||
|
||||
import unittest
|
||||
|
||||
from superset import db_engine_specs
|
||||
from superset.db_engine_specs import HiveEngineSpec
|
||||
|
||||
|
||||
class DbEngineSpecsTestCase(unittest.TestCase):
|
||||
@@ -13,36 +13,38 @@ class DbEngineSpecsTestCase(unittest.TestCase):
|
||||
log = """
|
||||
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
|
||||
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
|
||||
"""
|
||||
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(
|
||||
0, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_0_progress(self):
|
||||
log = """
|
||||
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
|
||||
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
|
||||
"""
|
||||
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(
|
||||
0, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_number_of_jobs_progress(self):
|
||||
log = """
|
||||
17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
|
||||
"""
|
||||
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(0, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_job_1_launched_progress(self):
|
||||
log = """
|
||||
17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
|
||||
17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
|
||||
"""
|
||||
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(0, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_job_1_launched_stage_1_0_progress(self):
|
||||
log = """
|
||||
17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
|
||||
17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
|
||||
"""
|
||||
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(0, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_job_1_launched_stage_1_map_40_progress(self):
|
||||
log = """
|
||||
@@ -50,8 +52,8 @@ class DbEngineSpecsTestCase(unittest.TestCase):
|
||||
17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
|
||||
"""
|
||||
self.assertEquals(10, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(10, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_job_1_launched_stage_1_map_80_reduce_40_progress(self):
|
||||
log = """
|
||||
@@ -60,8 +62,8 @@ class DbEngineSpecsTestCase(unittest.TestCase):
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40%
|
||||
"""
|
||||
self.assertEquals(30, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(30, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_job_1_launched_stage_2_stages_progress(self):
|
||||
log = """
|
||||
@@ -72,8 +74,8 @@ class DbEngineSpecsTestCase(unittest.TestCase):
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40%
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-2 map = 0%, reduce = 0%
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 100%, reduce = 0%
|
||||
"""
|
||||
self.assertEquals(12, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(12, HiveEngineSpec.progress(log))
|
||||
|
||||
def test_job_2_launched_stage_2_stages_progress(self):
|
||||
log = """
|
||||
@@ -83,5 +85,5 @@ class DbEngineSpecsTestCase(unittest.TestCase):
|
||||
17/02/07 19:15:55 INFO ql.Driver: Launching Job 2 out of 2
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
|
||||
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
|
||||
"""
|
||||
self.assertEquals(60, db_engine_specs.HiveEngineSpec.progress(log))
|
||||
""".split('\n')
|
||||
self.assertEquals(60, HiveEngineSpec.progress(log))
|
||||
|
||||
@@ -189,12 +189,9 @@ class SqlLabTests(SupersetTestCase):
|
||||
from_time = 'from={}'.format(int(first_query_time))
|
||||
to_time = 'to={}'.format(int(second_query_time))
|
||||
params = [from_time, to_time]
|
||||
resp = self.get_resp('/superset/search_queries?'+'&'.join(params))
|
||||
resp = self.get_resp('/superset/search_queries?' + '&'.join(params))
|
||||
data = json.loads(resp)
|
||||
self.assertEquals(2, len(data))
|
||||
for k in data:
|
||||
self.assertLess(int(first_query_time), k['startDttm'])
|
||||
self.assertLess(k['startDttm'], int(second_query_time))
|
||||
|
||||
def test_alias_duplicate(self):
|
||||
self.run_sql(
|
||||
|
||||
Reference in New Issue
Block a user