First version

This commit is contained in:
Florian 2025-11-06 16:37:06 +01:00
parent 681e76577e
commit 29fb26d4f9
3 changed files with 194 additions and 0 deletions

8
db_module/__init__.py Normal file
View File

@ -0,0 +1,8 @@
from .db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
__all__ = [
"get_db",
"create_connection_pool",
"close_connection_pool",
"start_healthcheck_thread",
]

167
db_module/db.py Normal file
View File

@ -0,0 +1,167 @@
from mysql.connector import pooling, Error
from mysql.connector.pooling import MySQLConnectionPool, MySQLConnection
import threading
from secret_manager import return_credentials
import os
import time
from simple_logger_handler import setup_logger
from typing import Generator
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("BACKEND_API_DB_HOST","localhost")
db_database = os.getenv("BACKEND_API_DB_DATABASE","app")
logger = setup_logger(__name__)
MAX_RETRIES = 5
RETRY_DELAY = 5
HEALTHCHECK_INTERVAL = 60
MYSQL_CONFIG = {
"host": db_host,
"user": db_username,
"password": db_password,
"database": db_database,
"connection_timeout": 10
}
_pool_lock = threading.Lock()
_connection_pool = None
_health_thread = None
_stop_healthcheck = threading.Event()
def create_connection_pool(pool_name: str = "MySQLPool"):
"""
Create a MySQL connection pool.
Args:
pool_name (str): Name for the connection pool.
Raises:
RuntimeError: If pool creation fails after retries.
"""
global _connection_pool
with _pool_lock:
if _connection_pool is not None:
logger.debug("[MySQL] Pool already exists, returning existing pool")
return
for attempt in range (1,MAX_RETRIES+1):
try:
logger.info(f"[MySQL] Attempt {attempt} to connect...")
pool = pooling.MySQLConnectionPool(
pool_name=pool_name,
pool_size=5,
pool_reset_session=True,
**MYSQL_CONFIG
)
with _pool_lock:
_connection_pool = pool
logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
return
except Error as e:
logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
if attempt < MAX_RETRIES:
logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
time.sleep(RETRY_DELAY)
logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
raise RuntimeError("MySQL pool initialization failed")
def close_connection_pool():
"""
Close the MySQL connection pool and stop the healthcheck thread.
"""
global _connection_pool
with _pool_lock:
if _connection_pool:
logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
_connection_pool = None
logger.info("[MySQL] Connection pool closed")
_stop_healthcheck.set()
logger.debug("[MySQL] Healthcheck stop flag set")
def get_connection_pool(pool_name: str = "MySQLPool") -> MySQLConnectionPool:
"""
Retrieve the existing MySQL connection pool, or create a new one if not initialized.
Args:
pool_name (str): Name for the connection pool.
Returns:
MySQLConnectionPool: The active connection pool.
"""
global _connection_pool
with _pool_lock:
if _connection_pool is None:
logger.debug("[MySQL] No pool found, creating one")
create_connection_pool(pool_name)
else:
logger.debug(f"[MySQL] Returning existing pool: {_connection_pool}")
return _connection_pool
def get_db(pool_name: str = "MySQLPool") -> Generator[MySQLConnection, None, None]:
"""
Context generator yielding a database connection from the pool.
Args:
pool_name (str): Name for the connection pool.
Yields:
MySQLConnection: A live MySQL database connection.
"""
pool = get_connection_pool(pool_name)
logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
conn = pool.get_connection()
try:
conn.ping(reconnect=True, attempts=3, delay=1)
logger.debug("[MySQL] Connection alive")
except Error as e:
logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
create_connection_pool(pool_name)
pool = get_connection_pool(pool_name)
conn = pool.get_connection()
logger.debug("[MySQL] Reconnected successfully")
try:
yield conn
finally:
if conn and conn.is_connected():
conn.close()
logger.debug("[MySQL] Connection returned to pool")
def _pool_healthcheck():
"""
Internal function that periodically pings the database to ensure pool health.
"""
while not _stop_healthcheck.is_set():
time.sleep(HEALTHCHECK_INTERVAL)
with _pool_lock:
pool = _connection_pool
if not pool:
logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
continue
try:
conn = pool.get_connection()
conn.ping(reconnect=True, attempts=3, delay=1)
conn.close()
logger.debug(f"[MySQL] Pool healthcheck succeeded")
except Error as e:
logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
create_connection_pool()
def start_healthcheck_thread():
"""
Start the background thread that runs periodic pool healthchecks.
"""
global _health_thread
if _health_thread and _health_thread.is_alive():
logger.debug("[MySQL] Healthcheck thread already running")
return
_stop_healthcheck.clear()
_health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
_health_thread.start()
logger.info("[MySQL] Healthcheck thread started.")

19
pyproject.toml Normal file
View File

@ -0,0 +1,19 @@
[project]
name = "lib-db-module"
version = "0.1.0"
description = "Shared database module"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Florian Gänsejunge" }]
dependencies = [
"simple-logger-handler @ git+https://git.gansejunge.com/notifier/lib-logger-handler.git@main",
"mysql-connector-python==9.4.0",
"lib-secret-manager @ git+https://git.gansejunge.com/notifier/lib-secret-manager.git@main"
]
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"
[project.urls]
Homepage = "https://git.gansejunge.com/notifier/lib-db-module"