This commit is contained in:
Beto Dealmeida
2025-12-03 14:57:02 -05:00
parent 48d3f441b8
commit ec018cd842
4 changed files with 33 additions and 40 deletions

View File

@@ -52,6 +52,7 @@ from superset.advanced_data_type.plugins.internet_address import internet_addres
from superset.advanced_data_type.plugins.internet_port import internet_port
from superset.advanced_data_type.types import AdvancedDataType
from superset.constants import CHANGE_ME_SECRET_KEY
from superset.engines.manager import EngineModes
from superset.jinja_context import BaseTemplateProcessor
from superset.key_value.types import JsonKeyValueCodec
from superset.stats_logger import DummyStatsLogger
@@ -266,10 +267,10 @@ SQLALCHEMY_CUSTOM_PASSWORD_STORE = None
# Engine manager mode: "NEW" creates a new engine for every connection (default),
# "SINGLETON" reuses engines with connection pooling
ENGINE_MANAGER_MODE = "NEW"
ENGINE_MANAGER_MODE = EngineModes.NEW
# Cleanup interval for abandoned locks in seconds (default: 5 minutes)
ENGINE_MANAGER_CLEANUP_INTERVAL = 300.0
ENGINE_MANAGER_CLEANUP_INTERVAL = timedelta(minutes=5)
# Automatically start cleanup thread for SINGLETON mode (default: True)
ENGINE_MANAGER_AUTO_START_CLEANUP = True

View File

@@ -20,6 +20,7 @@ import logging
import threading
from collections import defaultdict
from contextlib import contextmanager
from datetime import timedelta
from io import StringIO
from typing import Any, TYPE_CHECKING
@@ -71,7 +72,7 @@ class EngineManager:
def __init__(
self,
mode: EngineModes = EngineModes.NEW,
cleanup_interval: float = 300.0, # 5 minutes default
cleanup_interval: timedelta = timedelta(minutes=5),
) -> None:
self.mode = mode
self.cleanup_interval = cleanup_interval
@@ -100,7 +101,7 @@ class EngineManager:
except Exception as ex:
# Avoid exceptions during garbage collection, but log if possible
try:
logger.warning(f"Error stopping cleanup thread: {ex}")
logger.warning("Error stopping cleanup thread: %s", ex)
except Exception: # noqa: S110
# If logging fails during destruction, we can't do anything
pass
@@ -212,6 +213,10 @@ class EngineManager:
needed, since it needs to connect to the tunnel instead of the original DB. But
that information is only available after the tunnel is created.
"""
# Import here to avoid circular imports
from superset.extensions import security_manager
from superset.utils.feature_flag_manager import FeatureFlagManager
uri = make_url_safe(database.sqlalchemy_uri_decrypted)
extra = database.get_extra(source)
@@ -242,10 +247,6 @@ class EngineManager:
# get effective username
username = database.get_effective_user(uri)
# Import here to avoid circular imports
from superset.extensions import security_manager
from superset.utils.feature_flag_manager import FeatureFlagManager
feature_flag_manager = FeatureFlagManager()
if username and feature_flag_manager.is_feature_enabled(
"IMPERSONATE_WITH_EMAIL_PREFIX"
@@ -322,7 +323,6 @@ class EngineManager:
user_id,
)
tunnel = None
if database.ssh_tunnel:
tunnel = self._get_tunnel(database.ssh_tunnel, uri)
uri = uri.set(
@@ -444,7 +444,8 @@ class EngineManager:
)
self._cleanup_thread.start()
logger.info(
f"Started cleanup thread with {self.cleanup_interval}s interval"
"Started cleanup thread with %ds interval",
self.cleanup_interval.total_seconds(),
)
def stop_cleanup_thread(self) -> None:
@@ -475,7 +476,9 @@ class EngineManager:
logger.exception("Error during background cleanup")
# Use wait() instead of sleep() to allow for immediate shutdown
if self._cleanup_stop_event.wait(timeout=self.cleanup_interval):
if self._cleanup_stop_event.wait(
timeout=self.cleanup_interval.total_seconds()
):
break # Stop event was set
def cleanup(self) -> None:
@@ -502,7 +505,8 @@ class EngineManager:
if abandoned_engine_locks:
logger.debug(
f"Cleaned up {len(abandoned_engine_locks)} abandoned engine locks"
"Cleaned up %d abandoned engine locks",
len(abandoned_engine_locks),
)
# Clean up tunnel locks
@@ -513,7 +517,8 @@ class EngineManager:
if abandoned_tunnel_locks:
logger.debug(
f"Cleaned up {len(abandoned_tunnel_locks)} abandoned tunnel locks"
"Cleaned up %d abandoned tunnel locks",
len(abandoned_tunnel_locks),
)
def _add_disposal_listener(self, engine: Engine, engine_key: EngineKey) -> None:
@@ -522,7 +527,9 @@ class EngineManager:
try:
# `pop` is atomic -- no lock needed
if self._engines.pop(engine_key, None):
logger.info(f"Engine disposed and removed from cache: {engine_key}")
logger.info(
"Engine disposed and removed from cache: %s", engine_key
)
self._engine_locks.pop(engine_key, None)
except Exception as ex:
logger.error(

View File

@@ -45,24 +45,12 @@ class EngineManagerExtension:
Initialize the EngineManager with Flask app configuration.
"""
# Get configuration values with defaults
mode_name = app.config.get("ENGINE_MANAGER_MODE", "NEW")
cleanup_interval = app.config.get("ENGINE_MANAGER_CLEANUP_INTERVAL", 300.0)
auto_start_cleanup = app.config.get("ENGINE_MANAGER_AUTO_START_CLEANUP", True)
# Convert mode string to enum
try:
mode = EngineModes[mode_name.upper()]
except KeyError:
logger.warning(
f"Invalid ENGINE_MANAGER_MODE '{mode_name}', defaulting to NEW"
)
mode = EngineModes.NEW
mode = app.config["ENGINE_MANAGER_MODE"]
cleanup_interval = app.config["ENGINE_MANAGER_CLEANUP_INTERVAL"]
auto_start_cleanup = app.config["ENGINE_MANAGER_AUTO_START_CLEANUP"]
# Create the engine manager
self.engine_manager = EngineManager(
mode=mode,
cleanup_interval=cleanup_interval,
)
self.engine_manager = EngineManager(mode, cleanup_interval)
# Start cleanup thread if requested and in SINGLETON mode
if auto_start_cleanup and mode == EngineModes.SINGLETON:
@@ -83,8 +71,9 @@ class EngineManagerExtension:
atexit.register(shutdown_engine_manager)
logger.info(
f"Initialized EngineManager with mode={mode.name}, "
f"cleanup_interval={cleanup_interval}s"
"Initialized EngineManager with mode=%s, cleanup_interval=%ds",
mode,
cleanup_interval.total_seconds(),
)
@property

View File

@@ -136,9 +136,7 @@ class ConfigurationMethod(StrEnum):
DYNAMIC_FORM = "dynamic_form"
class Database(
CoreDatabase, AuditMixinNullable, ImportExportMixin
): # pylint: disable=too-many-public-methods
class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin): # pylint: disable=too-many-public-methods
"""An ORM object that stores Database related information"""
__tablename__ = "dbs"
@@ -415,7 +413,9 @@ class Database(
return (
username
if (username := get_username())
else object_url.username if self.impersonate_user else None
else object_url.username
if self.impersonate_user
else None
)
@contextmanager
@@ -431,9 +431,6 @@ class Database(
This method will return a context manager for a SQLAlchemy engine. The engine
manager handles connection pooling, SSH tunnels, and other connection details
based on the configured mode (NEW or SINGLETON).
Note: The nullpool parameter is kept for backwards compatibility but is ignored.
Pool configuration is now read from the database's extra configuration.
"""
# Import here to avoid circular imports
from superset.extensions import engine_manager_extension
@@ -470,7 +467,6 @@ class Database(
self,
catalog: str | None = None,
schema: str | None = None,
nullpool: bool = True, # Kept for backwards compatibility, but ignored
source: utils.QuerySource | None = None,
) -> Connection:
with self.get_sqla_engine(