mirror of
https://github.com/apache/superset.git
synced 2026-05-07 08:54:23 +00:00
453 lines
15 KiB
Python
453 lines
15 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Semantic layer models."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from collections.abc import Hashable
|
|
from dataclasses import dataclass
|
|
from functools import cached_property
|
|
from typing import Any, TYPE_CHECKING
|
|
|
|
import pyarrow as pa
|
|
from flask_appbuilder import Model
|
|
from sqlalchemy import Column, ForeignKey, Integer, String, Text
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy_utils import UUIDType
|
|
from sqlalchemy_utils.types.json import JSONType
|
|
from superset_core.semantic_layers.layer import (
|
|
SemanticLayer as SemanticLayerABC,
|
|
)
|
|
from superset_core.semantic_layers.view import (
|
|
SemanticView as SemanticViewABC,
|
|
)
|
|
|
|
from superset.common.query_object import QueryObject
|
|
from superset.explorables.base import TimeGrainDict
|
|
from superset.extensions import encrypted_field_factory
|
|
from superset.models.helpers import AuditMixinNullable, QueryResult
|
|
from superset.semantic_layers.mapper import get_results
|
|
from superset.semantic_layers.registry import registry
|
|
from superset.utils import json
|
|
from superset.utils.core import GenericDataType
|
|
|
|
if TYPE_CHECKING:
|
|
from superset.superset_typing import ExplorableData, QueryObjectDict
|
|
|
|
|
|
def get_column_type(semantic_type: pa.DataType) -> GenericDataType:
|
|
"""
|
|
Map Arrow data types to generic data types.
|
|
"""
|
|
if pa.types.is_date(semantic_type) or pa.types.is_timestamp(semantic_type):
|
|
return GenericDataType.TEMPORAL
|
|
if pa.types.is_time(semantic_type):
|
|
return GenericDataType.TEMPORAL
|
|
if (
|
|
pa.types.is_integer(semantic_type)
|
|
or pa.types.is_floating(semantic_type)
|
|
or pa.types.is_decimal(semantic_type)
|
|
or pa.types.is_duration(semantic_type)
|
|
):
|
|
return GenericDataType.NUMERIC
|
|
if pa.types.is_boolean(semantic_type):
|
|
return GenericDataType.BOOLEAN
|
|
return GenericDataType.STRING
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class MetricMetadata:
|
|
metric_name: str
|
|
expression: str
|
|
verbose_name: str | None = None
|
|
description: str | None = None
|
|
d3format: str | None = None
|
|
currency: dict[str, Any] | None = None
|
|
warning_text: str | None = None
|
|
certified_by: str | None = None
|
|
certification_details: str | None = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ColumnMetadata:
|
|
column_name: str
|
|
type: str
|
|
is_dttm: bool
|
|
verbose_name: str | None = None
|
|
description: str | None = None
|
|
groupby: bool = True
|
|
filterable: bool = True
|
|
expression: str | None = None
|
|
python_date_format: str | None = None
|
|
advanced_data_type: str | None = None
|
|
extra: str | None = None
|
|
|
|
|
|
class SemanticLayer(AuditMixinNullable, Model):
|
|
"""
|
|
Semantic layer model.
|
|
|
|
A semantic layer provides an abstraction over data sources,
|
|
allowing users to query data through a semantic interface.
|
|
"""
|
|
|
|
__tablename__ = "semantic_layers"
|
|
|
|
uuid = Column(UUIDType(binary=True), primary_key=True, default=uuid.uuid4)
|
|
|
|
# Core fields
|
|
name = Column(String(250), nullable=False)
|
|
description = Column(Text, nullable=True)
|
|
type = Column(String(250), nullable=False) # snowflake, etc
|
|
|
|
configuration = Column(encrypted_field_factory.create(JSONType), default="{}")
|
|
# Tracks the schema version of the configuration JSON field to aid with
|
|
# migrations as the configuration schema evolves over time.
|
|
configuration_version = Column(Integer, nullable=False, default=1)
|
|
cache_timeout = Column(Integer, nullable=True)
|
|
|
|
# Semantic views relationship
|
|
semantic_views: list[SemanticView] = relationship(
|
|
"SemanticView",
|
|
back_populates="semantic_layer",
|
|
cascade="all, delete-orphan",
|
|
passive_deletes=True,
|
|
)
|
|
|
|
def __repr__(self) -> str:
|
|
return self.name or str(self.uuid)
|
|
|
|
@cached_property
|
|
def implementation(
|
|
self,
|
|
) -> SemanticLayerABC[Any, SemanticViewABC]:
|
|
"""
|
|
Return semantic layer implementation.
|
|
"""
|
|
# TODO (betodealmeida):
|
|
# return extension_manager.get_contribution("semanticLayers", self.type)
|
|
class_ = registry[self.type]
|
|
return class_.from_configuration(json.loads(self.configuration))
|
|
|
|
|
|
class SemanticView(AuditMixinNullable, Model):
|
|
"""
|
|
Semantic view model.
|
|
|
|
A semantic view represents a queryable view within a semantic layer.
|
|
"""
|
|
|
|
__tablename__ = "semantic_views"
|
|
|
|
# Use integer as the primary key for cross-database auto-increment
|
|
# compatibility (sa.Identity() is not supported in MySQL or SQLite).
|
|
# The uuid column is a secondary unique identifier used in URLs and perms.
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
uuid = Column(UUIDType(binary=True), unique=True, default=uuid.uuid4)
|
|
|
|
# Core fields
|
|
name = Column(String(250), nullable=False)
|
|
description = Column(Text, nullable=True)
|
|
|
|
configuration = Column(encrypted_field_factory.create(JSONType), default="{}")
|
|
# Tracks the schema version of the configuration JSON field to aid with
|
|
# migrations as the configuration schema evolves over time.
|
|
configuration_version = Column(Integer, nullable=False, default=1)
|
|
cache_timeout = Column(Integer, nullable=True)
|
|
|
|
# Semantic layer relationship
|
|
semantic_layer_uuid = Column(
|
|
UUIDType(binary=True),
|
|
ForeignKey("semantic_layers.uuid", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
semantic_layer: SemanticLayer = relationship(
|
|
"SemanticLayer",
|
|
back_populates="semantic_views",
|
|
foreign_keys=[semantic_layer_uuid],
|
|
)
|
|
|
|
def __repr__(self) -> str:
|
|
return self.name or str(self.uuid)
|
|
|
|
@cached_property
|
|
def implementation(self) -> SemanticViewABC:
|
|
"""
|
|
Return semantic view implementation.
|
|
"""
|
|
return self.semantic_layer.implementation.get_semantic_view(
|
|
self.name,
|
|
json.loads(self.configuration),
|
|
)
|
|
|
|
# =========================================================================
|
|
# Explorable protocol implementation
|
|
# =========================================================================
|
|
|
|
def get_query_result(self, query_object: QueryObject) -> QueryResult:
|
|
return get_results(query_object)
|
|
|
|
def get_query_str(self, query_obj: QueryObjectDict) -> str:
|
|
return "Not implemented for semantic layers"
|
|
|
|
@property
|
|
def table_name(self) -> str:
|
|
return self.name
|
|
|
|
@property
|
|
def kind(self) -> str:
|
|
return "semantic_view"
|
|
|
|
@property
|
|
def uid(self) -> str:
|
|
return self.implementation.uid()
|
|
|
|
@property
|
|
def type(self) -> str:
|
|
return "semantic_view"
|
|
|
|
@property
|
|
def metrics(self) -> list[MetricMetadata]:
|
|
return [
|
|
MetricMetadata(
|
|
metric_name=metric.name,
|
|
expression=metric.definition,
|
|
description=metric.description,
|
|
)
|
|
for metric in self.implementation.get_metrics()
|
|
]
|
|
|
|
@property
|
|
def columns(self) -> list[ColumnMetadata]:
|
|
return [
|
|
ColumnMetadata(
|
|
column_name=dimension.name,
|
|
type=str(dimension.type),
|
|
is_dttm=pa.types.is_date(dimension.type)
|
|
or pa.types.is_time(dimension.type)
|
|
or pa.types.is_timestamp(dimension.type),
|
|
description=dimension.description,
|
|
expression=dimension.definition,
|
|
extra=json.dumps(
|
|
{"grain": dimension.grain.name if dimension.grain else None}
|
|
),
|
|
)
|
|
for dimension in self.implementation.get_dimensions()
|
|
]
|
|
|
|
@property
|
|
def column_names(self) -> list[str]:
|
|
return [dimension.name for dimension in self.implementation.get_dimensions()]
|
|
|
|
@property
|
|
def data(self) -> ExplorableData:
|
|
return {
|
|
# core
|
|
"id": self.id,
|
|
"uid": self.uid,
|
|
"type": "semantic_view",
|
|
"name": self.name,
|
|
"columns": [
|
|
{
|
|
"advanced_data_type": None,
|
|
"certification_details": None,
|
|
"certified_by": None,
|
|
"column_name": dimension.name,
|
|
"description": dimension.description,
|
|
"expression": dimension.definition,
|
|
"filterable": True,
|
|
"groupby": True,
|
|
"id": None,
|
|
"uuid": None,
|
|
"is_certified": False,
|
|
"is_dttm": pa.types.is_date(dimension.type)
|
|
or pa.types.is_time(dimension.type)
|
|
or pa.types.is_timestamp(dimension.type),
|
|
"python_date_format": None,
|
|
"type": str(dimension.type),
|
|
"type_generic": get_column_type(dimension.type),
|
|
"verbose_name": None,
|
|
"warning_markdown": None,
|
|
}
|
|
for dimension in self.implementation.get_dimensions()
|
|
],
|
|
"metrics": [
|
|
{
|
|
"certification_details": None,
|
|
"certified_by": None,
|
|
"d3format": None,
|
|
"description": metric.description,
|
|
"expression": metric.definition,
|
|
"id": None,
|
|
"uuid": None,
|
|
"is_certified": False,
|
|
"metric_name": metric.name,
|
|
"warning_markdown": None,
|
|
"warning_text": None,
|
|
"verbose_name": None,
|
|
}
|
|
for metric in self.implementation.get_metrics()
|
|
],
|
|
"database": {},
|
|
"parent": {"name": self.semantic_layer.name},
|
|
# UI features
|
|
"verbose_map": {},
|
|
"order_by_choices": [],
|
|
"filter_select": True,
|
|
"filter_select_enabled": True,
|
|
"sql": None,
|
|
"select_star": None,
|
|
"owners": [],
|
|
"description": self.description,
|
|
"table_name": self.name,
|
|
"column_types": [
|
|
get_column_type(dimension.type)
|
|
for dimension in self.implementation.get_dimensions()
|
|
],
|
|
"column_names": [
|
|
dimension.name for dimension in self.implementation.get_dimensions()
|
|
],
|
|
# rare
|
|
"column_formats": {},
|
|
"datasource_name": self.name,
|
|
"perm": self.perm,
|
|
"offset": self.offset,
|
|
"cache_timeout": self.cache_timeout,
|
|
"params": None,
|
|
# sql-specific
|
|
"schema": None,
|
|
"catalog": None,
|
|
"main_dttm_col": None,
|
|
"time_grain_sqla": [],
|
|
"granularity_sqla": [],
|
|
"fetch_values_predicate": None,
|
|
"template_params": None,
|
|
"is_sqllab_view": False,
|
|
"extra": None,
|
|
"always_filter_main_dttm": False,
|
|
"normalize_columns": False,
|
|
"edit_url": "",
|
|
"default_endpoint": None,
|
|
"folders": [],
|
|
"health_check_message": None,
|
|
}
|
|
|
|
def data_for_slices(self, slices: list[Any]) -> ExplorableData:
|
|
return self.data
|
|
|
|
def get_extra_cache_keys(self, query_obj: QueryObjectDict) -> list[Hashable]:
|
|
return []
|
|
|
|
@property
|
|
def perm(self) -> str:
|
|
return self.semantic_layer_uuid.hex + "::" + self.uuid.hex
|
|
|
|
@property
|
|
def catalog_perm(self) -> str | None:
|
|
return None
|
|
|
|
@property
|
|
def schema_perm(self) -> str | None:
|
|
return None
|
|
|
|
@property
|
|
def schema(self) -> str | None:
|
|
return None
|
|
|
|
@property
|
|
def url(self) -> str:
|
|
return f"/semantic_view/{self.uuid}/"
|
|
|
|
@property
|
|
def explore_url(self) -> str:
|
|
return f"/explore/?datasource_type=semantic_view&datasource_id={self.id}"
|
|
|
|
@property
|
|
def offset(self) -> int:
|
|
# always return datetime as UTC
|
|
return 0
|
|
|
|
def get_time_grains(self) -> list[TimeGrainDict]:
|
|
return [
|
|
{
|
|
"name": dimension.grain.name,
|
|
"function": "",
|
|
"duration": dimension.grain.representation,
|
|
}
|
|
for dimension in self.implementation.get_dimensions()
|
|
if dimension.grain
|
|
]
|
|
|
|
def has_drill_by_columns(self, column_names: list[str]) -> bool:
|
|
dimension_names = {
|
|
dimension.name for dimension in self.implementation.get_dimensions()
|
|
}
|
|
return all(column_name in dimension_names for column_name in column_names)
|
|
|
|
@property
|
|
def is_rls_supported(self) -> bool:
|
|
return False
|
|
|
|
def raise_for_access(self) -> None:
|
|
"""No-op: semantic view access control is not yet implemented."""
|
|
|
|
@property
|
|
def query_language(self) -> str | None:
|
|
return None
|
|
|
|
def get_compatible_metrics(
|
|
self,
|
|
selected_metrics: list[str],
|
|
selected_dimensions: list[str],
|
|
) -> list[str]:
|
|
"""
|
|
Return metric names compatible with the current selection.
|
|
|
|
Translates string names to semantic-layer objects, delegates to the
|
|
view implementation, and translates the result back to names.
|
|
"""
|
|
metric_map = {m.name: m for m in self.implementation.get_metrics()}
|
|
dim_map = {d.name: d for d in self.implementation.get_dimensions()}
|
|
sel_metrics = {metric_map[n] for n in selected_metrics if n in metric_map}
|
|
sel_dims = {dim_map[n] for n in selected_dimensions if n in dim_map}
|
|
compatible = self.implementation.get_compatible_metrics(
|
|
sel_metrics, sel_dims
|
|
)
|
|
return [m.name for m in compatible]
|
|
|
|
def get_compatible_dimensions(
|
|
self,
|
|
selected_metrics: list[str],
|
|
selected_dimensions: list[str],
|
|
) -> list[str]:
|
|
"""
|
|
Return dimension names compatible with the current selection.
|
|
|
|
Translates string names to semantic-layer objects, delegates to the
|
|
view implementation, and translates the result back to names.
|
|
"""
|
|
metric_map = {m.name: m for m in self.implementation.get_metrics()}
|
|
dim_map = {d.name: d for d in self.implementation.get_dimensions()}
|
|
sel_metrics = {metric_map[n] for n in selected_metrics if n in metric_map}
|
|
sel_dims = {dim_map[n] for n in selected_dimensions if n in dim_map}
|
|
compatible = self.implementation.get_compatible_dimensions(
|
|
sel_metrics, sel_dims
|
|
)
|
|
return [d.name for d in compatible]
|