mirror of
https://github.com/apache/superset.git
synced 2026-04-20 00:24:38 +00:00
[sqllab] add retries for stop_query (#8139)
* [sqllab] add retries for stop_query * use backoff for retries * address PR comments * import statement order
This commit is contained in:
@@ -10,6 +10,7 @@ apispec[yaml]==1.3.3 # via flask-appbuilder
|
||||
asn1crypto==0.24.0 # via cryptography
|
||||
attrs==19.1.0 # via jsonschema
|
||||
babel==2.7.0 # via flask-babel
|
||||
backoff==1.8.0
|
||||
billiard==3.6.0.0 # via celery
|
||||
bleach==3.1.0
|
||||
celery==4.3.0
|
||||
|
||||
1
setup.py
1
setup.py
@@ -65,6 +65,7 @@ setup(
|
||||
zip_safe=False,
|
||||
scripts=["superset/bin/superset"],
|
||||
install_requires=[
|
||||
"backoff>=1.8.0",
|
||||
"bleach>=3.0.2, <4.0.0",
|
||||
"celery>=4.3.0, <5.0.0",
|
||||
"click>=6.0, <7.0.0", # `click`>=7 forces "-" instead of "_"
|
||||
|
||||
@@ -19,10 +19,10 @@ from contextlib import closing
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from sys import getsizeof
|
||||
from time import sleep
|
||||
from typing import Optional, Tuple, Union
|
||||
import uuid
|
||||
|
||||
import backoff
|
||||
from celery.exceptions import SoftTimeLimitExceeded
|
||||
from contextlib2 import contextmanager
|
||||
from flask_babel import lazy_gettext as _
|
||||
@@ -82,23 +82,31 @@ def handle_query_error(msg, query, session, payload=None):
|
||||
return payload
|
||||
|
||||
|
||||
def get_query(query_id, session, retry_count=5):
|
||||
"""attemps to get the query and retry if it cannot"""
|
||||
query = None
|
||||
attempt = 0
|
||||
while not query and attempt < retry_count:
|
||||
try:
|
||||
query = session.query(Query).filter_by(id=query_id).one()
|
||||
except Exception:
|
||||
attempt += 1
|
||||
logging.error(f"Query with id `{query_id}` could not be retrieved")
|
||||
stats_logger.incr("error_attempting_orm_query_" + str(attempt))
|
||||
logging.error(f"Query {query_id}: Sleeping for a sec before retrying...")
|
||||
sleep(1)
|
||||
if not query:
|
||||
stats_logger.incr("error_failed_at_getting_orm_query")
|
||||
def get_query_backoff_handler(details):
|
||||
query_id = details["kwargs"]["query_id"]
|
||||
logging.error(f"Query with id `{query_id}` could not be retrieved")
|
||||
stats_logger.incr("error_attempting_orm_query_{}".format(details["tries"] - 1))
|
||||
logging.error(f"Query {query_id}: Sleeping for a sec before retrying...")
|
||||
|
||||
|
||||
def get_query_giveup_handler(details):
|
||||
stats_logger.incr("error_failed_at_getting_orm_query")
|
||||
|
||||
|
||||
@backoff.on_exception(
|
||||
backoff.constant,
|
||||
SqlLabException,
|
||||
interval=1,
|
||||
on_backoff=get_query_backoff_handler,
|
||||
on_giveup=get_query_giveup_handler,
|
||||
max_tries=5,
|
||||
)
|
||||
def get_query(query_id, session):
|
||||
"""attempts to get the query and retry if it cannot"""
|
||||
try:
|
||||
return session.query(Query).filter_by(id=query_id).one()
|
||||
except Exception:
|
||||
raise SqlLabException("Failed at getting query")
|
||||
return query
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
||||
@@ -22,6 +22,7 @@ import re
|
||||
from typing import Dict, List, Optional, Union # noqa: F401
|
||||
from urllib import parse
|
||||
|
||||
import backoff
|
||||
from flask import (
|
||||
abort,
|
||||
flash,
|
||||
@@ -45,6 +46,7 @@ import pandas as pd
|
||||
import pyarrow as pa
|
||||
import simplejson as json
|
||||
from sqlalchemy import and_, or_, select
|
||||
from sqlalchemy.exc import DatabaseError
|
||||
from werkzeug.routing import BaseConverter
|
||||
|
||||
from superset import (
|
||||
@@ -2466,14 +2468,30 @@ class Superset(BaseSupersetView):
|
||||
@has_access_api
|
||||
@expose("/stop_query/", methods=["POST"])
|
||||
@event_logger.log_this
|
||||
@backoff.on_exception(
|
||||
backoff.constant,
|
||||
DatabaseError,
|
||||
interval=1,
|
||||
on_backoff=lambda details: db.session.rollback(),
|
||||
on_giveup=lambda details: db.session.rollback(),
|
||||
max_tries=5,
|
||||
)
|
||||
def stop_query(self):
|
||||
client_id = request.form.get("client_id")
|
||||
try:
|
||||
query = db.session.query(Query).filter_by(client_id=client_id).one()
|
||||
query.status = QueryStatus.STOPPED
|
||||
db.session.commit()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
query = db.session.query(Query).filter_by(client_id=client_id).one()
|
||||
if query.status in [
|
||||
QueryStatus.FAILED,
|
||||
QueryStatus.SUCCESS,
|
||||
QueryStatus.TIMED_OUT,
|
||||
]:
|
||||
logging.error(
|
||||
f"Query with client_id {client_id} could not be stopped: query already complete"
|
||||
)
|
||||
return self.json_response("OK")
|
||||
query.status = QueryStatus.STOPPED
|
||||
db.session.commit()
|
||||
|
||||
return self.json_response("OK")
|
||||
|
||||
@has_access_api
|
||||
|
||||
Reference in New Issue
Block a user