Moved database module to separate library

This commit is contained in:
Florian 2025-11-06 17:40:12 +01:00
parent 9daf283a6e
commit ba878f2ec4
3 changed files with 3 additions and 131 deletions

View File

@ -6,6 +6,7 @@ click==8.3.0
fastapi==0.118.2
h11==0.16.0
idna==3.10
lib-db-module @ git+https://git.gansejunge.com/notifier/lib-db-module.git@main
lib-secret-manager @ git+https://git.gansejunge.com/notifier/lib-secret-manager.git@main
lib-uvicorn-config @ git+https://git.gansejunge.com/notifier/lib-uvicorn-config.git@main
mysql-connector-python==9.4.0

129
src/db.py
View File

@ -1,129 +0,0 @@
from mysql.connector import pooling, Error
import threading
from secret_manager import return_credentials
import os
import time
from simple_logger_handler import setup_logger
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("SERVICE_DRQ_DB_HOST","localhost")
db_database = os.getenv("SERVICE_DRQ_DB_HOST_DATABASE","app")
logger = setup_logger(__name__)
MAX_RETRIES = 5
RETRY_DELAY = 5
HEALTHCHECK_INTERVAL = 60
MYSQL_CONFIG = {
"host": db_host,
"user": db_username,
"password": db_password,
"database": db_database
}
_pool_lock = threading.Lock()
_connection_pool = None
_health_thread = None
_stop_healthcheck = threading.Event()
def create_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool is not None:
logger.debug("[MySQL] Pool already exists, returning existing pool")
return
for attempt in range (1,MAX_RETRIES+1):
try:
logger.info(f"[MySQL] Attempt {attempt} to connect...")
pool = pooling.MySQLConnectionPool(
pool_name="repositoryQueryPool",
pool_size=5,
pool_reset_session=True,
**MYSQL_CONFIG
)
with _pool_lock:
_connection_pool = pool
logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
return
except Error as e:
logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
if attempt < MAX_RETRIES:
logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
time.sleep(RETRY_DELAY)
logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
raise RuntimeError("MySQL pool initialization failed")
def close_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool:
logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
_connection_pool = None
logger.info("[MySQL] Connection pool closed")
_stop_healthcheck.set()
logger.debug("[MySQL] Healthcheck stop flag set")
def get_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool is None:
logger.debug("[MySQL] No pool found, creating one")
create_connection_pool()
else:
logger.debug(f"[MySQL] Returning existing pool: {_connection_pool}")
return _connection_pool
def get_db():
pool = get_connection_pool()
logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
conn = pool.get_connection()
try:
conn.ping(reconnect=True, attempts=3, delay=1)
logger.debug("[MySQL] Connection alive")
except Error as e:
logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
create_connection_pool()
pool = get_connection_pool()
conn = pool.get_connection()
logger.debug("[MySQL] Reconnected successfully")
try:
yield conn
finally:
if conn and conn.is_connected():
conn.close()
logger.debug("[MySQL] Connection returned to pool")
def _pool_healthcheck():
while not _stop_healthcheck.is_set():
time.sleep(HEALTHCHECK_INTERVAL)
with _pool_lock:
pool = _connection_pool
if not pool:
logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
continue
try:
conn = pool.get_connection()
conn.ping(reconnect=True, attempts=3, delay=1)
conn.close()
logger.debug(f"[MySQL] Pool healthcheck succeeded")
except Error as e:
logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
create_connection_pool()
def start_healthcheck_thread():
global _health_thread
if _health_thread and _health_thread.is_alive():
logger.debug("[MySQL] Healthcheck thread already running")
return
_stop_healthcheck.clear()
_health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
_health_thread.start()
logger.info("[MySQL] Healthcheck thread started.")

View File

@ -4,7 +4,7 @@ from github_api import find_package_version_with_tag as find_package_version_wit
from dockerhub_api import find_package_version_with_tag as find_package_version_with_tag_dockerhub
import uvicorn
from contextlib import asynccontextmanager
from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
from db_module import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
from simple_logger_handler import setup_logger, LOG_LEVEL
from send_notification import send_notification
from metrics_server import REQUEST_COUNTER
@ -20,7 +20,7 @@ async def lifespan(app: FastAPI):
logger.info("[App] Starting application...")
logger.info("[DB] Creating MySQL connection pool...")
create_connection_pool()
create_connection_pool("repositoryQueryPool")
start_healthcheck_thread()
logger.info("[DB] MySQL healthcheck thread started.")