Improved database connection handling and logging:

Robust ping/reconnect: get_db() now pings the connection (with retries) on every request and recreates the pool, at most once per request, if the connection turns out to be dead.
Healthcheck thread: a background daemon thread periodically pings a pooled connection and recreates the pool when the ping fails.
Logging improvements: pool creation and healthcheck events are now reported through a shared logger (logger_handler.setup_logger) instead of print().
Florian 2025-10-12 11:50:13 +02:00
parent cd759f43a8
commit 7b0ba7de10
5 changed files with 121 additions and 24 deletions

View File

@@ -1,59 +1,72 @@
 import mysql.connector
+from mysql.connector import pooling, Error
 import threading
 from secret_handler import return_credentials
 import os
 import time
-import sys
+from logger_handler import setup_logger

 db_username = return_credentials("/etc/secrets/db_username")
 db_password = return_credentials("/etc/secrets/db_password")
 db_host = os.getenv("SERVICE_RR_DB_HOST","localhost")
 db_database = os.getenv("SERVICE_RR_DB_HOST_DATABASE","app")

+logger = setup_logger(__name__)
+
 MAX_RETRIES = 5
 RETRY_DELAY = 5
+HEALTHCHECK_INTERVAL = 60

 MYSQL_CONFIG = {
     "host": db_host,
     "user": db_username,
     "password": db_password,
-    "database": db_database
+    "database": db_database,
+    "connection_timeout": 10
 }

 _pool_lock = threading.Lock()
 _connection_pool = None
+_health_thread = None
+_stop_healthcheck = threading.Event()

 def create_connection_pool():
     global _connection_pool
+    with _pool_lock:
+        if _connection_pool is not None:
+            return
     for attempt in range(1, MAX_RETRIES+1):
         try:
-            print(f"[MySQL] Attempt {attempt} to connect...")
-            pool = mysql.connector.pooling.MySQLConnectionPool(
-                pool_name="mypool",
+            logger.info(f"[MySQL] Attempt {attempt} to connect...")
+            pool = pooling.MySQLConnectionPool(
+                pool_name="royalroadPool",
                 pool_size=5,
                 pool_reset_session=True,
                 **MYSQL_CONFIG
             )
             with _pool_lock:
                 _connection_pool = pool
-            print("[MySQL] Connection pool created successfully.")
+            logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
             return
-        except mysql.connector.Error as e:
-            print(f"[MySQL] Attempt {attempt} failed: {e}")
+        except Error as e:
+            logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
             if attempt < MAX_RETRIES:
                 time.sleep(RETRY_DELAY)
-    print(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts — exiting.")
-    sys.exit(1)
+    logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
+    raise RuntimeError("MySQL pool initialization failed")

 def close_connection_pool():
     global _connection_pool
     with _pool_lock:
         if _connection_pool:
             _connection_pool = None
-            print("[MySQL] Connection pool closed.")
+            logger.info("[MySQL] Connection pool closed")
+    _stop_healthcheck.set()

 def get_connection_pool():
     global _connection_pool
@@ -62,18 +75,46 @@ def get_connection_pool():
         create_connection_pool()
     return _connection_pool

 def get_db():
     pool = get_connection_pool()
+    conn = pool.get_connection()
     try:
-        conn = pool.get_connection()
-        if not conn.is_connected():
-            conn.reconnect(attempts=MAX_RETRIES, delay=RETRY_DELAY)
-    except Exception:
+        conn.ping(reconnect=True, attempts=3, delay=1)
+    except Error as e:
+        logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
         create_connection_pool()
         pool = get_connection_pool()
         conn = pool.get_connection()
     try:
         yield conn
     finally:
-        conn.close()
+        if conn and conn.is_connected():
+            conn.close()
+
+def _pool_healthcheck():
+    while not _stop_healthcheck.is_set():
+        time.sleep(HEALTHCHECK_INTERVAL)
+        with _pool_lock:
+            pool = _connection_pool
+        if not pool:
+            continue
+        try:
+            conn = pool.get_connection()
+            conn.ping(reconnect=True, attempts=3, delay=1)
+            conn.close()
+            logger.debug("[MySQL] Pool healthcheck OK.")
+        except Error as e:
+            logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
+            create_connection_pool()
+
+def start_healthcheck_thread():
+    global _health_thread
+    if _health_thread and _health_thread.is_alive():
+        return
+    _stop_healthcheck.clear()
+    _health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
+    _health_thread.start()
+    logger.info("[MySQL] Healthcheck thread started.")

src/logger_handler.py (new file, 13 lines)

@@ -0,0 +1,13 @@
+import logging
+
+def setup_logger(name: str) -> logging.Logger:
+    logger = logging.getLogger(name)
+    if not logger.handlers:
+        handler = logging.StreamHandler()
+        formatter = logging.Formatter(
+            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        )
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+        logger.setLevel(logging.DEBUG)
+    return logger
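setup_logger() guards against duplicate handlers, so modules can call it freely at import time; repeated calls return the same logger object with a single handler attached. A small usage sketch (the logger name is arbitrary):

from logger_handler import setup_logger

log_a = setup_logger("demo")
log_b = setup_logger("demo")     # same underlying logging.Logger instance
assert log_a is log_b
assert len(log_a.handlers) == 1  # handler is only attached once
log_a.info("logging via the shared stream handler")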

View File

@@ -1,15 +1,16 @@
 from fastapi import FastAPI, Depends, HTTPException, Response, Request
 import uvicorn
 from contextlib import asynccontextmanager
-from db import get_db, create_connection_pool, close_connection_pool
-import logging
+from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
+from logger_handler import setup_logger
 from feed_handler import grab_latest_chapter_information
 from send_notification import send_notification
 from metrics_server import REQUEST_COUNTER
 import asyncio

-logger = logging.getLogger(__name__)
+logger = setup_logger(__name__)

 @asynccontextmanager
 async def lifespan(app: FastAPI):
@@ -18,6 +19,9 @@ async def lifespan(app: FastAPI):
     logger.info("Creating MySQL connection pool...")
     create_connection_pool()
+    start_healthcheck_thread()
+    logger.info("MySQL healthcheck thread started.")
     yield
     logger.info("Closing MySQL connection pool...")
     close_connection_pool()

View File

@@ -4,12 +4,12 @@ from fastapi import HTTPException
 from secret_handler import return_credentials
 import os
 import time
-import logging
+from logger_handler import setup_logger

 backend_api_url=os.getenv("BACKEND_API_URL","localhost:8101/internal/receive-notifications")
 api_key= return_credentials("/etc/secrets/api_key")

-logger = logging.getLogger(__name__)
+logger = setup_logger(__name__)

 def send_notification(title:str,

View File

@ -0,0 +1,39 @@
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"handlers": {
"default": {
"class": "logging.StreamHandler",
"formatter": "default",
"stream": "ext://sys.stdout"
}
},
"loggers": {
"": { # root logger
"handlers": ["default"],
"level": "INFO",
"propagate": False
},
"uvicorn": {
"handlers": ["default"],
"level": "INFO",
"propagate": False
},
"uvicorn.error": {
"handlers": ["default"],
"level": "INFO",
"propagate": False
},
"uvicorn.access": {
"handlers": ["default"],
"level": "INFO",
"propagate": False
}
}
}
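The commit adds this dictConfig-style mapping but does not show where it is applied. A minimal sketch of the usual wiring, assuming the dict lives in a module importable as logging_config and that the app is started with uvicorn; the module name, app reference, and port are placeholders:

import logging.config
import uvicorn

from logging_config import LOGGING_CONFIG  # hypothetical module name for this file

# Apply the config to the standard logging tree...
logging.config.dictConfig(LOGGING_CONFIG)

# ...and hand the same dict to uvicorn so its own loggers use it as well.
uvicorn.run("main:app", host="0.0.0.0", port=8101, log_config=LOGGING_CONFIG)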