Files
superset2/superset/views/base.py
2025-07-31 19:27:42 -07:00

575 lines
19 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import functools
import logging
import os
import traceback
from datetime import datetime
from typing import Any, Callable
from babel import Locale
from flask import (
abort,
current_app as app,
flash,
g,
get_flashed_messages,
redirect,
Response,
session,
url_for,
)
from flask_appbuilder import BaseView, Model, ModelView
from flask_appbuilder.actions import action
from flask_appbuilder.const import AUTH_OAUTH, AUTH_OID
from flask_appbuilder.forms import DynamicForm
from flask_appbuilder.models.sqla.filters import BaseFilter
from flask_appbuilder.security.sqla.models import User
from flask_appbuilder.widgets import ListWidget
from flask_babel import get_locale, gettext as __
from flask_jwt_extended.exceptions import NoAuthorizationError
from flask_wtf.form import FlaskForm
from sqlalchemy.orm import Query
from wtforms.fields.core import Field, UnboundField
from superset import (
appbuilder,
db,
get_feature_flags,
is_feature_enabled,
security_manager,
)
from superset.connectors.sqla import models
from superset.db_engine_specs import get_available_engine_specs
from superset.db_engine_specs.gsheets import GSheetsEngineSpec
from superset.extensions import cache_manager
from superset.reports.models import ReportRecipientType
from superset.superset_typing import FlaskResponse
from superset.themes.utils import (
is_valid_theme,
is_valid_theme_settings,
)
from superset.utils import core as utils, json
from superset.utils.filters import get_dataset_access_filters
from superset.views.error_handling import json_error_response
from .utils import bootstrap_user_data, get_config_value
# Fallback theme settings, used by get_theme_bootstrap_data() when the
# configured THEME_SETTINGS value fails validation.
DEFAULT_THEME_SETTINGS = {
    "enforced": False,
    "allowSwitching": True,
    "allowOSPreference": True,
}
# Names of the app.config entries that cached_common_bootstrap_data() copies
# into the "conf" section of the bootstrap payload sent to the client.
FRONTEND_CONF_KEYS = (
    "SUPERSET_WEBSERVER_TIMEOUT",
    "SUPERSET_DASHBOARD_POSITION_DATA_LIMIT",
    "SUPERSET_DASHBOARD_PERIODICAL_REFRESH_LIMIT",
    "SUPERSET_DASHBOARD_PERIODICAL_REFRESH_WARNING_MESSAGE",
    "ENABLE_JAVASCRIPT_CONTROLS",
    "DEFAULT_SQLLAB_LIMIT",
    "DEFAULT_VIZ_TYPE",
    "SQL_MAX_ROW",
    "SUPERSET_WEBSERVER_DOMAINS",
    "SQLLAB_SAVE_WARNING_MESSAGE",
    "SQLLAB_DEFAULT_DBID",
    "DISPLAY_MAX_ROW",
    "GLOBAL_ASYNC_QUERIES_TRANSPORT",
    "GLOBAL_ASYNC_QUERIES_POLLING_DELAY",
    "SQL_VALIDATORS_BY_ENGINE",
    "SQLALCHEMY_DOCS_URL",
    "SQLALCHEMY_DISPLAY_TEXT",
    "GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL",
    "DASHBOARD_AUTO_REFRESH_MODE",
    "DASHBOARD_AUTO_REFRESH_INTERVALS",
    "DASHBOARD_VIRTUALIZATION",
    "SCHEDULED_QUERIES",
    "EXCEL_EXTENSIONS",
    "CSV_EXTENSIONS",
    "COLUMNAR_EXTENSIONS",
    "ALLOWED_EXTENSIONS",
    "SAMPLES_ROW_LIMIT",
    "DEFAULT_TIME_FILTER",
    "HTML_SANITIZATION",
    "HTML_SANITIZATION_SCHEMA_EXTENSIONS",
    "WELCOME_PAGE_LAST_TAB",
    "VIZ_TYPE_DENYLIST",
    "ALERT_REPORTS_DEFAULT_CRON_VALUE",
    "ALERT_REPORTS_DEFAULT_RETENTION",
    "ALERT_REPORTS_DEFAULT_WORKING_TIMEOUT",
    "NATIVE_FILTER_DEFAULT_ROW_LIMIT",
    "SUPERSET_CLIENT_RETRY_ATTEMPTS",
    "SUPERSET_CLIENT_RETRY_DELAY",
    "SUPERSET_CLIENT_RETRY_BACKOFF_MULTIPLIER",
    "SUPERSET_CLIENT_RETRY_MAX_DELAY",
    "SUPERSET_CLIENT_RETRY_JITTER_MAX",
    "SUPERSET_CLIENT_RETRY_STATUS_CODES",
    "PREVENT_UNSAFE_DEFAULT_URLS_ON_DATASET",
    "JWT_ACCESS_CSRF_COOKIE_NAME",
    "SQLLAB_QUERY_RESULT_TIMEOUT",
    "SYNC_DB_PERMISSIONS_IN_ASYNC_MODE",
    "TABLE_VIZ_MAX_ROW_SERVER",
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def get_error_msg() -> str:
    """
    Build the error text reported back to the client.

    When the ``SHOW_STACKTRACE`` config option is truthy the formatted
    traceback of the exception currently being handled is returned; otherwise
    a fixed notice explaining how to enable stack traces is returned.
    """
    if not app.config.get("SHOW_STACKTRACE"):
        return (
            "FATAL ERROR \n"
            "Stacktrace is hidden. Change the SHOW_STACKTRACE "
            "configuration setting to enable it"
        )
    return traceback.format_exc()
def json_success(json_msg: str, status: int = 200) -> FlaskResponse:
    """
    Wrap an already-serialized JSON string in an ``application/json`` response.

    :param json_msg: The response body, used as-is
    :param status: The HTTP status code of the response
    :returns: The response object
    """
    response = Response(json_msg, status=status, mimetype="application/json")
    return response
def data_payload_response(payload_json: str, has_error: bool = False) -> FlaskResponse:
    """
    Return a JSON payload response whose status reflects ``has_error``.

    :param payload_json: The serialized JSON payload
    :param has_error: When true the response status is 400, otherwise 200
    :returns: The response built by ``json_success``
    """
    if has_error:
        return json_success(payload_json, status=400)
    return json_success(payload_json, status=200)
def generate_download_headers(
    extension: str, filename: str | None = None
) -> dict[str, Any]:
    """
    Build the ``Content-Disposition`` header for a file download.

    :param extension: The file extension, appended after a dot
    :param filename: The base name of the download; when empty or ``None`` the
        current local time formatted as ``%Y%m%d_%H%M%S`` is used instead
    :returns: A dict holding the single ``Content-Disposition`` header
    """
    basename = filename or datetime.now().strftime("%Y%m%d_%H%M%S")
    return {"Content-Disposition": f"attachment; filename={basename}.{extension}"}
def deprecated(
    eol_version: str = "5.0.0",
    new_target: str | None = None,
) -> Callable[[Callable[..., FlaskResponse]], Callable[..., FlaskResponse]]:
    """
    A decorator to mark an API endpoint of a SupersetView as deprecated.

    Every call to the decorated endpoint logs a warning naming the view class,
    the endpoint and the version it will be removed in, then runs the endpoint.

    :param eol_version: The version in which the endpoint will be removed
    :param new_target: The replacement endpoint, mentioned in the warning when set
    :returns: The decorator to apply to the endpoint method
    """

    def _deprecated(f: Callable[..., FlaskResponse]) -> Callable[..., FlaskResponse]:
        @functools.wraps(f)
        def wraps(self: BaseSupersetView, *args: Any, **kwargs: Any) -> FlaskResponse:
            message = (
                "%s.%s "
                "This API endpoint is deprecated and will be removed in version %s"
            )
            names = (self.__class__.__name__, f.__name__, eol_version)
            if new_target:
                logger.warning(
                    message + " . Use the following API endpoint instead: %s",
                    *names,
                    new_target,
                )
            else:
                logger.warning(message, *names)
            return f(self, *args, **kwargs)

        return wraps

    return _deprecated
def api(f: Callable[..., FlaskResponse]) -> Callable[..., FlaskResponse]:
    """
    A decorator to label an endpoint as an API.

    Exceptions escaping the endpoint are caught, logged and turned into a JSON
    error response: ``NoAuthorizationError`` yields status 401, any other
    exception yields the default status of ``json_error_response``.
    """

    @functools.wraps(f)
    def wraps(self: BaseSupersetView, *args: Any, **kwargs: Any) -> FlaskResponse:
        try:
            response = f(self, *args, **kwargs)
        except NoAuthorizationError:
            logger.warning("Api failed- no authorization", exc_info=True)
            return json_error_response(get_error_msg(), status=401)
        except Exception as ex:  # pylint: disable=broad-except
            logger.exception(ex)
            return json_error_response(get_error_msg())
        return response

    return wraps
class BaseSupersetView(BaseView):
    """Base view offering JSON response and single-page-app rendering helpers."""

    @staticmethod
    def json_response(obj: Any, status: int = 200) -> FlaskResponse:
        """
        Serialize ``obj`` to JSON and return it as an ``application/json`` response.

        :param obj: The object to serialize
        :param status: The HTTP status code of the response
        :returns: The response object
        """
        body = json.dumps(obj, default=json.json_int_dttm_ser, ignore_nan=True)
        return Response(body, status=status, mimetype="application/json")

    def render_app_template(
        self, extra_bootstrap_data: dict[str, Any] | None = None
    ) -> FlaskResponse:
        """
        Render the SPA template with the user and common bootstrap data.

        :param extra_bootstrap_data: Additional top-level payload entries; they
            override the ``user`` and ``common`` entries on key collision
        :returns: The rendered ``superset/spa.html`` template
        """
        payload = {
            "user": bootstrap_user_data(g.user, include_perms=True),
            "common": common_bootstrap_payload(),
        }
        if extra_bootstrap_data:
            payload.update(extra_bootstrap_data)
        bootstrap_json = json.dumps(
            payload, default=json.pessimistic_json_iso_dttm_ser
        )
        return self.render_template(
            "superset/spa.html",
            entry="spa",
            bootstrap_data=bootstrap_json,
        )
def get_environment_tag() -> dict[str, Any]:
    """
    Resolve the environment tag from the ``ENVIRONMENT_TAG_CONFIG`` setting.

    The environment variable named by the config's ``variable`` entry selects
    one of the templates under its ``values`` entry. When that variable is
    unset, empty, or names no known template, the ``debug`` template is used
    if Flask runs in debug mode.

    :returns: The selected template, or an empty dict when none applies
    """
    # Whether flask is in debug mode (--debug)
    debug = app.config["DEBUG"]
    # Getting the configuration option for ENVIRONMENT_TAG_CONFIG
    env_tag_config = app.config["ENVIRONMENT_TAG_CONFIG"]
    # These are the predefined templates defined in the config; tolerate a
    # missing or empty "values" entry instead of failing on ``None``
    env_tag_templates = env_tag_config.get("values") or {}
    # This is the environment variable name from which to select the template
    # default is SUPERSET_ENV (from FLASK_ENV in previous versions)
    env_envvar = env_tag_config.get("variable")
    # this is the actual name we want to use; os.environ only accepts string
    # keys, so skip the lookup when no variable name is configured
    env_name = os.environ.get(env_envvar) if env_envvar else None
    if not env_name or env_name not in env_tag_templates:
        env_name = "debug" if debug else None
    return env_tag_templates.get(env_name) or {}
def menu_data(user: User) -> dict[str, Any]:
    """
    Assemble the data describing the navigation menu.

    :param user: The user the menu is built for; only ``is_anonymous`` is read
    :returns: A dict with the ``menu``, ``brand``, ``environment_tag`` and
        ``navbar_right`` sections
    """
    languages = {}
    for lang in appbuilder.languages:
        languages[lang] = {
            **appbuilder.languages[lang],
            "url": appbuilder.get_url_for_locale(lang),
        }

    # The right-hand logo text may be configured as a value or as a callable
    brand_text = app.config["LOGO_RIGHT_TEXT"]
    if callable(brand_text):
        brand_text = brand_text()

    menu = appbuilder.menu.get_data()
    brand = {
        "path": app.config["LOGO_TARGET_PATH"] or url_for("Superset.welcome"),
        "icon": appbuilder.app_icon,
        "alt": appbuilder.app_name,
        "tooltip": app.config["LOGO_TOOLTIP"],
        "text": brand_text,
    }
    environment_tag = get_environment_tag()
    navbar_right = {
        # show the watermark if the default app icon has been overridden
        "show_watermark": ("superset-logo-horiz" not in appbuilder.app_icon),
        "bug_report_url": app.config["BUG_REPORT_URL"],
        "bug_report_icon": app.config["BUG_REPORT_ICON"],
        "bug_report_text": app.config["BUG_REPORT_TEXT"],
        "documentation_url": app.config["DOCUMENTATION_URL"],
        "documentation_icon": app.config["DOCUMENTATION_ICON"],
        "documentation_text": app.config["DOCUMENTATION_TEXT"],
        "version_string": app.config["VERSION_STRING"],
        "version_sha": app.config["VERSION_SHA"],
        "build_number": app.config["BUILD_NUMBER"],
        "languages": languages,
        "show_language_picker": len(languages) > 1,
        "user_is_anonymous": user.is_anonymous,
        "user_info_url": (
            "/user_info/" if not is_feature_enabled("MENU_HIDE_USER_INFO") else None
        ),
        "user_logout_url": appbuilder.get_url_for_logout,
        "user_login_url": appbuilder.get_url_for_login,
        "locale": session.get("locale", "en"),
    }
    return {
        "menu": menu,
        "brand": brand,
        "environment_tag": environment_tag,
        "navbar_right": navbar_right,
    }
def get_theme_bootstrap_data() -> dict[str, Any]:
    """
    Returns the theme data to be sent to the client.

    ``THEME_DEFAULT`` and ``THEME_DARK`` are each replaced by an empty theme,
    and ``THEME_SETTINGS`` by ``DEFAULT_THEME_SETTINGS``, when they fail
    validation; every replacement is logged as a warning.
    """
    # Get theme configs
    default_theme = get_config_value("THEME_DEFAULT")
    dark_theme = get_config_value("THEME_DARK")
    theme_settings = get_config_value("THEME_SETTINGS")

    # Validate theme configurations
    if not is_valid_theme(default_theme):
        logger.warning(
            "Invalid THEME_DEFAULT configuration: %s, using empty theme",
            default_theme,
        )
        default_theme = {}

    if not is_valid_theme(dark_theme):
        logger.warning(
            "Invalid THEME_DARK configuration: %s, using empty theme",
            dark_theme,
        )
        dark_theme = {}

    if not is_valid_theme_settings(theme_settings):
        logger.warning(
            "Invalid THEME_SETTINGS configuration: %s, using defaults", theme_settings
        )
        theme_settings = DEFAULT_THEME_SETTINGS

    theme = {
        "default": default_theme,
        "dark": dark_theme,
        "settings": theme_settings,
    }
    return {"theme": theme}
@cache_manager.cache.memoize(timeout=60)
def cached_common_bootstrap_data(  # pylint: disable=unused-argument
    user_id: int | None, locale: Locale | None
) -> dict[str, Any]:
    """Common data always sent to the client

    The function is memoized as the return value only changes when user permissions
    or configuration values change.

    :param user_id: Not read in the body; presumably it only serves to keep
        the memoized entries apart per user
    :param locale: The locale whose language is reported; "en" when falsy
    :returns: The bootstrap payload shared by all pages
    """
    # should not expose API TOKEN to frontend
    frontend_config: dict[str, Any] = {}
    for key in FRONTEND_CONF_KEYS:
        value = app.config.get(key)
        # sets are turned into lists before being handed to the frontend
        frontend_config[key] = list(value) if isinstance(value, set) else value

    notification_methods = [ReportRecipientType.EMAIL]
    if app.config.get("SLACK_API_TOKEN"):
        notification_methods += [
            ReportRecipientType.SLACK,
            ReportRecipientType.SLACKV2,
        ]
    frontend_config["ALERT_REPORTS_NOTIFICATION_METHODS"] = notification_methods

    # verify client has google sheets installed
    available_specs = get_available_engine_specs()
    frontend_config["HAS_GSHEETS_INSTALLED"] = bool(
        GSheetsEngineSpec in available_specs and available_specs[GSheetsEngineSpec]
    )

    language = locale.language if locale else "en"

    auth_type = app.config["AUTH_TYPE"]
    auth_user_registration = app.config["AUTH_USER_REGISTRATION"]
    frontend_config["AUTH_USER_REGISTRATION"] = auth_user_registration
    if auth_user_registration:
        frontend_config["AUTH_USER_REGISTRATION_ROLE"] = app.config[
            "AUTH_USER_REGISTRATION_ROLE"
        ]
        # the recaptcha key is only sent when registration is not OAuth based
        if auth_type != AUTH_OAUTH:
            frontend_config["RECAPTCHA_PUBLIC_KEY"] = app.config[
                "RECAPTCHA_PUBLIC_KEY"
            ]

    frontend_config["AUTH_TYPE"] = auth_type
    if auth_type == AUTH_OAUTH:
        frontend_config["AUTH_PROVIDERS"] = [
            {"name": provider["name"], "icon": provider["icon"]}
            for provider in appbuilder.sm.oauth_providers
        ]
    if auth_type == AUTH_OID:
        frontend_config["AUTH_PROVIDERS"] = list(appbuilder.sm.openid_providers)

    bootstrap_data = {
        "application_root": app.config["APPLICATION_ROOT"],
        "static_assets_prefix": app.config["STATIC_ASSETS_PREFIX"],
        "conf": frontend_config,
        "locale": language,
        "d3_format": app.config.get("D3_FORMAT"),
        "d3_time_format": app.config.get("D3_TIME_FORMAT"),
        "currencies": app.config.get("CURRENCIES"),
        "deckgl_tiles": app.config.get("DECKGL_BASE_MAP"),
        "feature_flags": get_feature_flags(),
        "extra_sequential_color_schemes": app.config["EXTRA_SEQUENTIAL_COLOR_SCHEMES"],
        "extra_categorical_color_schemes": app.config[
            "EXTRA_CATEGORICAL_COLOR_SCHEMES"
        ],
        "menu_data": menu_data(g.user),
    }
    # the override hook sees the payload before the theme data is merged in
    overrides = app.config["COMMON_BOOTSTRAP_OVERRIDES_FUNC"](bootstrap_data)
    bootstrap_data.update(overrides)
    bootstrap_data.update(get_theme_bootstrap_data())
    return bootstrap_data
def common_bootstrap_payload() -> dict[str, Any]:
    """
    Return the common bootstrap data plus the pending flash messages.

    The memoized data is copied before the ``flash_messages`` entry is set, so
    the flash messages are collected on every call.
    """
    payload = dict(cached_common_bootstrap_data(utils.get_user_id(), get_locale()))
    payload["flash_messages"] = get_flashed_messages(with_categories=True)
    return payload
class SupersetListWidget(ListWidget):  # pylint: disable=too-few-public-methods
    """List widget that points at Superset's override of the list template."""

    # Template path used in place of the one set by ListWidget
    template = "superset/fab_overrides/list.html"
class SupersetModelView(ModelView):
    """Model view with Superset's list widget and single-page-app rendering."""

    page_size = 100
    list_widget = SupersetListWidget

    def render_app_template(self) -> FlaskResponse:
        """
        Render the SPA template with the user and common bootstrap data.

        :returns: The rendered ``superset/spa.html`` template
        """
        user_payload = bootstrap_user_data(g.user, include_perms=True)
        common_payload = common_bootstrap_payload()
        bootstrap_json = json.dumps(
            {"user": user_payload, "common": common_payload},
            default=json.pessimistic_json_iso_dttm_ser,
        )
        return self.render_template(
            "superset/spa.html",
            entry="spa",
            bootstrap_data=bootstrap_json,
        )
class DeleteMixin:  # pylint: disable=too-few-public-methods
    """
    Mixin adding single-record and bulk delete handling to a view.

    Besides the record itself, the view menu found for the record's
    ``get_perm()`` value and the permission-view rows referencing that view
    menu are deleted as well.
    """

    def _delete(self: BaseView, primary_key: int) -> None:
        """
        Delete function logic, override to implement different logic
        deletes the record with primary_key = primary_key

        Aborts with a 404 when no record is found. An exception raised by
        ``pre_delete`` is flashed as a "danger" message and nothing is deleted.

        :param primary_key:
            record primary key to delete
        """
        # The lookup goes through the view's base filters
        item = self.datamodel.get(primary_key, self._base_filters)
        if not item:
            abort(404)
        try:
            self.pre_delete(item)
        except Exception as ex:  # pylint: disable=broad-except
            flash(str(ex), "danger")
        else:
            # Collect the permission rows before the record is deleted
            view_menu = security_manager.find_view_menu(item.get_perm())
            pvs = (
                security_manager.get_session.query(
                    security_manager.permissionview_model
                )
                .filter_by(view_menu=view_menu)
                .all()
            )
            # Permission rows are only removed once the record delete succeeded
            if self.datamodel.delete(item):
                self.post_delete(item)
                for pv in pvs:
                    security_manager.get_session.delete(pv)
                if view_menu:
                    security_manager.get_session.delete(view_menu)
                db.session.commit()  # pylint: disable=consider-using-transaction
            # The datamodel's message is flashed whether or not the delete succeeded
            flash(*self.datamodel.message)
            self.update_redirect()

    @action(
        "muldelete", __("Delete"), __("Delete all Really?"), "fa-trash", single=False
    )
    def muldelete(self: BaseView, items: list[Model]) -> FlaskResponse:
        """
        Delete several records, then redirect.

        Aborts with a 404 when ``items`` is empty. An item whose ``pre_delete``
        raises is skipped and the exception is flashed as a "danger" message.

        :param items: The records to delete
        :returns: A redirect to ``self.get_redirect()``
        """
        if not items:
            abort(404)
        for item in items:
            try:
                self.pre_delete(item)
            except Exception as ex:  # pylint: disable=broad-except
                flash(str(ex), "danger")
            else:
                # NOTE(review): _delete calls pre_delete again, so the hook runs
                # twice for every item that is deleted - confirm this is intended
                self._delete(item.id)
        self.update_redirect()
        return redirect(self.get_redirect())
class DatasourceFilter(BaseFilter):  # pylint: disable=too-few-public-methods
    """Filter restricting a datasource query to what the user may access."""

    def apply(self, query: Query, value: Any) -> Query:
        """
        Apply the dataset access filters to the query.

        :param query: The query to restrict
        :param value: Unused
        :returns: The query unchanged when the user can access all
            datasources, otherwise the query joined with the database model
            and filtered by the dataset access filters
        """
        if security_manager.can_access_all_datasources():
            return query
        join_condition = models.Database.id == self.model.database_id
        return query.join(models.Database, join_condition).filter(
            get_dataset_access_filters(self.model)
        )
class CsvResponse(Response):
    """
    Override Response to take into account csv encoding from config.py
    """

    # Read from the CSV_EXPORT config entry once, when this class body executes;
    # falls back to "utf-8" when no "encoding" key is configured
    charset = app.config["CSV_EXPORT"].get("encoding", "utf-8")
    default_mimetype = "text/csv"
class XlsxResponse(Response):
    """
    Override Response to use xlsx mimetype
    """

    charset = "utf-8"
    # Mimetype used when a response is created without an explicit one
    default_mimetype = (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
def bind_field(
    _: Any, form: DynamicForm, unbound_field: UnboundField, options: dict[Any, Any]
) -> Field:
    """
    Customize how fields are bound by stripping leading and trailing whitespace.

    A filter that strips string values (and passes any other value through
    unchanged) is appended after the filters the field was declared with.

    :param form: The form
    :param unbound_field: The unbound field
    :param options: The field options
    :returns: The bound field
    """
    declared_filters = unbound_field.kwargs.get("filters") or ()
    # Build a fresh list on every bind: the declared filters live on the shared
    # unbound field, so appending in place would add one more strip filter per
    # bind, and would fail outright when they were declared as a tuple.
    filters = [
        *declared_filters,
        lambda x: x.strip() if isinstance(x, str) else x,
    ]
    return unbound_field.bind(form=form, filters=filters, **options)
# Replace the field binder on FlaskForm's Meta class with the whitespace-stripping
# bind_field defined in this module; presumably this affects every form
# deriving from FlaskForm - verify against the form classes in use.
FlaskForm.Meta.bind_field = bind_field