mirror of
https://github.com/apache/superset.git
synced 2026-04-19 08:04:53 +00:00
make queries older than 6 hours timeout
This commit is contained in:
@@ -8,7 +8,7 @@ const $ = require('jquery');
|
||||
|
||||
const QUERY_UPDATE_FREQ = 2000;
|
||||
const QUERY_UPDATE_BUFFER_MS = 5000;
|
||||
const QUERY_POLL_WINDOW = 21600000; // 6 hours.
|
||||
const MAX_QUERY_AGE_TO_POLL = 21600000;
|
||||
|
||||
class QueryAutoRefresh extends React.PureComponent {
|
||||
componentWillMount() {
|
||||
@@ -18,12 +18,14 @@ class QueryAutoRefresh extends React.PureComponent {
|
||||
this.stopTimer();
|
||||
}
|
||||
shouldCheckForQueries() {
|
||||
// if there are started or running queries < 6 hours old, this method should return true
|
||||
// if there are started or running queries, this method should return true
|
||||
const { queries } = this.props;
|
||||
const now = new Date().getTime();
|
||||
return Object.values(queries)
|
||||
.filter(q => (q.startDttm >= this.props.queriesLastUpdate - QUERY_POLL_WINDOW))
|
||||
.some(q =>
|
||||
['running', 'started', 'pending', 'fetching'].indexOf(q.state) >= 0);
|
||||
.some(
|
||||
q => ['running', 'started', 'pending', 'fetching'].indexOf(q.state) >= 0 &&
|
||||
now - q.startDttm < MAX_QUERY_AGE_TO_POLL,
|
||||
);
|
||||
}
|
||||
startTimer() {
|
||||
if (!(this.timer)) {
|
||||
@@ -35,7 +37,7 @@ class QueryAutoRefresh extends React.PureComponent {
|
||||
this.timer = null;
|
||||
}
|
||||
stopwatch() {
|
||||
// only poll /superset/queries/ if there are started or running queries started <6 hours ago
|
||||
// only poll /superset/queries/ if there are started or running queries
|
||||
if (this.shouldCheckForQueries()) {
|
||||
const url = `/superset/queries/${this.props.queriesLastUpdate - QUERY_UPDATE_BUFFER_MS}`;
|
||||
$.getJSON(url, (data) => {
|
||||
|
||||
@@ -162,9 +162,6 @@ export const sqlLabReducer = function (state, action) {
|
||||
return alterInObject(state, 'queries', action.query, { state: 'fetching' });
|
||||
},
|
||||
[actions.QUERY_SUCCESS]() {
|
||||
if (action.query.state === 'stopped') {
|
||||
return state;
|
||||
}
|
||||
let rows;
|
||||
if (action.results.data) {
|
||||
rows = action.results.data.length;
|
||||
@@ -174,7 +171,7 @@ export const sqlLabReducer = function (state, action) {
|
||||
progress: 100,
|
||||
results: action.results,
|
||||
rows,
|
||||
state: 'success',
|
||||
state: action.query.state,
|
||||
errorMessage: null,
|
||||
cached: false,
|
||||
};
|
||||
|
||||
@@ -652,7 +652,7 @@ class PrestoEngineSpec(BaseEngineSpec):
|
||||
stats = polled.get('stats', {})
|
||||
|
||||
query = session.query(type(query)).filter_by(id=query.id).one()
|
||||
if query.status == QueryStatus.STOPPED:
|
||||
if query.status in [QueryStatus.STOPPED, QueryStatus.TIMED_OUT]:
|
||||
cursor.cancel()
|
||||
break
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ from flask_babel import lazy_gettext as _
|
||||
import pandas as pd
|
||||
from six import text_type
|
||||
import sqlalchemy as sqla
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import and_, create_engine, update
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from unidecode import unidecode
|
||||
@@ -2541,6 +2541,35 @@ class Superset(BaseSupersetView):
|
||||
.all()
|
||||
)
|
||||
dict_queries = {q.client_id: q.to_dict() for q in sql_queries}
|
||||
|
||||
now = int(round(time.time() * 1000))
|
||||
|
||||
unfinished_states = [
|
||||
utils.QueryStatus.PENDING,
|
||||
utils.QueryStatus.RUNNING,
|
||||
]
|
||||
|
||||
queries_to_timeout = [
|
||||
client_id for client_id, query_dict in dict_queries.items()
|
||||
if (
|
||||
query_dict['state'] in unfinished_states and (
|
||||
now - query_dict['startDttm'] >
|
||||
config.get('SQLLAB_ASYNC_TIME_LIMIT_SEC') * 1000
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
if queries_to_timeout:
|
||||
update(Query).where(
|
||||
and_(
|
||||
Query.user_id == g.user.get_id(),
|
||||
Query.client_id in queries_to_timeout,
|
||||
),
|
||||
).values(state=utils.QueryStatus.TIMED_OUT)
|
||||
|
||||
for client_id in queries_to_timeout:
|
||||
dict_queries[client_id]['status'] = utils.QueryStatus.TIMED_OUT
|
||||
|
||||
return json_success(
|
||||
json.dumps(dict_queries, default=utils.json_int_dttm_ser))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user