diff --git a/superset/tasks/async_queries.py b/superset/tasks/async_queries.py index 5fbe39e1511..8e7d2ea7a3f 100644 --- a/superset/tasks/async_queries.py +++ b/superset/tasks/async_queries.py @@ -22,7 +22,6 @@ from typing import Any, cast, Dict, Optional from celery.exceptions import SoftTimeLimitExceeded from flask import current_app, g -from superset import app from superset.exceptions import SupersetVizException from superset.extensions import ( async_query_manager, @@ -53,32 +52,27 @@ def load_chart_data_into_cache( ) -> None: from superset.charts.commands.data import ChartDataCommand - with app.app_context(): # type: ignore - try: - ensure_user_is_set(job_metadata.get("user_id")) - command = ChartDataCommand() - command.set_query_context(form_data) - result = command.run(cache=True) - cache_key = result["cache_key"] - result_url = f"/api/v1/chart/data/{cache_key}" - async_query_manager.update_job( - job_metadata, async_query_manager.STATUS_DONE, result_url=result_url, - ) - except SoftTimeLimitExceeded as exc: - logger.warning( - "A timeout occurred while loading chart data, error: %s", exc - ) - raise exc - except Exception as exc: - # TODO: QueryContext should support SIP-40 style errors - error = exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member - errors = [{"message": error}] - async_query_manager.update_job( - job_metadata, async_query_manager.STATUS_ERROR, errors=errors - ) - raise exc - - return None + try: + ensure_user_is_set(job_metadata.get("user_id")) + command = ChartDataCommand() + command.set_query_context(form_data) + result = command.run(cache=True) + cache_key = result["cache_key"] + result_url = f"/api/v1/chart/data/{cache_key}" + async_query_manager.update_job( + job_metadata, async_query_manager.STATUS_DONE, result_url=result_url, + ) + except SoftTimeLimitExceeded as exc: + logger.warning("A timeout occurred while loading chart data, error: %s", exc) + raise exc + except Exception as exc: + # TODO: QueryContext should support SIP-40 style errors + error = exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member + errors = [{"message": error}] + async_query_manager.update_job( + job_metadata, async_query_manager.STATUS_ERROR, errors=errors + ) + raise exc @celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout) @@ -88,58 +82,53 @@ def load_explore_json_into_cache( # pylint: disable=too-many-locals response_type: Optional[str] = None, force: bool = False, ) -> None: - with app.app_context(): # type: ignore - cache_key_prefix = "ejr-" # ejr: explore_json request - try: - ensure_user_is_set(job_metadata.get("user_id")) - datasource_id, datasource_type = get_datasource_info(None, None, form_data) + cache_key_prefix = "ejr-" # ejr: explore_json request + try: + ensure_user_is_set(job_metadata.get("user_id")) + datasource_id, datasource_type = get_datasource_info(None, None, form_data) - # Perform a deep copy here so that below we can cache the original - # value of the form_data object. This is necessary since the viz - # objects modify the form_data object. If the modified version were - # to be cached here, it will lead to a cache miss when clients - # attempt to retrieve the value of the completed async query. - original_form_data = copy.deepcopy(form_data) + # Perform a deep copy here so that below we can cache the original + # value of the form_data object. This is necessary since the viz + # objects modify the form_data object. If the modified version were + # to be cached here, it will lead to a cache miss when clients + # attempt to retrieve the value of the completed async query. + original_form_data = copy.deepcopy(form_data) - viz_obj = get_viz( - datasource_type=cast(str, datasource_type), - datasource_id=datasource_id, - form_data=form_data, - force=force, + viz_obj = get_viz( + datasource_type=cast(str, datasource_type), + datasource_id=datasource_id, + form_data=form_data, + force=force, + ) + # run query & cache results + payload = viz_obj.get_payload() + if viz_obj.has_error(payload): + raise SupersetVizException(errors=payload["errors"]) + + # Cache the original form_data value for async retrieval + cache_value = { + "form_data": original_form_data, + "response_type": response_type, + } + cache_key = generate_cache_key(cache_value, cache_key_prefix) + set_and_log_cache(cache_manager.cache, cache_key, cache_value) + result_url = f"/superset/explore_json/data/{cache_key}" + async_query_manager.update_job( + job_metadata, async_query_manager.STATUS_DONE, result_url=result_url, + ) + except SoftTimeLimitExceeded as ex: + logger.warning("A timeout occurred while loading explore json, error: %s", ex) + raise ex + except Exception as exc: + if isinstance(exc, SupersetVizException): + errors = exc.errors # pylint: disable=no-member + else: + error = ( + exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member ) - # run query & cache results - payload = viz_obj.get_payload() - if viz_obj.has_error(payload): - raise SupersetVizException(errors=payload["errors"]) + errors = [error] - # Cache the original form_data value for async retrieval - cache_value = { - "form_data": original_form_data, - "response_type": response_type, - } - cache_key = generate_cache_key(cache_value, cache_key_prefix) - set_and_log_cache(cache_manager.cache, cache_key, cache_value) - result_url = f"/superset/explore_json/data/{cache_key}" - async_query_manager.update_job( - job_metadata, async_query_manager.STATUS_DONE, result_url=result_url, - ) - except SoftTimeLimitExceeded as ex: - logger.warning( - "A timeout occurred while loading explore json, error: %s", ex - ) - raise ex - except Exception as exc: - if isinstance(exc, SupersetVizException): - errors = exc.errors # pylint: disable=no-member - else: - error = ( - exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member - ) - errors = [error] - - async_query_manager.update_job( - job_metadata, async_query_manager.STATUS_ERROR, errors=errors - ) - raise exc - - return None + async_query_manager.update_job( + job_metadata, async_query_manager.STATUS_ERROR, errors=errors + ) + raise exc diff --git a/superset/tasks/thumbnails.py b/superset/tasks/thumbnails.py index 4ca4f2770e7..5e4b8dfb755 100644 --- a/superset/tasks/thumbnails.py +++ b/superset/tasks/thumbnails.py @@ -22,7 +22,7 @@ from typing import Optional from flask import current_app -from superset import app, security_manager, thumbnail_cache +from superset import security_manager, thumbnail_cache from superset.extensions import celery_app from superset.utils.celery import session_scope from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot @@ -39,40 +39,38 @@ def cache_chart_thumbnail( window_size: Optional[WindowSize] = None, thumb_size: Optional[WindowSize] = None, ) -> None: - with app.app_context(): # type: ignore - if not thumbnail_cache: - logger.warning("No cache set, refusing to compute") - return None - logger.info("Caching chart: %s", url) - screenshot = ChartScreenshot(url, digest) - with session_scope(nullpool=True) as session: - user = security_manager.get_user_by_username( - current_app.config["THUMBNAIL_SELENIUM_USER"], session=session - ) - screenshot.compute_and_cache( - user=user, - cache=thumbnail_cache, - force=force, - window_size=window_size, - thumb_size=thumb_size, - ) + if not thumbnail_cache: + logger.warning("No cache set, refusing to compute") return None + logger.info("Caching chart: %s", url) + screenshot = ChartScreenshot(url, digest) + with session_scope(nullpool=True) as session: + user = security_manager.get_user_by_username( + current_app.config["THUMBNAIL_SELENIUM_USER"], session=session + ) + screenshot.compute_and_cache( + user=user, + cache=thumbnail_cache, + force=force, + window_size=window_size, + thumb_size=thumb_size, + ) + return None @celery_app.task(name="cache_dashboard_thumbnail", soft_time_limit=300) def cache_dashboard_thumbnail( url: str, digest: str, force: bool = False, thumb_size: Optional[WindowSize] = None ) -> None: - with app.app_context(): # type: ignore - if not thumbnail_cache: - logging.warning("No cache set, refusing to compute") - return - logger.info("Caching dashboard: %s", url) - screenshot = DashboardScreenshot(url, digest) - with session_scope(nullpool=True) as session: - user = security_manager.get_user_by_username( - current_app.config["THUMBNAIL_SELENIUM_USER"], session=session - ) - screenshot.compute_and_cache( - user=user, cache=thumbnail_cache, force=force, thumb_size=thumb_size, - ) + if not thumbnail_cache: + logging.warning("No cache set, refusing to compute") + return + logger.info("Caching dashboard: %s", url) + screenshot = DashboardScreenshot(url, digest) + with session_scope(nullpool=True) as session: + user = security_manager.get_user_by_username( + current_app.config["THUMBNAIL_SELENIUM_USER"], session=session + ) + screenshot.compute_and_cache( + user=user, cache=thumbnail_cache, force=force, thumb_size=thumb_size, + )