Moved database module to separate library
All checks were successful
Build & Publish to GHCR / build (push) Successful in 1m39s

This commit is contained in:
Florian 2025-11-06 17:23:50 +01:00
parent c7691dcf15
commit abf06c43b5
3 changed files with 3 additions and 133 deletions

View File

@ -9,6 +9,7 @@ click==8.3.0
fastapi==0.119.0
h11==0.16.0
idna==3.11
lib-db-module @ git+https://git.gansejunge.com/notifier/lib-db-module.git@main
lib-secret-manager @ git+https://git.gansejunge.com/notifier/lib-secret-manager.git@main
lib-uvicorn-config @ git+https://git.gansejunge.com/notifier/lib-uvicorn-config.git@main
multidict==6.7.0

131
src/db.py
View File

@ -1,131 +0,0 @@
from mysql.connector import pooling, Error
import threading
from secret_manager import return_credentials
import os
import time
from simple_logger_handler import setup_logger
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("BACKEND_API_INTERNAL_DB_HOST","localhost")
db_database = os.getenv("BACKEND_API_INTERNAL_DB_DATABASE","app")
logger = setup_logger(__name__)
MAX_RETRIES = 5
RETRY_DELAY = 5
HEALTHCHECK_INTERVAL = 60
MYSQL_CONFIG = {
"host": db_host,
"user": db_username,
"password": db_password,
"database": db_database,
"connection_timeout": 10
}
_pool_lock = threading.Lock()
_connection_pool = None
_health_thread = None
_stop_healthcheck = threading.Event()
def create_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool is not None:
logger.debug("[MySQL] Pool already exists, returning existing pool")
return
for attempt in range (1,MAX_RETRIES+1):
try:
logger.info(f"[MySQL] Attempt {attempt} to connect...")
pool = pooling.MySQLConnectionPool(
pool_name="ApiInternalPool",
pool_size=5,
pool_reset_session=True,
**MYSQL_CONFIG
)
with _pool_lock:
_connection_pool = pool
logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
return
except Error as e:
logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
if attempt < MAX_RETRIES:
logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
time.sleep(RETRY_DELAY)
logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
raise RuntimeError("MySQL pool initialization failed")
def close_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool:
logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
_connection_pool = None
logger.info("[MySQL] Connection pool closed")
_stop_healthcheck.set()
logger.debug("[MySQL] Healthcheck stop flag set")
def get_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool is None:
logger.debug("[MySQL] No pool found, creating one")
create_connection_pool()
else:
logger.debug(f"[MySQL] Returning existing pool: {_connection_pool}")
return _connection_pool
def get_db():
pool = get_connection_pool()
logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
conn = pool.get_connection()
try:
conn.ping(reconnect=True, attempts=3, delay=1)
logger.debug("[MySQL] Connection alive")
except Error as e:
logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
create_connection_pool()
pool = get_connection_pool()
conn = pool.get_connection()
logger.debug("[MySQL] Reconnected successfully")
try:
yield conn
finally:
if conn and conn.is_connected():
conn.close()
logger.debug("[MySQL] Connection returned to pool")
def _pool_healthcheck():
while not _stop_healthcheck.is_set():
time.sleep(HEALTHCHECK_INTERVAL)
with _pool_lock:
pool = _connection_pool
if not pool:
logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
continue
try:
conn = pool.get_connection()
conn.ping(reconnect=True, attempts=3, delay=1)
conn.close()
logger.debug(f"[MySQL] Pool healthcheck succeeded")
except Error as e:
logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
create_connection_pool()
def start_healthcheck_thread():
global _health_thread
if _health_thread and _health_thread.is_alive():
logger.debug("[MySQL] Healthcheck thread already running")
return
_stop_healthcheck.clear()
_health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
_health_thread.start()
logger.info("[MySQL] Healthcheck thread started.")

View File

@ -5,7 +5,7 @@ from starlette.exceptions import HTTPException as StarletteHTTPException
from typing import Dict
from pydantic import BaseModel
from validator import verify_api_key
from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
from db_module import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
from simple_logger_handler import setup_logger, LOG_LEVEL
from rabbitmq_handler import RabbitMQProducer
import uvicorn
@ -30,7 +30,7 @@ async def lifespan(app: FastAPI):
logger.info("Starting application...")
logger.info("Creating MySQL connection pool...")
create_connection_pool()
create_connection_pool("ApiInternalPool")
start_healthcheck_thread()
logger.info("MySQL healthcheck thread started.")