mirror of
https://github.com/apache/superset.git
synced 2026-04-19 08:04:53 +00:00
feat: cancel db query on stop (#15403)
* feat: cancel db query on stop * fix pylint * Add unit tests * Do not bind multiple times * Stop only running queries * Postgres to cancel only the required query * Remove extra log * Add docstring * Better types, docstring and naming * Use python3 format strings * Update superset/sql_lab.py Co-authored-by: Beto Dealmeida <roberto@dealmeida.net> * Add cancel_query_on_windows_unload option to database * Return cancel_query as bool Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>
This commit is contained in:
committed by
GitHub
parent
a914e3c1cb
commit
02032ee8a4
@@ -73,6 +73,7 @@ SQLLAB_CTAS_NO_LIMIT = config["SQLLAB_CTAS_NO_LIMIT"]
|
||||
SQL_QUERY_MUTATOR = config.get("SQL_QUERY_MUTATOR") or dummy_sql_query_mutator
|
||||
log_query = config["QUERY_LOGGER"]
|
||||
logger = logging.getLogger(__name__)
|
||||
cancel_query_key = "cancel_query"
|
||||
|
||||
|
||||
class SqlLabException(Exception):
|
||||
@@ -83,6 +84,10 @@ class SqlLabSecurityException(SqlLabException):
|
||||
pass
|
||||
|
||||
|
||||
class SqlLabQueryStoppedException(SqlLabException):
|
||||
pass
|
||||
|
||||
|
||||
def handle_query_error(
|
||||
ex: Exception,
|
||||
query: Query,
|
||||
@@ -187,7 +192,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
|
||||
return handle_query_error(ex, query, session)
|
||||
|
||||
|
||||
# pylint: disable=too-many-arguments, too-many-locals
|
||||
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
|
||||
def execute_sql_statement(
|
||||
sql_statement: str,
|
||||
query: Query,
|
||||
@@ -288,6 +293,12 @@ def execute_sql_statement(
|
||||
)
|
||||
)
|
||||
except Exception as ex:
|
||||
# query is stopped in another thread/worker
|
||||
# stopping raises expected exceptions which we should skip
|
||||
session.refresh(query)
|
||||
if query.status == QueryStatus.STOPPED:
|
||||
raise SqlLabQueryStoppedException()
|
||||
|
||||
logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
|
||||
logger.debug("Query %d: %s", query.id, ex)
|
||||
raise SqlLabException(db_engine_spec.extract_error_message(ex))
|
||||
@@ -438,12 +449,17 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
# closing the connection closes the cursor as well
|
||||
cursor = conn.cursor()
|
||||
cancel_query_id = db_engine_spec.get_cancel_query_id(cursor, query)
|
||||
if cancel_query_id is not None:
|
||||
query.set_extra_json_key(cancel_query_key, cancel_query_id)
|
||||
session.commit()
|
||||
statement_count = len(statements)
|
||||
for i, statement in enumerate(statements):
|
||||
# Check if stopped
|
||||
query = get_query(query_id, session)
|
||||
session.refresh(query)
|
||||
if query.status == QueryStatus.STOPPED:
|
||||
return None
|
||||
payload.update({"status": query.status})
|
||||
return payload
|
||||
|
||||
# For CTAS we create the table only on the last statement
|
||||
apply_ctas = query.select_as_cta and (
|
||||
@@ -466,6 +482,9 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
|
||||
log_params,
|
||||
apply_ctas,
|
||||
)
|
||||
except SqlLabQueryStoppedException:
|
||||
payload.update({"status": QueryStatus.STOPPED})
|
||||
return payload
|
||||
except Exception as ex: # pylint: disable=broad-except
|
||||
msg = str(ex)
|
||||
prefix_message = (
|
||||
@@ -562,3 +581,29 @@ def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-loca
|
||||
return payload
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def cancel_query(query: Query, user_name: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Cancel a running query.
|
||||
|
||||
:param query: Query to cancel
|
||||
:param user_name: Default username
|
||||
:return: True if query cancelled successfully, False otherwise
|
||||
"""
|
||||
cancel_query_id = query.extra.get(cancel_query_key, None)
|
||||
if cancel_query_id is None:
|
||||
return False
|
||||
|
||||
database = query.database
|
||||
engine = database.get_sqla_engine(
|
||||
schema=query.schema,
|
||||
nullpool=True,
|
||||
user_name=user_name,
|
||||
source=QuerySource.SQL_LAB,
|
||||
)
|
||||
db_engine_spec = database.db_engine_spec
|
||||
|
||||
with closing(engine.raw_connection()) as conn:
|
||||
with closing(conn.cursor()) as cursor:
|
||||
return db_engine_spec.cancel_query(cursor, query, cancel_query_id)
|
||||
|
||||
Reference in New Issue
Block a user