fix: remove unnecessary app context on celery (#15422)

* fix: remove unnecessary app context on celery

* fix lint

* fix lint
This commit is contained in:
Daniel Vaz Gaspar
2021-06-29 12:16:16 +01:00
committed by GitHub
parent f109da479d
commit 727847d2e5
2 changed files with 96 additions and 109 deletions

View File

@@ -22,7 +22,6 @@ from typing import Any, cast, Dict, Optional
from celery.exceptions import SoftTimeLimitExceeded
from flask import current_app, g
from superset import app
from superset.exceptions import SupersetVizException
from superset.extensions import (
async_query_manager,
@@ -53,32 +52,27 @@ def load_chart_data_into_cache(
) -> None:
from superset.charts.commands.data import ChartDataCommand
with app.app_context(): # type: ignore
try:
ensure_user_is_set(job_metadata.get("user_id"))
command = ChartDataCommand()
command.set_query_context(form_data)
result = command.run(cache=True)
cache_key = result["cache_key"]
result_url = f"/api/v1/chart/data/{cache_key}"
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
)
except SoftTimeLimitExceeded as exc:
logger.warning(
"A timeout occurred while loading chart data, error: %s", exc
)
raise exc
except Exception as exc:
# TODO: QueryContext should support SIP-40 style errors
error = exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
errors = [{"message": error}]
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
raise exc
return None
try:
ensure_user_is_set(job_metadata.get("user_id"))
command = ChartDataCommand()
command.set_query_context(form_data)
result = command.run(cache=True)
cache_key = result["cache_key"]
result_url = f"/api/v1/chart/data/{cache_key}"
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
)
except SoftTimeLimitExceeded as exc:
logger.warning("A timeout occurred while loading chart data, error: %s", exc)
raise exc
except Exception as exc:
# TODO: QueryContext should support SIP-40 style errors
error = exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
errors = [{"message": error}]
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
raise exc
@celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout)
@@ -88,58 +82,53 @@ def load_explore_json_into_cache( # pylint: disable=too-many-locals
response_type: Optional[str] = None,
force: bool = False,
) -> None:
with app.app_context(): # type: ignore
cache_key_prefix = "ejr-" # ejr: explore_json request
try:
ensure_user_is_set(job_metadata.get("user_id"))
datasource_id, datasource_type = get_datasource_info(None, None, form_data)
cache_key_prefix = "ejr-" # ejr: explore_json request
try:
ensure_user_is_set(job_metadata.get("user_id"))
datasource_id, datasource_type = get_datasource_info(None, None, form_data)
# Perform a deep copy here so that below we can cache the original
# value of the form_data object. This is necessary since the viz
# objects modify the form_data object. If the modified version were
# to be cached here, it will lead to a cache miss when clients
# attempt to retrieve the value of the completed async query.
original_form_data = copy.deepcopy(form_data)
# Perform a deep copy here so that below we can cache the original
# value of the form_data object. This is necessary since the viz
# objects modify the form_data object. If the modified version were
# to be cached here, it will lead to a cache miss when clients
# attempt to retrieve the value of the completed async query.
original_form_data = copy.deepcopy(form_data)
viz_obj = get_viz(
datasource_type=cast(str, datasource_type),
datasource_id=datasource_id,
form_data=form_data,
force=force,
viz_obj = get_viz(
datasource_type=cast(str, datasource_type),
datasource_id=datasource_id,
form_data=form_data,
force=force,
)
# run query & cache results
payload = viz_obj.get_payload()
if viz_obj.has_error(payload):
raise SupersetVizException(errors=payload["errors"])
# Cache the original form_data value for async retrieval
cache_value = {
"form_data": original_form_data,
"response_type": response_type,
}
cache_key = generate_cache_key(cache_value, cache_key_prefix)
set_and_log_cache(cache_manager.cache, cache_key, cache_value)
result_url = f"/superset/explore_json/data/{cache_key}"
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
)
except SoftTimeLimitExceeded as ex:
logger.warning("A timeout occurred while loading explore json, error: %s", ex)
raise ex
except Exception as exc:
if isinstance(exc, SupersetVizException):
errors = exc.errors # pylint: disable=no-member
else:
error = (
exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
)
# run query & cache results
payload = viz_obj.get_payload()
if viz_obj.has_error(payload):
raise SupersetVizException(errors=payload["errors"])
errors = [error]
# Cache the original form_data value for async retrieval
cache_value = {
"form_data": original_form_data,
"response_type": response_type,
}
cache_key = generate_cache_key(cache_value, cache_key_prefix)
set_and_log_cache(cache_manager.cache, cache_key, cache_value)
result_url = f"/superset/explore_json/data/{cache_key}"
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_DONE, result_url=result_url,
)
except SoftTimeLimitExceeded as ex:
logger.warning(
"A timeout occurred while loading explore json, error: %s", ex
)
raise ex
except Exception as exc:
if isinstance(exc, SupersetVizException):
errors = exc.errors # pylint: disable=no-member
else:
error = (
exc.message if hasattr(exc, "message") else str(exc) # type: ignore # pylint: disable=no-member
)
errors = [error]
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
raise exc
return None
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
raise exc

View File

@@ -22,7 +22,7 @@ from typing import Optional
from flask import current_app
from superset import app, security_manager, thumbnail_cache
from superset import security_manager, thumbnail_cache
from superset.extensions import celery_app
from superset.utils.celery import session_scope
from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot
@@ -39,40 +39,38 @@ def cache_chart_thumbnail(
window_size: Optional[WindowSize] = None,
thumb_size: Optional[WindowSize] = None,
) -> None:
with app.app_context(): # type: ignore
if not thumbnail_cache:
logger.warning("No cache set, refusing to compute")
return None
logger.info("Caching chart: %s", url)
screenshot = ChartScreenshot(url, digest)
with session_scope(nullpool=True) as session:
user = security_manager.get_user_by_username(
current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
)
screenshot.compute_and_cache(
user=user,
cache=thumbnail_cache,
force=force,
window_size=window_size,
thumb_size=thumb_size,
)
if not thumbnail_cache:
logger.warning("No cache set, refusing to compute")
return None
logger.info("Caching chart: %s", url)
screenshot = ChartScreenshot(url, digest)
with session_scope(nullpool=True) as session:
user = security_manager.get_user_by_username(
current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
)
screenshot.compute_and_cache(
user=user,
cache=thumbnail_cache,
force=force,
window_size=window_size,
thumb_size=thumb_size,
)
return None
@celery_app.task(name="cache_dashboard_thumbnail", soft_time_limit=300)
def cache_dashboard_thumbnail(
url: str, digest: str, force: bool = False, thumb_size: Optional[WindowSize] = None
) -> None:
with app.app_context(): # type: ignore
if not thumbnail_cache:
logging.warning("No cache set, refusing to compute")
return
logger.info("Caching dashboard: %s", url)
screenshot = DashboardScreenshot(url, digest)
with session_scope(nullpool=True) as session:
user = security_manager.get_user_by_username(
current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
)
screenshot.compute_and_cache(
user=user, cache=thumbnail_cache, force=force, thumb_size=thumb_size,
)
if not thumbnail_cache:
logging.warning("No cache set, refusing to compute")
return
logger.info("Caching dashboard: %s", url)
screenshot = DashboardScreenshot(url, digest)
with session_scope(nullpool=True) as session:
user = security_manager.get_user_by_username(
current_app.config["THUMBNAIL_SELENIUM_USER"], session=session
)
screenshot.compute_and_cache(
user=user, cache=thumbnail_cache, force=force, thumb_size=thumb_size,
)