Mirror of https://github.com/apache/superset.git
feat(chart-data): add rowcount, timegrain and column result types (#13271)
* feat(chart-data): add rowcount, timegrain and column result types
* break out actions from query_context
* rename module
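The new result types are requested through the chart-data API, and result_type can now also be set per query object, falling back to the context-level value (see the get_query_results call in the diff below). Here is a minimal client-side sketch; the endpoint path, the enum string values "rowcount"/"timegrains"/"columns" (inferred from the commit title, not shown in this diff), and the response shape are assumptions:

# Hedged sketch: endpoint path, result-type strings and response shape are
# assumptions, not confirmed by this diff.
import requests

payload = {
    "datasource": {"id": 1, "type": "table"},  # hypothetical datasource
    "result_format": "json",
    "result_type": "full",  # context-level default
    "queries": [
        # result_type can now be overridden per query object:
        {"metrics": ["count"], "result_type": "rowcount"},
        {"metrics": ["count"], "result_type": "timegrains"},
        {"metrics": ["count"], "result_type": "columns"},
    ],
}
resp = requests.post(
    "https://superset.example.com/api/v1/chart/data",  # hypothetical host
    json=payload,
    headers={"Authorization": "Bearer <access-token>"},  # auth is deployment-specific
)
resp.raise_for_status()
for query_result in resp.json()["result"]:  # one entry per query object
    print(query_result)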
--- a/superset/common/query_context.py
+++ b/superset/common/query_context.py
@@ -14,10 +14,8 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import copy
 import logging
-import math
-from typing import Any, cast, ClassVar, Dict, List, Optional, Union
+from typing import Any, ClassVar, Dict, List, Optional, Union
 
 import numpy as np
 import pandas as pd
@@ -26,6 +24,7 @@ from flask_babel import _
 from superset import app, db, is_feature_enabled
 from superset.annotation_layers.dao import AnnotationLayerDAO
 from superset.charts.dao import ChartDAO
+from superset.common.query_actions import get_query_results
 from superset.common.query_object import QueryObject
 from superset.connectors.base.models import BaseDatasource
 from superset.connectors.connector_registry import ConnectorRegistry
@@ -36,9 +35,18 @@ from superset.exceptions import (
 )
 from superset.extensions import cache_manager, security_manager
 from superset.stats_logger import BaseStatsLogger
-from superset.utils import core as utils
 from superset.utils.cache import generate_cache_key, set_and_log_cache
-from superset.utils.core import DTTM_ALIAS
+from superset.utils.core import (
+    ChartDataResultFormat,
+    ChartDataResultType,
+    DatasourceDict,
+    DTTM_ALIAS,
+    error_msg_from_exception,
+    get_column_names_from_metrics,
+    get_stacktrace,
+    normalize_dttm_col,
+    QueryStatus,
+)
 from superset.views.utils import get_viz
 
 config = app.config
@@ -59,19 +67,19 @@ class QueryContext:
     queries: List[QueryObject]
     force: bool
     custom_cache_timeout: Optional[int]
-    result_type: utils.ChartDataResultType
-    result_format: utils.ChartDataResultFormat
+    result_type: ChartDataResultType
+    result_format: ChartDataResultFormat
 
     # TODO: Type datasource and query_object dictionary with TypedDict when it becomes
     # a vanilla python type https://github.com/python/mypy/issues/5288
     def __init__(  # pylint: disable=too-many-arguments
         self,
-        datasource: Dict[str, Any],
+        datasource: DatasourceDict,
         queries: List[Dict[str, Any]],
         force: bool = False,
         custom_cache_timeout: Optional[int] = None,
-        result_type: Optional[utils.ChartDataResultType] = None,
-        result_format: Optional[utils.ChartDataResultFormat] = None,
+        result_type: Optional[ChartDataResultType] = None,
+        result_format: Optional[ChartDataResultFormat] = None,
     ) -> None:
         self.datasource = ConnectorRegistry.get_datasource(
             str(datasource["type"]), int(datasource["id"]), db.session
@@ -79,8 +87,8 @@ class QueryContext:
         self.queries = [QueryObject(**query_obj) for query_obj in queries]
         self.force = force
         self.custom_cache_timeout = custom_cache_timeout
-        self.result_type = result_type or utils.ChartDataResultType.FULL
-        self.result_format = result_format or utils.ChartDataResultFormat.JSON
+        self.result_type = result_type or ChartDataResultType.FULL
+        self.result_format = result_format or ChartDataResultFormat.JSON
        self.cache_values = {
             "datasource": datasource,
             "queries": queries,
@@ -111,7 +119,7 @@ class QueryContext:
         # If the datetime format is unix, the parse will use the corresponding
         # parsing logic
         if not df.empty:
-            df = utils.normalize_dttm_col(
+            df = normalize_dttm_col(
                 df=df,
                 timestamp_format=timestamp_format,
                 offset=self.datasource.offset,
@@ -141,77 +149,24 @@ class QueryContext:
             df[col] = df[col].infer_objects()
 
     def get_data(self, df: pd.DataFrame,) -> Union[str, List[Dict[str, Any]]]:
-        if self.result_format == utils.ChartDataResultFormat.CSV:
+        if self.result_format == ChartDataResultFormat.CSV:
             include_index = not isinstance(df.index, pd.RangeIndex)
             result = df.to_csv(index=include_index, **config["CSV_EXPORT"])
             return result or ""
 
         return df.to_dict(orient="records")
 
-    def get_single_payload(
-        self, query_obj: QueryObject, force_cached: Optional[bool] = False,
-    ) -> Dict[str, Any]:
-        """Return results payload for a single quey"""
-        if self.result_type == utils.ChartDataResultType.QUERY:
-            return {
-                "query": self.datasource.get_query_str(query_obj.to_dict()),
-                "language": self.datasource.query_language,
-            }
-
-        if self.result_type == utils.ChartDataResultType.SAMPLES:
-            row_limit = query_obj.row_limit or math.inf
-            query_obj = copy.copy(query_obj)
-            query_obj.is_timeseries = False
-            query_obj.orderby = []
-            query_obj.groupby = []
-            query_obj.metrics = []
-            query_obj.post_processing = []
-            query_obj.row_limit = min(row_limit, config["SAMPLES_ROW_LIMIT"])
-            query_obj.row_offset = 0
-            query_obj.columns = [o.column_name for o in self.datasource.columns]
-
-        payload = self.get_df_payload(query_obj, force_cached=force_cached)
-        df = payload["df"]
-        status = payload["status"]
-        if status != utils.QueryStatus.FAILED:
-            payload["colnames"] = list(df.columns)
-            payload["coltypes"] = utils.extract_dataframe_dtypes(df)
-            payload["data"] = self.get_data(df)
-        del payload["df"]
-
-        filters = query_obj.filter
-        filter_columns = cast(List[str], [flt.get("col") for flt in filters])
-        columns = set(self.datasource.column_names)
-        applied_time_columns, rejected_time_columns = utils.get_time_filter_status(
-            self.datasource, query_obj.applied_time_extras
-        )
-        payload["applied_filters"] = [
-            {"column": col} for col in filter_columns if col in columns
-        ] + applied_time_columns
-        payload["rejected_filters"] = [
-            {"reason": "not_in_datasource", "column": col}
-            for col in filter_columns
-            if col not in columns
-        ] + rejected_time_columns
-
-        if (
-            self.result_type == utils.ChartDataResultType.RESULTS
-            and status != utils.QueryStatus.FAILED
-        ):
-            return {"data": payload["data"]}
-        return payload
-
     def get_payload(
-        self,
-        cache_query_context: Optional[bool] = False,
-        force_cached: Optional[bool] = False,
+        self, cache_query_context: Optional[bool] = False, force_cached: bool = False,
     ) -> Dict[str, Any]:
         """Returns the query results with both metadata and data"""
 
         # Get all the payloads from the QueryObjects
         query_results = [
-            self.get_single_payload(query_object, force_cached=force_cached)
-            for query_object in self.queries
+            get_query_results(
+                query_obj.result_type or self.result_type, self, query_obj, force_cached
+            )
+            for query_obj in self.queries
         ]
         return_value = {"queries": query_results}
 
@@ -326,7 +281,7 @@ class QueryContext:
             payload = viz_obj.get_payload()
             return payload["data"]
         except SupersetException as ex:
-            raise QueryObjectValidationError(utils.error_msg_from_exception(ex))
+            raise QueryObjectValidationError(error_msg_from_exception(ex))
 
     def get_annotation_data(self, query_obj: QueryObject) -> Dict[str, Any]:
         """
@@ -368,13 +323,13 @@ class QueryContext:
                 df = cache_value["df"]
                 query = cache_value["query"]
                 annotation_data = cache_value.get("annotation_data", {})
-                status = utils.QueryStatus.SUCCESS
+                status = QueryStatus.SUCCESS
                 is_loaded = True
                 stats_logger.incr("loaded_from_cache")
             except KeyError as ex:
                 logger.exception(ex)
                 logger.error(
-                    "Error reading cache: %s", utils.error_msg_from_exception(ex)
+                    "Error reading cache: %s", error_msg_from_exception(ex)
                 )
             logger.info("Serving from cache")
 
@@ -390,7 +345,7 @@ class QueryContext:
                 col
                 for col in query_obj.columns
                 + query_obj.groupby
-                + utils.get_column_names_from_metrics(query_obj.metrics)
+                + get_column_names_from_metrics(query_obj.metrics)
                 if col not in self.datasource.column_names and col != DTTM_ALIAS
             ]
             if invalid_columns:
@@ -407,22 +362,22 @@ class QueryContext:
                 df = query_result["df"]
                 annotation_data = self.get_annotation_data(query_obj)
 
-                if status != utils.QueryStatus.FAILED:
+                if status != QueryStatus.FAILED:
                     stats_logger.incr("loaded_from_source")
                     if not self.force:
                         stats_logger.incr("loaded_from_source_without_force")
                     is_loaded = True
             except QueryObjectValidationError as ex:
                 error_message = str(ex)
-                status = utils.QueryStatus.FAILED
+                status = QueryStatus.FAILED
             except Exception as ex:  # pylint: disable=broad-except
                 logger.exception(ex)
                 if not error_message:
                     error_message = str(ex)
-                status = utils.QueryStatus.FAILED
-                stacktrace = utils.get_stacktrace()
+                status = QueryStatus.FAILED
+                stacktrace = get_stacktrace()
 
-        if is_loaded and cache_key and status != utils.QueryStatus.FAILED:
+        if is_loaded and cache_key and status != QueryStatus.FAILED:
             set_and_log_cache(
                 cache_manager.data_cache,
                 cache_key,
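The refactor replaces QueryContext.get_single_payload with a call into the new superset.common.query_actions module. Only the get_query_results() signature is confirmed by the call site in the diff above; a plausible sketch of the dispatch it performs, with a hypothetical handler table keyed on result type, might look like this:

# Hypothetical sketch of the dispatcher inside superset.common.query_actions;
# handler names and the table below are illustrative, not the actual module.
from typing import Any, Callable, Dict

from superset.common.query_object import QueryObject
from superset.utils.core import ChartDataResultType


def _get_query(
    query_context: Any, query_obj: QueryObject, _force_cached: bool
) -> Dict[str, Any]:
    # QUERY result type: return the SQL that would run, without executing it
    # (this branch is lifted from the removed get_single_payload above).
    datasource = query_context.datasource
    return {
        "query": datasource.get_query_str(query_obj.to_dict()),
        "language": datasource.query_language,
    }


# One entry per ChartDataResultType, including the new
# rowcount/timegrain/column types this commit introduces.
_result_type_handlers: Dict[ChartDataResultType, Callable[..., Dict[str, Any]]] = {
    ChartDataResultType.QUERY: _get_query,
    # ChartDataResultType.FULL: _get_full, ... (handlers omitted here)
}


def get_query_results(
    result_type: ChartDataResultType,
    query_context: Any,  # Any avoids a circular import of QueryContext
    query_obj: QueryObject,
    force_cached: bool,
) -> Dict[str, Any]:
    """Dispatch a single query object to the handler for its result type."""
    handler = _result_type_handlers.get(result_type)
    if handler is None:
        raise ValueError(f"Invalid result type: {result_type}")
    return handler(query_context, query_obj, force_cached)

A table-driven dispatch like this keeps QueryContext free of per-result-type branching: get_payload simply forwards each query object with query_obj.result_type or self.result_type, so adding a result type means registering one handler rather than editing the context class.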