Merge pull request 'Improved database connection and logging' (#2) from feature/database-retry-connections into main
All checks were successful
/ build (push) Successful in 25s

Reviewed-on: #2
This commit is contained in:
florian 2025-10-12 13:21:01 +02:00
commit 14b1c7f82d
5 changed files with 114 additions and 21 deletions

View File

@ -1,17 +1,21 @@
import mysql.connector
from mysql.connector import pooling, Error
import threading
from secret_handler import return_credentials
import os
import time
import sys
from logger_handler import setup_logger
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("SERVICE_DRQ_DB_HOST","localhost")
db_database = os.getenv("SERVICE_DRQ_DB_HOST_DATABASE","app")
logger = setup_logger(__name__)
MAX_RETRIES = 5
RETRY_DELAY = 5
HEALTHCHECK_INTERVAL = 60
MYSQL_CONFIG = {
"host": db_host,
@ -22,38 +26,44 @@ MYSQL_CONFIG = {
_pool_lock = threading.Lock()
_connection_pool = None
_health_thread = None
_stop_healthcheck = threading.Event()
def create_connection_pool():
    """Create the module-wide MySQL connection pool, retrying on failure.

    Idempotent: returns immediately if a pool already exists. Makes up to
    MAX_RETRIES attempts, sleeping RETRY_DELAY seconds between failures.

    Raises:
        RuntimeError: if the pool cannot be created after MAX_RETRIES attempts.
    """
    global _connection_pool
    # Fast path: another thread may already have built the pool.
    with _pool_lock:
        if _connection_pool is not None:
            return
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            logger.info(f"[MySQL] Attempt {attempt} to connect...")
            pool = pooling.MySQLConnectionPool(
                pool_name="royalroadPool",
                pool_size=5,
                pool_reset_session=True,
                **MYSQL_CONFIG
            )
            # Publish the pool under the lock so readers never see a
            # half-initialized object.
            with _pool_lock:
                _connection_pool = pool
            logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
            return
        except Error as e:
            logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
            if attempt < MAX_RETRIES:
                time.sleep(RETRY_DELAY)
    logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
    raise RuntimeError("MySQL pool initialization failed")
def close_connection_pool():
    """Drop the pool reference and signal the healthcheck thread to stop.

    mysql-connector pools have no explicit close(); releasing the module
    reference lets the pooled connections be reclaimed.
    """
    global _connection_pool
    with _pool_lock:
        if _connection_pool:
            _connection_pool = None
            logger.info("[MySQL] Connection pool closed")
    # Signal outside the lock: the healthcheck loop also takes _pool_lock.
    _stop_healthcheck.set()
def get_connection_pool():
global _connection_pool
@ -62,18 +72,46 @@ def get_connection_pool():
create_connection_pool()
return _connection_pool
def get_db():
    """Yield a pooled MySQL connection, rebuilding the pool if it is dead.

    Generator intended for dependency injection (e.g. FastAPI ``Depends``):
    the connection is returned to the pool in the ``finally`` block.

    Yields:
        A live pooled connection.
    """
    global _connection_pool
    pool = get_connection_pool()
    try:
        conn = pool.get_connection()
        # Verify the connection is alive; transparently reconnect if not.
        conn.ping(reconnect=True, attempts=3, delay=1)
    except Error as e:
        logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
        # Drop the stale pool first: create_connection_pool() returns early
        # while _connection_pool is still set, so without this reset the
        # old, dead pool would be handed straight back.
        with _pool_lock:
            _connection_pool = None
        create_connection_pool()
        pool = get_connection_pool()
        conn = pool.get_connection()
    try:
        yield conn
    finally:
        # close() on a pooled connection returns it to the pool.
        if conn and conn.is_connected():
            conn.close()
def _pool_healthcheck():
    """Background loop: periodically ping one pooled connection and rebuild
    the pool when the ping fails.

    Runs until _stop_healthcheck is set; waits HEALTHCHECK_INTERVAL seconds
    between checks.
    """
    global _connection_pool
    # Event.wait() doubles as the sleep and lets shutdown interrupt the
    # interval immediately instead of waiting out a full time.sleep().
    while not _stop_healthcheck.wait(HEALTHCHECK_INTERVAL):
        with _pool_lock:
            pool = _connection_pool
        if not pool:
            continue
        try:
            conn = pool.get_connection()
            conn.ping(reconnect=True, attempts=3, delay=1)
            conn.close()
            logger.debug("[MySQL] Pool healthcheck OK.")
        except Error as e:
            logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
            # Reset first — create_connection_pool() returns early while a
            # (possibly dead) pool object is still referenced.
            with _pool_lock:
                _connection_pool = None
            try:
                create_connection_pool()
            except RuntimeError:
                # Don't let a failed rebuild kill the thread; retry on the
                # next healthcheck cycle.
                logger.error("[MySQL] Pool rebuild failed; retrying next cycle.")
def start_healthcheck_thread():
    """Launch the background pool-healthcheck thread (idempotent).

    Does nothing when a previously started healthcheck thread is still
    alive; otherwise clears the stop flag and spawns a fresh daemon thread.
    """
    global _health_thread
    already_running = _health_thread is not None and _health_thread.is_alive()
    if already_running:
        return
    _stop_healthcheck.clear()
    worker = threading.Thread(target=_pool_healthcheck, daemon=True)
    worker.start()
    _health_thread = worker
    logger.info("[MySQL] Healthcheck thread started.")

13
src/logger_handler.py Normal file
View File

@ -0,0 +1,13 @@
import logging
def setup_logger(name: str, level: int = logging.DEBUG) -> logging.Logger:
    """Return a named logger with a single stream handler attached.

    Idempotent: if the logger already has handlers (e.g. from an earlier
    call with the same name), nothing is added, so repeated calls cannot
    produce duplicate log lines.

    Args:
        name: Logger name, conventionally the caller's ``__name__``.
        level: Minimum level for the logger; defaults to ``logging.DEBUG``
            to preserve the original behaviour.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(level)
    return logger

View File

@ -4,7 +4,7 @@ from github_api import find_package_version_with_tag as find_package_version_wit
from dockerhub_api import find_package_version_with_tag as find_package_version_with_tag_dockerhub
import uvicorn
from contextlib import asynccontextmanager
from db import get_db, create_connection_pool, close_connection_pool
from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
import logging
from send_notification import send_notification
from metrics_server import REQUEST_COUNTER
@ -19,6 +19,9 @@ async def lifespan(app: FastAPI):
logger.info("Creating MySQL connection pool...")
create_connection_pool()
start_healthcheck_thread()
logger.info("MySQL healthcheck thread started.")
yield
logger.info("Closing MySQL connection pool...")
close_connection_pool()

View File

@ -4,12 +4,12 @@ from fastapi import HTTPException
from secret_handler import return_credentials
import os
import time
import logging
from logger_handler import setup_logger
backend_api_url=os.getenv("BACKEND_API_URL","localhost:8101/internal/receive-notifications")
api_key= return_credentials("/etc/secrets/api_key")
logger = logging.getLogger(__name__)
logger = setup_logger(__name__)
def send_notification(notification:str,

View File

@ -0,0 +1,39 @@
# dictConfig-style logging configuration (schema "version": 1): a single
# stdout StreamHandler with a shared format, wired to the root logger and
# to uvicorn's server/error/access loggers, all at INFO level.
# NOTE(review): nothing in this chunk shows dictConfig() being applied —
# confirm the caller passes this to logging.config.dictConfig.
LOGGING_CONFIG = {
    "version": 1,
    # Keep loggers that were created before this config is applied.
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S",
        }
    },
    "handlers": {
        # Single handler writing to stdout so all loggers share one format.
        "default": {
            "class": "logging.StreamHandler",
            "formatter": "default",
            "stream": "ext://sys.stdout"
        }
    },
    "loggers": {
        "": {  # root logger
            "handlers": ["default"],
            "level": "INFO",
            "propagate": False
        },
        # uvicorn loggers are configured explicitly (propagate=False) so
        # they use this handler instead of uvicorn's defaults.
        "uvicorn": {
            "handlers": ["default"],
            "level": "INFO",
            "propagate": False
        },
        "uvicorn.error": {
            "handlers": ["default"],
            "level": "INFO",
            "propagate": False
        },
        "uvicorn.access": {
            "handlers": ["default"],
            "level": "INFO",
            "propagate": False
        }
    }
}