"""MySQL connection-pool management with retrying creation and a healthcheck.

The module keeps a single process-wide pool guarded by ``_pool_lock``.
``_pool_lock`` is a plain (non-reentrant) ``threading.Lock``; it is only ever
held for short reads/writes of ``_connection_pool`` and never while calling
another function of this module or while talking to the database.
"""
import os
import threading
import time
from typing import Generator, Optional

from mysql.connector import pooling, Error
from mysql.connector.errors import PoolError
from mysql.connector.pooling import MySQLConnectionPool, MySQLConnection

from secret_manager import return_credentials
from simple_logger_handler import setup_logger

db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("BACKEND_API_DB_HOST", "localhost")
db_database = os.getenv("BACKEND_API_DB_DATABASE", "app")

logger = setup_logger(__name__)

MAX_RETRIES = 5              # pool creation attempts before giving up
RETRY_DELAY = 5              # seconds between pool creation attempts
HEALTHCHECK_INTERVAL = 60    # seconds between background pool pings

MYSQL_CONFIG = {
    "host": db_host,
    "user": db_username,
    "password": db_password,
    "database": db_database,
    "connection_timeout": 10,
}

_pool_lock = threading.Lock()
_connection_pool = None
_pool_name = "MySQLPool"
_health_thread = None
_stop_healthcheck = threading.Event()


def _build_pool() -> MySQLConnectionPool:
    """Build a new pool, retrying up to ``MAX_RETRIES`` times.

    Does not touch ``_connection_pool`` and must be called WITHOUT
    ``_pool_lock`` held, because it may sleep between attempts.

    Raises:
        RuntimeError: If every attempt fails.
    """
    last_error = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            logger.info(f"[MySQL] Attempt {attempt} to connect...")
            return pooling.MySQLConnectionPool(
                pool_name=_pool_name,
                pool_size=5,
                pool_reset_session=True,
                **MYSQL_CONFIG,
            )
        except Error as e:
            last_error = e
            logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
            if attempt < MAX_RETRIES:
                logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
                time.sleep(RETRY_DELAY)

    logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
    raise RuntimeError("MySQL pool initialization failed") from last_error


def _replace_connection_pool(stale_pool) -> None:
    """Swap ``stale_pool`` for a freshly built pool.

    Nothing happens when ``stale_pool`` is no longer the active pool (another
    thread already replaced or closed it), so concurrent failures do not
    trigger repeated rebuilds.

    Raises:
        RuntimeError: If the replacement pool cannot be created.
    """
    global _connection_pool
    with _pool_lock:
        if _connection_pool is not stale_pool:
            return

    fresh_pool = _build_pool()
    with _pool_lock:
        # Re-check: the pool may have been replaced/closed while we were building.
        if _connection_pool is stale_pool:
            _connection_pool = fresh_pool
            logger.info("[MySQL] Pool recreated", extra={"pool_name": fresh_pool.pool_name})


def _release_connection(conn) -> None:
    """Hand ``conn`` back to its pool, tolerating connections that are already dead.

    ``close()`` is always attempted: skipping it for a disconnected
    connection would leave its pool slot checked out.
    """
    if conn is None:
        return
    try:
        conn.close()
    except Error as e:
        logger.debug(f"[MySQL] Error while releasing connection: {e}")


def create_connection_pool(pool_name: Optional[str] = None) -> None:
    """
    Create the MySQL connection pool if none exists yet.

    Args:
        pool_name (str): Name for the connection pool overriding the default.

    Raises:
        RuntimeError: If pool creation fails after retries.
    """
    global _connection_pool, _pool_name
    if pool_name:
        _pool_name = pool_name

    with _pool_lock:
        if _connection_pool is not None:
            logger.debug("[MySQL] Pool already exists, returning existing pool")
            return

    # Built outside the lock: creation may block for several retry delays.
    pool = _build_pool()

    with _pool_lock:
        if _connection_pool is None:
            _connection_pool = pool
            logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
        else:
            # Another thread installed a pool while we were building; keep theirs.
            logger.debug("[MySQL] Pool already exists, returning existing pool")


def close_connection_pool():
    """
    Drop the module's reference to the pool and stop the healthcheck thread.
    """
    global _connection_pool
    with _pool_lock:
        if _connection_pool is not None:
            logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
            # NOTE(review): only the reference is dropped here; the pooled
            # connections are not explicitly disconnected - confirm whether
            # an explicit close-all is wanted.
            _connection_pool = None
            logger.info("[MySQL] Connection pool closed")
    _stop_healthcheck.set()
    logger.debug("[MySQL] Healthcheck stop flag set")


def get_connection_pool() -> MySQLConnectionPool:
    """
    Retrieve the existing MySQL connection pool, or create a new one if not initialized.

    Returns:
        MySQLConnectionPool: The active connection pool.

    Raises:
        RuntimeError: If the pool cannot be created, or was closed concurrently.
    """
    with _pool_lock:
        pool = _connection_pool
    if pool is not None:
        logger.debug(f"[MySQL] Returning existing pool: {pool}")
        return pool

    logger.debug("[MySQL] No pool found, creating one")
    # Must run without _pool_lock held: create_connection_pool() takes the
    # same non-reentrant lock and would otherwise deadlock.
    create_connection_pool(_pool_name)

    with _pool_lock:
        pool = _connection_pool
    if pool is None:
        raise RuntimeError("MySQL pool is not available")
    return pool


def get_db() -> Generator[MySQLConnection, None, None]:
    """
    Generator yielding a database connection from the pool.

    The connection is pinged before it is handed out; if the ping fails the
    pool is rebuilt and a connection is taken from the new pool. The
    connection is returned to its pool when the generator finishes.

    Yields:
        MySQLConnection: A live MySQL database connection.

    Raises:
        RuntimeError: If the pool has to be (re)built and that fails.
    """
    pool = get_connection_pool()
    logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
    conn = pool.get_connection()
    try:
        conn.ping(reconnect=True, attempts=3, delay=1)
        logger.debug("[MySQL] Connection alive")
    except Error as e:
        logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
        # Give the dead connection back first so its slot is not leaked.
        _release_connection(conn)
        _replace_connection_pool(pool)
        pool = get_connection_pool()
        conn = pool.get_connection()
        logger.debug("[MySQL] Reconnected successfully")

    try:
        yield conn
    finally:
        _release_connection(conn)
        logger.debug("[MySQL] Connection returned to pool")


def _pool_healthcheck():
    """
    Internal function that periodically pings the database to ensure pool health.

    Runs until ``_stop_healthcheck`` is set. A failed ping triggers a pool
    rebuild; a failed rebuild is logged and retried on the next interval.
    """
    # Event.wait() doubles as an interruptible sleep: it returns True as soon
    # as the stop flag is set, so shutdown does not wait out a full interval.
    while not _stop_healthcheck.wait(HEALTHCHECK_INTERVAL):
        with _pool_lock:
            pool = _connection_pool
        if pool is None:
            logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
            continue

        try:
            conn = pool.get_connection()
        except PoolError as e:
            # Pool busy/exhausted is a load signal, not a dead database.
            logger.debug(f"[MySQL] Healthcheck skipped, no free connection: {e}")
            continue
        except Error as e:
            logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
            conn = None
            healthy = False
        else:
            try:
                conn.ping(reconnect=True, attempts=3, delay=1)
                healthy = True
            except Error as e:
                logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
                healthy = False
            finally:
                _release_connection(conn)

        if healthy:
            logger.debug("[MySQL] Pool healthcheck succeeded")
            continue

        try:
            _replace_connection_pool(pool)
        except RuntimeError as e:
            # Keep the thread alive; the next interval tries again.
            logger.error(f"[MySQL] Pool recreation failed: {e}")


def start_healthcheck_thread():
    """
    Start the background thread that runs periodic pool healthchecks.

    Does nothing if a healthcheck thread is already alive.
    """
    global _health_thread
    if _health_thread is not None and _health_thread.is_alive():
        logger.debug("[MySQL] Healthcheck thread already running")
        return

    # Clear a stop flag left over from an earlier close_connection_pool().
    _stop_healthcheck.clear()
    _health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
    _health_thread.start()
    logger.info("[MySQL] Healthcheck thread started.")