/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import { ensureIsArray, isFeatureEnabled, FeatureFlag, makeApi, SupersetClient, getClientErrorObject, parseErrorJson, SupersetError, } from '@superset-ui/core'; import { logging } from '@apache-superset/core/utils'; import getBootstrapData from 'src/utils/getBootstrapData'; type AsyncEvent = { id?: string | null; channel_id: string; job_id: string; user_id?: string; status: string; errors?: SupersetError[]; result_url: string | null; }; type CachedDataResponse = { status: string; data: any; }; type AppConfig = Record; type ListenerFn = (asyncEvent: AsyncEvent) => Promise; const TRANSPORT_POLLING = 'polling'; const TRANSPORT_WS = 'ws'; const JOB_STATUS = { PENDING: 'pending', RUNNING: 'running', ERROR: 'error', DONE: 'done', }; const LOCALSTORAGE_KEY = 'last_async_event_id'; const POLLING_URL = '/api/v1/async_event/'; const MAX_RETRIES = 6; const RETRY_DELAY = 100; let config: AppConfig; let transport: string; let pollingDelayMs: number; let pollingTimeoutId: number; let listenersByJobId: Record; let retriesByJobId: Record; let lastReceivedEventId: string | null | undefined; const addListener = (id: string, fn: any) => { listenersByJobId[id] = fn; }; const removeListener = (id: string) => { if (!listenersByJobId[id]) return; delete listenersByJobId[id]; }; const fetchCachedData = async ( asyncEvent: AsyncEvent, ): Promise => { let status = 'success'; let data; try { const { json } = await SupersetClient.get({ endpoint: String(asyncEvent.result_url), }); data = 'result' in json ? json.result : json; } catch (response) { status = 'error'; data = await getClientErrorObject(response); } return { status, data }; }; export const waitForAsyncData = async (asyncResponse: AsyncEvent) => new Promise((resolve, reject) => { const jobId = asyncResponse.job_id; const listener = async (asyncEvent: AsyncEvent) => { switch (asyncEvent.status) { case JOB_STATUS.DONE: { let { data, status } = await fetchCachedData(asyncEvent); // eslint-disable-line prefer-const data = ensureIsArray(data); if (status === 'success') { resolve(data); } else { reject(data); } break; } case JOB_STATUS.ERROR: { const err = parseErrorJson(asyncEvent); reject(err); break; } default: { logging.warn('received event with status', asyncEvent.status); } } removeListener(jobId); }; addListener(jobId, listener); }); const fetchEvents = makeApi< { last_id?: string | null }, { result: AsyncEvent[] } >({ method: 'GET', endpoint: POLLING_URL, }); const setLastId = (asyncEvent: AsyncEvent) => { lastReceivedEventId = asyncEvent.id; try { localStorage.setItem(LOCALSTORAGE_KEY, lastReceivedEventId as string); } catch (err) { logging.warn('Error saving event Id to localStorage', err); } }; export const processEvents = async (events: AsyncEvent[]) => { events.forEach((asyncEvent: AsyncEvent) => { const jobId = asyncEvent.job_id; const listener = listenersByJobId[jobId]; if (listener) { listener(asyncEvent); delete retriesByJobId[jobId]; } else { // handle race condition where event is received // before listener is registered if (!retriesByJobId[jobId]) retriesByJobId[jobId] = 0; retriesByJobId[jobId] += 1; if (retriesByJobId[jobId] <= MAX_RETRIES) { setTimeout(() => { processEvents([asyncEvent]); }, RETRY_DELAY * retriesByJobId[jobId]); } else { delete retriesByJobId[jobId]; logging.warn('listener not found for job_id', asyncEvent.job_id); } } setLastId(asyncEvent); }); }; const loadEventsFromApi = async () => { const eventArgs = lastReceivedEventId ? { last_id: lastReceivedEventId } : {}; if (Object.keys(listenersByJobId).length) { try { const { result: events } = await fetchEvents(eventArgs); if (events?.length) await processEvents(events); } catch (err) { logging.warn(err); } } if (transport === TRANSPORT_POLLING) { pollingTimeoutId = window.setTimeout(loadEventsFromApi, pollingDelayMs); } }; const wsConnectMaxRetries = 6; const wsConnectErrorDelay = 2500; let wsConnectRetries = 0; let wsConnectTimeout: any; let ws: WebSocket; const wsConnect = (): void => { let url = config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL; if (lastReceivedEventId) url += `?last_id=${lastReceivedEventId}`; ws = new WebSocket(url); ws.addEventListener('open', () => { logging.log('WebSocket connected'); clearTimeout(wsConnectTimeout); wsConnectRetries = 0; }); ws.addEventListener('close', () => { wsConnectTimeout = setTimeout(() => { wsConnectRetries += 1; if (wsConnectRetries <= wsConnectMaxRetries) { wsConnect(); } else { logging.warn('WebSocket not available, falling back to async polling'); loadEventsFromApi(); } }, wsConnectErrorDelay); }); ws.addEventListener('error', () => { // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState if (ws.readyState < 2) ws.close(); }); ws.addEventListener('message', async event => { let events: AsyncEvent[] = []; try { events = [JSON.parse(event.data)]; await processEvents(events); } catch (err) { logging.warn(err); } }); }; export const init = (appConfig?: AppConfig) => { if (!isFeatureEnabled(FeatureFlag.GlobalAsyncQueries)) return; if (pollingTimeoutId) clearTimeout(pollingTimeoutId); listenersByJobId = {}; retriesByJobId = {}; lastReceivedEventId = null; config = appConfig || getBootstrapData().common.conf; transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING; pollingDelayMs = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500; try { lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY); } catch (err) { logging.warn('Failed to fetch last event Id from localStorage'); } if (transport === TRANSPORT_POLLING) { loadEventsFromApi(); } if (transport === TRANSPORT_WS) { wsConnect(); } }; init();