"""MySQL connection-pool management.

Builds a module-wide ``MySQLConnectionPool`` (with retries), hands out
per-request connections through the ``get_db`` generator, and runs an
optional background thread that periodically probes the pool.
"""
import os
import threading
import time

from mysql.connector import Error, PoolError, pooling

from secret_handler import return_credentials
from logger_handler import setup_logger

# Credentials come from the project's secret_handler helper
# (presumably it reads the file at the given path -- confirm in secret_handler).
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("BACKEND_API_DB_HOST", "localhost")
db_database = os.getenv("BACKEND_API_DB_DATABASE", "app")

logger = setup_logger(__name__)

MAX_RETRIES = 5             # connection attempts per pool creation
RETRY_DELAY = 5             # seconds slept between failed attempts
HEALTHCHECK_INTERVAL = 60   # seconds between background pool probes

MYSQL_CONFIG = {
    "host": db_host,
    "user": db_username,
    "password": db_password,
    "database": db_database,
    "connection_timeout": 10,
}

# _pool_lock guards reads/writes of _connection_pool. It is a plain
# (non-reentrant) Lock, so it must never be held while calling a function
# that acquires it again.
_pool_lock = threading.Lock()
_connection_pool = None
_health_thread = None
_stop_healthcheck = threading.Event()


def create_connection_pool(*, force=False):
    """Create the module-wide connection pool, retrying on driver errors.

    Args:
        force: When False (the default) the call is a no-op if a pool
            already exists. When True a new pool is built and swapped in
            even if one exists; the existing pool stays in place if the
            rebuild fails.

    Raises:
        RuntimeError: if all ``MAX_RETRIES`` attempts fail.
    """
    global _connection_pool
    with _pool_lock:
        if _connection_pool is not None and not force:
            logger.debug("[MySQL] Pool already exists, returning existing pool")
            return

    last_error = None
    # The lock is deliberately NOT held while connecting: an attempt can
    # block for the connection timeout plus RETRY_DELAY.
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            logger.info(f"[MySQL] Attempt {attempt} to connect...")
            pool = pooling.MySQLConnectionPool(
                pool_name="ApiPool",
                pool_size=5,
                pool_reset_session=True,
                **MYSQL_CONFIG,
            )
        except Error as e:
            last_error = e
            logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
            if attempt < MAX_RETRIES:
                logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
                time.sleep(RETRY_DELAY)
        else:
            with _pool_lock:
                _connection_pool = pool
            logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
            return

    logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
    raise RuntimeError("MySQL pool initialization failed") from last_error


def close_connection_pool():
    """Drop the module's pool reference and signal the healthcheck to stop.

    Note: this only clears the module-level reference; it does not
    explicitly close the pool's underlying connections.
    """
    global _connection_pool
    with _pool_lock:
        if _connection_pool:
            logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
            _connection_pool = None
            logger.info("[MySQL] Connection pool closed")
    _stop_healthcheck.set()
    logger.debug("[MySQL] Healthcheck stop flag set")


def get_connection_pool():
    """Return the current pool, creating it first if none exists.

    Raises:
        RuntimeError: propagated from ``create_connection_pool`` when the
            pool has to be created and creation fails.
    """
    with _pool_lock:
        pool = _connection_pool

    if pool is not None:
        logger.debug(f"[MySQL] Returning existing pool: {pool}")
        return pool

    # create_connection_pool() acquires _pool_lock itself, so it has to be
    # called with the lock released or this thread would deadlock.
    logger.debug("[MySQL] No pool found, creating one")
    create_connection_pool()
    with _pool_lock:
        return _connection_pool


def _release_connection(conn):
    """Hand ``conn`` back to its pool; log driver errors instead of raising."""
    try:
        conn.close()
    except Error as e:
        logger.debug(f"[MySQL] Error while releasing connection: {e}")


def _rebuild_pool(stale_pool):
    """Replace ``stale_pool`` with a new pool unless it was already replaced.

    Raises:
        RuntimeError: propagated from ``create_connection_pool``.
    """
    with _pool_lock:
        current = _connection_pool
    if current is not None and current is not stale_pool:
        # Another thread swapped in a new pool while we were failing.
        logger.debug("[MySQL] Pool already replaced, skipping rebuild")
        return
    create_connection_pool(force=True)


def get_db():
    """Yield a live pooled connection and return it to the pool afterwards.

    The connection is pinged before use. If the ping fails, the dead
    connection is released, the pool is rebuilt and a new connection is
    taken from it.

    Yields:
        A connection obtained from the pool's ``get_connection()``.

    Raises:
        RuntimeError: if the pool has to be (re)created and that fails.
        mysql.connector.Error: if no connection can be obtained from the pool.
    """
    pool = get_connection_pool()
    logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
    conn = pool.get_connection()
    try:
        conn.ping(reconnect=True, attempts=3, delay=1)
        logger.debug("[MySQL] Connection alive")
    except Error as e:
        logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
        # Give the dead connection back first so its pool slot is not lost.
        _release_connection(conn)
        _rebuild_pool(pool)
        pool = get_connection_pool()
        conn = pool.get_connection()
        logger.debug("[MySQL] Reconnected successfully")

    try:
        yield conn
    finally:
        # Always release, even if the link dropped mid-request: skipping
        # close() would permanently remove the slot from the pool.
        _release_connection(conn)
        logger.debug("[MySQL] Connection returned to pool")


def _pool_is_healthy(pool):
    """Borrow one connection from ``pool`` and ping it.

    Returns:
        False when obtaining or pinging the connection raises a driver
        error; True otherwise. An exhausted pool (``PoolError``) is
        reported as healthy because it indicates load, not a dead server.
    """
    try:
        conn = pool.get_connection()
    except PoolError as e:
        logger.debug(f"[MySQL] Healthcheck skipped, no free connection: {e}")
        return True
    except Error as e:
        logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
        return False

    try:
        conn.ping(reconnect=True, attempts=3, delay=1)
    except Error as e:
        logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
        return False
    finally:
        _release_connection(conn)

    logger.debug("[MySQL] Pool healthcheck succeeded")
    return True


def _pool_healthcheck():
    """Thread target: probe the pool every ``HEALTHCHECK_INTERVAL`` seconds.

    Runs until ``_stop_healthcheck`` is set. A failed probe triggers a pool
    rebuild; a failed rebuild is logged and retried on the next cycle.
    """
    # Event.wait() is the sleep: it returns True as soon as the stop flag
    # is set, so shutdown does not have to wait out a full interval.
    while not _stop_healthcheck.wait(HEALTHCHECK_INTERVAL):
        with _pool_lock:
            pool = _connection_pool
        if pool is None:
            logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
            continue
        if _pool_is_healthy(pool):
            continue
        try:
            _rebuild_pool(pool)
        except RuntimeError as e:
            # Keep the thread alive so recovery is attempted again later.
            logger.warning(f"[MySQL] Pool rebuild failed, will retry: {e}")


def start_healthcheck_thread():
    """Start the daemon healthcheck thread unless one is already alive."""
    global _health_thread
    if _health_thread and _health_thread.is_alive():
        logger.debug("[MySQL] Healthcheck thread already running")
        return
    _stop_healthcheck.clear()
    _health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
    _health_thread.start()
    logger.info("[MySQL] Healthcheck thread started.")