mirror of
https://github.com/apache/superset.git
synced 2026-05-12 19:35:17 +00:00
feat: Refactor asyncEvent middleware and add websocket support (#13696)
This commit is contained in:
@@ -16,8 +16,7 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
import { Dispatch, Middleware, MiddlewareAPI } from 'redux';
|
||||
import { makeApi, SupersetClient } from '@superset-ui/core';
|
||||
import { ensureIsArray, makeApi, SupersetClient } from '@superset-ui/core';
|
||||
import { SupersetError } from 'src/components/ErrorMessage/types';
|
||||
import { FeatureFlag, isFeatureEnabled } from '../featureFlags';
|
||||
import {
|
||||
@@ -25,178 +24,240 @@ import {
|
||||
parseErrorJson,
|
||||
} from '../utils/getClientErrorObject';
|
||||
|
||||
export type AsyncEvent = {
|
||||
id: string;
|
||||
type AsyncEvent = {
|
||||
id?: string | null;
|
||||
channel_id: string;
|
||||
job_id: string;
|
||||
user_id: string;
|
||||
user_id?: string;
|
||||
status: string;
|
||||
errors: SupersetError[];
|
||||
result_url: string;
|
||||
};
|
||||
|
||||
type AsyncEventOptions = {
|
||||
config: {
|
||||
GLOBAL_ASYNC_QUERIES_TRANSPORT: string;
|
||||
GLOBAL_ASYNC_QUERIES_POLLING_DELAY: number;
|
||||
};
|
||||
getPendingComponents: (state: any) => any[];
|
||||
successAction: (componentId: number, componentData: any) => { type: string };
|
||||
errorAction: (componentId: number, response: any) => { type: string };
|
||||
processEventsCallback?: (events: AsyncEvent[]) => void; // this is currently used only for tests
|
||||
errors?: SupersetError[];
|
||||
result_url: string | null;
|
||||
};
|
||||
|
||||
type CachedDataResponse = {
|
||||
componentId: number;
|
||||
status: string;
|
||||
data: any;
|
||||
};
|
||||
type AppConfig = Record<string, any>;
|
||||
type ListenerFn = (asyncEvent: AsyncEvent) => Promise<any>;
|
||||
|
||||
const initAsyncEvents = (options: AsyncEventOptions) => {
|
||||
// TODO: implement websocket support
|
||||
const TRANSPORT_POLLING = 'polling';
|
||||
const {
|
||||
config,
|
||||
getPendingComponents,
|
||||
successAction,
|
||||
errorAction,
|
||||
processEventsCallback,
|
||||
} = options;
|
||||
const transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
|
||||
const polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;
|
||||
const TRANSPORT_POLLING = 'polling';
|
||||
const TRANSPORT_WS = 'ws';
|
||||
const JOB_STATUS = {
|
||||
PENDING: 'pending',
|
||||
RUNNING: 'running',
|
||||
ERROR: 'error',
|
||||
DONE: 'done',
|
||||
};
|
||||
const LOCALSTORAGE_KEY = 'last_async_event_id';
|
||||
const POLLING_URL = '/api/v1/async_event/';
|
||||
const MAX_RETRIES = 6;
|
||||
const RETRY_DELAY = 100;
|
||||
|
||||
const middleware: Middleware = (store: MiddlewareAPI) => (next: Dispatch) => {
|
||||
const JOB_STATUS = {
|
||||
PENDING: 'pending',
|
||||
RUNNING: 'running',
|
||||
ERROR: 'error',
|
||||
DONE: 'done',
|
||||
};
|
||||
const LOCALSTORAGE_KEY = 'last_async_event_id';
|
||||
const POLLING_URL = '/api/v1/async_event/';
|
||||
let lastReceivedEventId: string | null;
|
||||
let config: AppConfig;
|
||||
let transport: string;
|
||||
let polling_delay: number;
|
||||
let listenersByJobId: Record<string, ListenerFn>;
|
||||
let retriesByJobId: Record<string, number>;
|
||||
let lastReceivedEventId: string | null | undefined;
|
||||
|
||||
try {
|
||||
lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
|
||||
} catch (err) {
|
||||
console.warn('Failed to fetch last event Id from localStorage');
|
||||
export const init = (appConfig?: AppConfig) => {
|
||||
if (!isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES)) return;
|
||||
|
||||
listenersByJobId = {};
|
||||
retriesByJobId = {};
|
||||
lastReceivedEventId = null;
|
||||
|
||||
if (appConfig) {
|
||||
config = appConfig;
|
||||
} else {
|
||||
// load bootstrap data from DOM
|
||||
const appContainer = document.getElementById('app');
|
||||
if (appContainer) {
|
||||
const bootstrapData = JSON.parse(
|
||||
appContainer?.getAttribute('data-bootstrap') || '{}',
|
||||
);
|
||||
config = bootstrapData?.common?.conf;
|
||||
} else {
|
||||
config = {};
|
||||
console.warn('asyncEvent: app config data not found');
|
||||
}
|
||||
}
|
||||
transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
|
||||
polling_delay = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;
|
||||
|
||||
const fetchEvents = makeApi<
|
||||
{ last_id?: string | null },
|
||||
{ result: AsyncEvent[] }
|
||||
>({
|
||||
method: 'GET',
|
||||
endpoint: POLLING_URL,
|
||||
});
|
||||
try {
|
||||
lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
|
||||
} catch (err) {
|
||||
console.warn('Failed to fetch last event Id from localStorage');
|
||||
}
|
||||
|
||||
const fetchCachedData = async (
|
||||
asyncEvent: AsyncEvent,
|
||||
componentId: number,
|
||||
): Promise<CachedDataResponse> => {
|
||||
let status = 'success';
|
||||
let data;
|
||||
try {
|
||||
const { json } = await SupersetClient.get({
|
||||
endpoint: asyncEvent.result_url,
|
||||
});
|
||||
data = 'result' in json ? json.result : json;
|
||||
} catch (response) {
|
||||
status = 'error';
|
||||
data = await getClientErrorObject(response);
|
||||
}
|
||||
|
||||
return { componentId, status, data };
|
||||
};
|
||||
|
||||
const setLastId = (asyncEvent: AsyncEvent) => {
|
||||
lastReceivedEventId = asyncEvent.id;
|
||||
try {
|
||||
localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string);
|
||||
} catch (err) {
|
||||
console.warn('Error saving event Id to localStorage', err);
|
||||
}
|
||||
};
|
||||
|
||||
const processEvents = async () => {
|
||||
let queuedComponents = getPendingComponents(store.getState());
|
||||
const eventArgs = lastReceivedEventId
|
||||
? { last_id: lastReceivedEventId }
|
||||
: {};
|
||||
const events: AsyncEvent[] = [];
|
||||
if (queuedComponents && queuedComponents.length) {
|
||||
try {
|
||||
const { result: events } = await fetchEvents(eventArgs);
|
||||
// refetch queuedComponents due to race condition where results are available
|
||||
// before component state is updated with asyncJobId
|
||||
queuedComponents = getPendingComponents(store.getState());
|
||||
if (events && events.length) {
|
||||
const componentsByJobId = queuedComponents.reduce((acc, item) => {
|
||||
acc[item.asyncJobId] = item;
|
||||
return acc;
|
||||
}, {});
|
||||
const fetchDataEvents: Promise<CachedDataResponse>[] = [];
|
||||
events.forEach((asyncEvent: AsyncEvent) => {
|
||||
const component = componentsByJobId[asyncEvent.job_id];
|
||||
if (!component) {
|
||||
console.warn(
|
||||
'Component not found for job_id',
|
||||
asyncEvent.job_id,
|
||||
);
|
||||
return setLastId(asyncEvent);
|
||||
}
|
||||
const componentId = component.id;
|
||||
switch (asyncEvent.status) {
|
||||
case JOB_STATUS.DONE:
|
||||
fetchDataEvents.push(
|
||||
fetchCachedData(asyncEvent, componentId),
|
||||
);
|
||||
break;
|
||||
case JOB_STATUS.ERROR:
|
||||
store.dispatch(
|
||||
errorAction(componentId, [parseErrorJson(asyncEvent)]),
|
||||
);
|
||||
break;
|
||||
default:
|
||||
console.warn('Received event with status', asyncEvent.status);
|
||||
}
|
||||
|
||||
return setLastId(asyncEvent);
|
||||
});
|
||||
|
||||
const fetchResults = await Promise.all(fetchDataEvents);
|
||||
fetchResults.forEach(result => {
|
||||
const data = Array.isArray(result.data)
|
||||
? result.data
|
||||
: [result.data];
|
||||
if (result.status === 'success') {
|
||||
store.dispatch(successAction(result.componentId, data));
|
||||
} else {
|
||||
store.dispatch(errorAction(result.componentId, data));
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn(err);
|
||||
}
|
||||
}
|
||||
|
||||
if (processEventsCallback) processEventsCallback(events);
|
||||
|
||||
return setTimeout(processEvents, polling_delay);
|
||||
};
|
||||
|
||||
if (
|
||||
isFeatureEnabled(FeatureFlag.GLOBAL_ASYNC_QUERIES) &&
|
||||
transport === TRANSPORT_POLLING
|
||||
) {
|
||||
processEvents();
|
||||
}
|
||||
|
||||
return action => next(action);
|
||||
};
|
||||
|
||||
return middleware;
|
||||
if (transport === TRANSPORT_POLLING) {
|
||||
loadEventsFromApi();
|
||||
}
|
||||
if (transport === TRANSPORT_WS) {
|
||||
wsConnect();
|
||||
}
|
||||
};
|
||||
|
||||
export default initAsyncEvents;
|
||||
const addListener = (id: string, fn: any) => {
|
||||
listenersByJobId[id] = fn;
|
||||
};
|
||||
|
||||
const removeListener = (id: string) => {
|
||||
if (!listenersByJobId[id]) return;
|
||||
delete listenersByJobId[id];
|
||||
};
|
||||
|
||||
export const waitForAsyncData = async (asyncResponse: AsyncEvent) =>
|
||||
new Promise((resolve, reject) => {
|
||||
const jobId = asyncResponse.job_id;
|
||||
const listener = async (asyncEvent: AsyncEvent) => {
|
||||
switch (asyncEvent.status) {
|
||||
case JOB_STATUS.DONE: {
|
||||
let { data, status } = await fetchCachedData(asyncEvent); // eslint-disable-line prefer-const
|
||||
data = ensureIsArray(data);
|
||||
if (status === 'success') {
|
||||
resolve(data);
|
||||
} else {
|
||||
reject(data);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case JOB_STATUS.ERROR: {
|
||||
const err = parseErrorJson(asyncEvent);
|
||||
reject(err);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
console.warn('received event with status', asyncEvent.status);
|
||||
}
|
||||
}
|
||||
removeListener(jobId);
|
||||
};
|
||||
addListener(jobId, listener);
|
||||
});
|
||||
|
||||
const fetchEvents = makeApi<
|
||||
{ last_id?: string | null },
|
||||
{ result: AsyncEvent[] }
|
||||
>({
|
||||
method: 'GET',
|
||||
endpoint: POLLING_URL,
|
||||
});
|
||||
|
||||
const fetchCachedData = async (
|
||||
asyncEvent: AsyncEvent,
|
||||
): Promise<CachedDataResponse> => {
|
||||
let status = 'success';
|
||||
let data;
|
||||
try {
|
||||
const { json } = await SupersetClient.get({
|
||||
endpoint: String(asyncEvent.result_url),
|
||||
});
|
||||
data = 'result' in json ? json.result : json;
|
||||
} catch (response) {
|
||||
status = 'error';
|
||||
data = await getClientErrorObject(response);
|
||||
}
|
||||
|
||||
return { status, data };
|
||||
};
|
||||
|
||||
const setLastId = (asyncEvent: AsyncEvent) => {
|
||||
lastReceivedEventId = asyncEvent.id;
|
||||
try {
|
||||
localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string);
|
||||
} catch (err) {
|
||||
console.warn('Error saving event Id to localStorage', err);
|
||||
}
|
||||
};
|
||||
|
||||
const loadEventsFromApi = async () => {
|
||||
const eventArgs = lastReceivedEventId ? { last_id: lastReceivedEventId } : {};
|
||||
if (Object.keys(listenersByJobId).length) {
|
||||
try {
|
||||
const { result: events } = await fetchEvents(eventArgs);
|
||||
if (events && events.length) await processEvents(events);
|
||||
} catch (err) {
|
||||
console.warn(err);
|
||||
}
|
||||
}
|
||||
|
||||
if (transport === TRANSPORT_POLLING) {
|
||||
setTimeout(loadEventsFromApi, polling_delay);
|
||||
}
|
||||
};
|
||||
|
||||
export const processEvents = async (events: AsyncEvent[]) => {
|
||||
events.forEach((asyncEvent: AsyncEvent) => {
|
||||
const jobId = asyncEvent.job_id;
|
||||
const listener = listenersByJobId[jobId];
|
||||
if (listener) {
|
||||
listener(asyncEvent);
|
||||
delete retriesByJobId[jobId];
|
||||
} else {
|
||||
// handle race condition where event is received
|
||||
// before listener is registered
|
||||
if (!retriesByJobId[jobId]) retriesByJobId[jobId] = 0;
|
||||
retriesByJobId[jobId] += 1;
|
||||
|
||||
if (retriesByJobId[jobId] <= MAX_RETRIES) {
|
||||
setTimeout(() => {
|
||||
processEvents([asyncEvent]);
|
||||
}, RETRY_DELAY * retriesByJobId[jobId]);
|
||||
} else {
|
||||
delete retriesByJobId[jobId];
|
||||
console.warn('listener not found for job_id', asyncEvent.job_id);
|
||||
}
|
||||
}
|
||||
setLastId(asyncEvent);
|
||||
});
|
||||
};
|
||||
|
||||
const wsConnectMaxRetries = 6;
|
||||
const wsConnectErrorDelay = 2500;
|
||||
let wsConnectRetries = 0;
|
||||
let wsConnectTimeout: any;
|
||||
let ws: WebSocket;
|
||||
|
||||
const wsConnect = (): void => {
|
||||
let url = config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL;
|
||||
if (lastReceivedEventId) url += `?last_id=${lastReceivedEventId}`;
|
||||
ws = new WebSocket(url);
|
||||
|
||||
ws.addEventListener('open', event => {
|
||||
console.log('WebSocket connected');
|
||||
clearTimeout(wsConnectTimeout);
|
||||
wsConnectRetries = 0;
|
||||
});
|
||||
|
||||
ws.addEventListener('close', event => {
|
||||
wsConnectTimeout = setTimeout(() => {
|
||||
wsConnectRetries += 1;
|
||||
if (wsConnectRetries <= wsConnectMaxRetries) {
|
||||
wsConnect();
|
||||
} else {
|
||||
console.warn('WebSocket not available, falling back to async polling');
|
||||
loadEventsFromApi();
|
||||
}
|
||||
}, wsConnectErrorDelay);
|
||||
});
|
||||
|
||||
ws.addEventListener('error', event => {
|
||||
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
|
||||
if (ws.readyState < 2) ws.close();
|
||||
});
|
||||
|
||||
ws.addEventListener('message', async event => {
|
||||
let events: AsyncEvent[] = [];
|
||||
try {
|
||||
events = [JSON.parse(event.data)];
|
||||
await processEvents(events);
|
||||
} catch (err) {
|
||||
console.warn(err);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
init();
|
||||
|
||||
Reference in New Issue
Block a user