mirror of
https://github.com/apache/superset.git
synced 2026-04-19 08:04:53 +00:00
Stabilizing Celery / SQL Lab (#2981)
* upgrade celery to 4.0.2
* using Redis for unit tests (sqla broker not supported in Celery 4)
* Setting Celery's soft_time_limit based on `SQLLAB_ASYNC_TIME_LIMIT_SEC` config
* Better error handling in async tasks
* Better statsd logging in async tasks
* show [pending/running] query status in Results tab
* systematically using sqla NullPool on worker (async) to limit number of database connections
This commit is contained in:
committed by
GitHub
parent
de88764e93
commit
06fcaa3095
@@ -1,4 +1,3 @@
|
||||
import celery
|
||||
from time import sleep
|
||||
from datetime import datetime
|
||||
import json
|
||||
@@ -7,6 +6,7 @@ import pandas as pd
|
||||
import sqlalchemy
|
||||
import uuid
|
||||
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from sqlalchemy.pool import NullPool
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
@@ -20,6 +20,12 @@ from superset.utils import QueryStatus, get_celery_app
|
||||
|
||||
config = app.config
|
||||
celery_app = get_celery_app(config)
|
||||
stats_logger = app.config.get('STATS_LOGGER')
|
||||
SQLLAB_TIMEOUT = config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC', 600)
|
||||
|
||||
|
||||
class SqlLabException(Exception):
    """Error raised by the SQL Lab helpers in this module."""
|
||||
|
||||
|
||||
def dedup(l, suffix='__'):
|
||||
@@ -43,28 +49,63 @@ def dedup(l, suffix='__'):
|
||||
return new_l
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def get_sql_results(self, query_id, return_results=True, store_results=False):
|
||||
"""Executes the sql query returns the results."""
|
||||
if not self.request.called_directly:
|
||||
def get_query(query_id, session, retry_count=5):
    """Attempt to load the ``Query`` row for ``query_id``, retrying on failure.

    :param query_id: primary key of the ``Query`` row to load
    :param session: SQLAlchemy session used to run the lookup
    :param retry_count: maximum number of lookup attempts
    :returns: the ``Query`` object returned by the lookup
    :raises SqlLabException: if every attempt fails (or ``retry_count``
        allows no attempt at all)
    """
    for attempt in range(1, retry_count + 1):
        try:
            return session.query(Query).filter_by(id=query_id).one()
        except Exception:
            # Any failure of the lookup is retried; each failed attempt is
            # logged and counted under its own numbered stat.
            logging.error(
                "Query with id `{}` could not be retrieved".format(query_id))
            stats_logger.incr('error_attempting_orm_query_' + str(attempt))
            # Only wait when another attempt will follow; sleeping after the
            # last attempt would merely delay the failure below.
            if attempt < retry_count:
                logging.error("Sleeping for a sec before retrying...")
                sleep(1)

    stats_logger.incr('error_failed_at_getting_orm_query')
    raise SqlLabException("Failed at getting query")
|
||||
|
||||
|
||||
def get_session(nullpool):
    """Return a SQLAlchemy session for the metadata database.

    :param nullpool: when truthy, create a fresh engine from the app's
        ``SQLALCHEMY_DATABASE_URI`` using ``NullPool`` and return a new
        session bound to it; otherwise return the shared ``db.session()``
    :returns: a SQLAlchemy session
    """
    if nullpool:
        # A new engine is created on every call; NullPool is passed as the
        # pool class so this engine does not pool connections.
        engine = sqlalchemy.create_engine(
            app.config.get('SQLALCHEMY_DATABASE_URI'), poolclass=NullPool)
        session_class = sessionmaker(bind=engine)
        # Build exactly one session; no throwaway instance is created.
        return session_class()

    session = db.session()
    # HACK kept from the original code. NOTE(review): presumably this ends
    # any open transaction on the shared session before use -- confirm.
    session.commit()
    return session
|
||||
|
||||
|
||||
@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
|
||||
def get_sql_results(
|
||||
ctask, query_id, return_results=True, store_results=False):
|
||||
"""Executes the sql query returns the results."""
|
||||
try:
|
||||
query = session.query(Query).filter_by(id=query_id).one()
|
||||
return execute_sql(
|
||||
ctask, query_id, return_results, store_results)
|
||||
except Exception as e:
|
||||
logging.error(
|
||||
"Query with id `{}` could not be retrieved".format(query_id))
|
||||
logging.error("Sleeping for a sec and retrying...")
|
||||
# Nasty hack to get around a race condition where the worker
|
||||
# cannot find the query it's supposed to run
|
||||
sleep(1)
|
||||
query = session.query(Query).filter_by(id=query_id).one()
|
||||
logging.exception(e)
|
||||
stats_logger.incr('error_sqllab_unhandled')
|
||||
sesh = get_session(not ctask.request.called_directly)
|
||||
query = get_query(query_id, sesh)
|
||||
query.error_message = str(e)
|
||||
query.status = QueryStatus.FAILED
|
||||
query.tmp_table_name = None
|
||||
sesh.commit()
|
||||
|
||||
|
||||
def execute_sql(ctask, query_id, return_results=True, store_results=False):
|
||||
"""Executes the sql query returns the results."""
|
||||
session = get_session(not ctask.request.called_directly)
|
||||
|
||||
query = get_query(query_id, session)
|
||||
payload = dict(query_id=query_id)
|
||||
|
||||
database = query.database
|
||||
db_engine_spec = database.db_engine_spec
|
||||
@@ -76,22 +117,27 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):
|
||||
query.status = QueryStatus.FAILED
|
||||
query.tmp_table_name = None
|
||||
session.commit()
|
||||
raise Exception(query.error_message)
|
||||
payload.update({
|
||||
'status': query.status,
|
||||
'error_essage': msg,
|
||||
})
|
||||
return payload
|
||||
|
||||
if store_results and not results_backend:
|
||||
handle_error("Results backend isn't configured.")
|
||||
return handle_error("Results backend isn't configured.")
|
||||
|
||||
# Limit enforced only for retrieving the data, not for the CTA queries.
|
||||
superset_query = SupersetQuery(query.sql)
|
||||
executed_sql = superset_query.stripped()
|
||||
if not superset_query.is_select() and not database.allow_dml:
|
||||
handle_error(
|
||||
return handle_error(
|
||||
"Only `SELECT` statements are allowed against this database")
|
||||
if query.select_as_cta:
|
||||
if not superset_query.is_select():
|
||||
handle_error(
|
||||
return handle_error(
|
||||
"Only `SELECT` statements can be used with the CREATE TABLE "
|
||||
"feature.")
|
||||
return
|
||||
if not query.tmp_table_name:
|
||||
start_dttm = datetime.fromtimestamp(query.start_time)
|
||||
query.tmp_table_name = 'tmp_{}_table_{}'.format(
|
||||
@@ -112,7 +158,7 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
msg = "Template rendering failed: " + utils.error_msg_from_exception(e)
|
||||
handle_error(msg)
|
||||
return handle_error(msg)
|
||||
|
||||
query.executed_sql = executed_sql
|
||||
query.status = QueryStatus.RUNNING
|
||||
@@ -121,28 +167,31 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):
|
||||
session.commit()
|
||||
logging.info("Set query to 'running'")
|
||||
|
||||
engine = database.get_sqla_engine(
|
||||
schema=query.schema, nullpool=not ctask.request.called_directly)
|
||||
try:
|
||||
engine = database.get_sqla_engine(schema=query.schema)
|
||||
engine = database.get_sqla_engine(
|
||||
schema=query.schema, nullpool=not ctask.request.called_directly)
|
||||
conn = engine.raw_connection()
|
||||
cursor = conn.cursor()
|
||||
logging.info("Running query: \n{}".format(executed_sql))
|
||||
logging.info(query.executed_sql)
|
||||
cursor.execute(
|
||||
query.executed_sql, **db_engine_spec.cursor_execute_kwargs)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
conn.close()
|
||||
handle_error(db_engine_spec.extract_error_message(e))
|
||||
|
||||
try:
|
||||
logging.info("Handling cursor")
|
||||
db_engine_spec.handle_cursor(cursor, query, session)
|
||||
logging.info("Fetching data: {}".format(query.to_dict()))
|
||||
data = db_engine_spec.fetch_data(cursor, query.limit)
|
||||
except SoftTimeLimitExceeded as e:
|
||||
logging.exception(e)
|
||||
conn.close()
|
||||
return handle_error(
|
||||
"SQL Lab timeout. This environment's policy is to kill queries "
|
||||
"after {} seconds.".format(SQLLAB_TIMEOUT))
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
conn.close()
|
||||
handle_error(db_engine_spec.extract_error_message(e))
|
||||
return handle_error(db_engine_spec.extract_error_message(e))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
@@ -175,19 +224,17 @@ def get_sql_results(self, query_id, return_results=True, store_results=False):
|
||||
session.merge(query)
|
||||
session.flush()
|
||||
|
||||
payload = {
|
||||
'query_id': query.id,
|
||||
payload.update({
|
||||
'status': query.status,
|
||||
'data': cdf.data if cdf.data else [],
|
||||
'columns': cdf.columns if cdf.columns else [],
|
||||
'query': query.to_dict(),
|
||||
}
|
||||
payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
|
||||
|
||||
})
|
||||
if store_results:
|
||||
key = '{}'.format(uuid.uuid4())
|
||||
logging.info("Storing results in results backend, key: {}".format(key))
|
||||
results_backend.set(key, utils.zlib_compress(payload))
|
||||
json_payload = json.dumps(payload, default=utils.json_iso_dttm_ser)
|
||||
results_backend.set(key, utils.zlib_compress(json_payload))
|
||||
query.results_key = key
|
||||
query.end_result_backend_time = utils.now_as_float()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user