mirror of
https://github.com/apache/superset.git
synced 2026-05-06 16:34:32 +00:00
Cleanup locks
This commit is contained in:
@@ -71,8 +71,13 @@ class EngineManager:
|
||||
engines, as well as configuring the pool through the database settings.
|
||||
"""
|
||||
|
||||
def __init__(self, mode: EngineModes = EngineModes.NEW) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
mode: EngineModes = EngineModes.NEW,
|
||||
cleanup_interval: float = 300.0, # 5 minutes default
|
||||
) -> None:
|
||||
self.mode = mode
|
||||
self.cleanup_interval = cleanup_interval
|
||||
|
||||
self._engines: dict[EngineKey, Engine] = {}
|
||||
self._engine_locks: dict[EngineKey, threading.Lock] = defaultdict(
|
||||
@@ -84,6 +89,25 @@ class EngineManager:
|
||||
threading.Lock
|
||||
)
|
||||
|
||||
# Background cleanup thread management
|
||||
self._cleanup_thread: threading.Thread | None = None
|
||||
self._cleanup_stop_event = threading.Event()
|
||||
self._cleanup_thread_lock = threading.Lock()
|
||||
|
||||
def __del__(self) -> None:
|
||||
"""
|
||||
Ensure cleanup thread is stopped when the manager is destroyed.
|
||||
"""
|
||||
try:
|
||||
self.stop_cleanup_thread()
|
||||
except Exception as ex:
|
||||
# Avoid exceptions during garbage collection, but log if possible
|
||||
try:
|
||||
logger.warning(f"Error stopping cleanup thread: {ex}")
|
||||
except Exception: # noqa: S110
|
||||
# If logging fails during destruction, we can't do anything
|
||||
pass
|
||||
|
||||
@contextmanager
|
||||
def get_engine(
|
||||
self,
|
||||
@@ -387,6 +411,95 @@ class EngineManager:
|
||||
|
||||
return kwargs
|
||||
|
||||
def start_cleanup_thread(self) -> None:
|
||||
"""
|
||||
Start the background cleanup thread.
|
||||
|
||||
The thread will periodically clean up abandoned locks at the configured
|
||||
interval. This is safe to call multiple times - subsequent calls are no-ops.
|
||||
"""
|
||||
with self._cleanup_thread_lock:
|
||||
if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
|
||||
self._cleanup_stop_event.clear()
|
||||
self._cleanup_thread = threading.Thread(
|
||||
target=self._cleanup_worker,
|
||||
name=f"EngineManager-cleanup-{id(self)}",
|
||||
daemon=True,
|
||||
)
|
||||
self._cleanup_thread.start()
|
||||
logger.info(
|
||||
f"Started cleanup thread with {self.cleanup_interval}s interval"
|
||||
)
|
||||
|
||||
def stop_cleanup_thread(self) -> None:
|
||||
"""
|
||||
Stop the background cleanup thread gracefully.
|
||||
|
||||
This will signal the thread to stop and wait for it to finish.
|
||||
Safe to call even if no thread is running.
|
||||
"""
|
||||
with self._cleanup_thread_lock:
|
||||
if self._cleanup_thread is not None and self._cleanup_thread.is_alive():
|
||||
self._cleanup_stop_event.set()
|
||||
self._cleanup_thread.join(timeout=5.0) # 5 second timeout
|
||||
if self._cleanup_thread.is_alive():
|
||||
logger.warning("Cleanup thread did not stop within timeout")
|
||||
else:
|
||||
logger.info("Cleanup thread stopped")
|
||||
self._cleanup_thread = None
|
||||
|
||||
def _cleanup_worker(self) -> None:
|
||||
"""
|
||||
Background thread worker that periodically cleans up abandoned locks.
|
||||
"""
|
||||
while not self._cleanup_stop_event.is_set():
|
||||
try:
|
||||
self._cleanup_abandoned_locks()
|
||||
except Exception:
|
||||
logger.exception("Error during background cleanup")
|
||||
|
||||
# Use wait() instead of sleep() to allow for immediate shutdown
|
||||
if self._cleanup_stop_event.wait(timeout=self.cleanup_interval):
|
||||
break # Stop event was set
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""
|
||||
Public method to manually trigger cleanup of abandoned locks.
|
||||
|
||||
This can be called periodically by external systems to prevent
|
||||
memory leaks from accumulating locks.
|
||||
"""
|
||||
self._cleanup_abandoned_locks()
|
||||
|
||||
def _cleanup_abandoned_locks(self) -> None:
|
||||
"""
|
||||
Remove locks for engines and tunnels that no longer exist.
|
||||
|
||||
This prevents memory leaks from accumulating locks in defaultdict
|
||||
when engines/tunnels are disposed outside of normal cleanup paths.
|
||||
"""
|
||||
# Clean up engine locks
|
||||
active_engine_keys = set(self._engines.keys())
|
||||
abandoned_engine_locks = set(self._engine_locks.keys()) - active_engine_keys
|
||||
for key in abandoned_engine_locks:
|
||||
self._engine_locks.pop(key, None)
|
||||
|
||||
if abandoned_engine_locks:
|
||||
logger.debug(
|
||||
f"Cleaned up {len(abandoned_engine_locks)} abandoned engine locks"
|
||||
)
|
||||
|
||||
# Clean up tunnel locks
|
||||
active_tunnel_keys = set(self._tunnels.keys())
|
||||
abandoned_tunnel_locks = set(self._tunnel_locks.keys()) - active_tunnel_keys
|
||||
for key in abandoned_tunnel_locks:
|
||||
self._tunnel_locks.pop(key, None)
|
||||
|
||||
if abandoned_tunnel_locks:
|
||||
logger.debug(
|
||||
f"Cleaned up {len(abandoned_tunnel_locks)} abandoned tunnel locks"
|
||||
)
|
||||
|
||||
def _add_disposal_listener(self, engine: Engine, engine_key: EngineKey) -> None:
|
||||
@event.listens_for(engine, "engine_disposed")
|
||||
def on_engine_disposed(engine_instance: Engine) -> None:
|
||||
|
||||
Reference in New Issue
Block a user