feat: cancel impala query on stop (#30412)

This commit is contained in:
wugeer
2024-10-30 07:44:22 +08:00
committed by GitHub
parent 576ad85eb4
commit 60cd2550a7
3 changed files with 135 additions and 6 deletions

View File

@@ -58,8 +58,8 @@ from sqlalchemy.sql.expression import ColumnClause, Select, TextAsFrom, TextClau
from sqlalchemy.types import TypeEngine
from sqlparse.tokens import CTE
from superset import sql_parse
from superset.constants import TimeGrain as TimeGrainConstants
from superset import db, sql_parse
from superset.constants import QUERY_CANCEL_KEY, TimeGrain as TimeGrainConstants
from superset.databases.utils import get_table_metadata, make_url_safe
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import DisallowedSQLFunction, OAuth2Error, OAuth2RedirectError
@@ -437,6 +437,14 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
# Driver-specific exception that should be mapped to OAuth2RedirectError
oauth2_exception = OAuth2RedirectError
# Does the query id related to the connection?
# The default value is True, which means that the query id is determined when
# the connection is created.
# When this is changed to false in a DB engine spec it means the query id
# is determined only after the specific query is executed and it will update
# the `cancel_query` value in the `extra` field of the `query` object
has_query_id_before_execute = True
@classmethod
def is_oauth2_enabled(cls) -> bool:
return (
@@ -1316,6 +1324,7 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
# TODO: Fix circular import error caused by importing sql_lab.Query
@classmethod
# pylint: disable=consider-using-transaction
def execute_with_cursor(
cls,
cursor: Any,
@@ -1333,6 +1342,13 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods
"""
logger.debug("Query %d: Running query: %s", query.id, sql)
cls.execute(cursor, sql, query.database, async_=True)
if not cls.has_query_id_before_execute:
cancel_query_id = query.database.db_engine_spec.get_cancel_query_id(
cursor, query
)
if cancel_query_id is not None:
query.set_extra_json_key(QUERY_CANCEL_KEY, cancel_query_id)
db.session.commit()
logger.debug("Query %d: Handling cursor", query.id)
cls.handle_cursor(cursor, query)