Merge pull request 'Improved database connection and logging' (#4) from feature/improved-db-handling into main
All checks were successful
Build & Publish to GHCR / build (push) Successful in 26s
All checks were successful
Build & Publish to GHCR / build (push) Successful in 26s
Reviewed-on: #4
This commit is contained in:
commit
c0fcdaeb4f
78
src/db.py
78
src/db.py
@ -1,59 +1,71 @@
|
||||
import mysql.connector
|
||||
from mysql.connector import pooling, Error
|
||||
import threading
|
||||
from secret_handler import return_credentials
|
||||
import os
|
||||
import time
|
||||
import sys
|
||||
from logger_handler import setup_logger
|
||||
|
||||
db_username = return_credentials("/etc/secrets/db_username")
|
||||
db_password = return_credentials("/etc/secrets/db_password")
|
||||
db_host = os.getenv("BACKEND_API_INTERNAL_DB_HOST","localhost")
|
||||
db_database = os.getenv("BACKEND_API_INTERNAL_DB_DATABASE","app")
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
|
||||
MAX_RETRIES = 5
|
||||
RETRY_DELAY = 5
|
||||
HEALTHCHECK_INTERVAL = 60
|
||||
|
||||
MYSQL_CONFIG = {
|
||||
"host": db_host,
|
||||
"user": db_username,
|
||||
"password": db_password,
|
||||
"database": db_database
|
||||
"database": db_database,
|
||||
"connection_timeout": 10
|
||||
}
|
||||
|
||||
_pool_lock = threading.Lock()
|
||||
_connection_pool = None
|
||||
_health_thread = None
|
||||
_stop_healthcheck = threading.Event()
|
||||
|
||||
|
||||
def create_connection_pool():
|
||||
global _connection_pool
|
||||
for attempt in range(1, MAX_RETRIES+1):
|
||||
with _pool_lock:
|
||||
if _connection_pool is not None:
|
||||
return
|
||||
for attempt in range (1,MAX_RETRIES+1):
|
||||
try:
|
||||
print(f"[MySQL] Attempt {attempt} to connect...")
|
||||
pool = mysql.connector.pooling.MySQLConnectionPool(
|
||||
pool_name="mypool",
|
||||
logger.info(f"[MySQL] Attempt {attempt} to connect...")
|
||||
pool = pooling.MySQLConnectionPool(
|
||||
pool_name="royalroadPool",
|
||||
pool_size=5,
|
||||
pool_reset_session=True,
|
||||
**MYSQL_CONFIG
|
||||
)
|
||||
with _pool_lock:
|
||||
_connection_pool = pool
|
||||
print("[MySQL] Connection pool created successfully.")
|
||||
logger.info("[MySQL] Pool created", extra={"pool_name": pool.pool_name})
|
||||
return
|
||||
except mysql.connector.Error as e:
|
||||
print(f"[MySQL] Attempt {attempt} failed: {e}")
|
||||
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
|
||||
if attempt < MAX_RETRIES:
|
||||
time.sleep(RETRY_DELAY)
|
||||
print(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts — exiting.")
|
||||
sys.exit(1)
|
||||
|
||||
logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
|
||||
raise RuntimeError("MySQL pool initialization failed")
|
||||
|
||||
def close_connection_pool():
|
||||
global _connection_pool
|
||||
with _pool_lock:
|
||||
if _connection_pool:
|
||||
_connection_pool = None
|
||||
print("[MySQL] Connection pool closed.")
|
||||
|
||||
logger.info("[MySQL] Connection pool closed")
|
||||
_stop_healthcheck.set()
|
||||
|
||||
def get_connection_pool():
|
||||
global _connection_pool
|
||||
@ -62,18 +74,46 @@ def get_connection_pool():
|
||||
create_connection_pool()
|
||||
return _connection_pool
|
||||
|
||||
|
||||
def get_db():
|
||||
pool = get_connection_pool()
|
||||
conn = pool.get_connection()
|
||||
|
||||
try:
|
||||
conn = pool.get_connection()
|
||||
if not conn.is_connected():
|
||||
conn.reconnect(attempts=MAX_RETRIES, delay=RETRY_DELAY)
|
||||
except Exception:
|
||||
conn.ping(reconnect=True, attempts=3, delay=1)
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
|
||||
create_connection_pool()
|
||||
pool = get_connection_pool()
|
||||
conn = pool.get_connection()
|
||||
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
if conn and conn.is_connected():
|
||||
conn.close()
|
||||
|
||||
def _pool_healthcheck():
|
||||
while not _stop_healthcheck.is_set():
|
||||
time.sleep(HEALTHCHECK_INTERVAL)
|
||||
with _pool_lock:
|
||||
pool = _connection_pool
|
||||
if not pool:
|
||||
continue
|
||||
try:
|
||||
conn = pool.get_connection()
|
||||
conn.ping(reconnect=True, attempts=3, delay=1)
|
||||
conn.close()
|
||||
logger.debug("[MySQL] Pool healthcheck OK.")
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
|
||||
create_connection_pool()
|
||||
|
||||
|
||||
def start_healthcheck_thread():
|
||||
global _health_thread
|
||||
if _health_thread and _health_thread.is_alive():
|
||||
return
|
||||
_stop_healthcheck.clear()
|
||||
_health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
|
||||
_health_thread.start()
|
||||
logger.info("[MySQL] Healthcheck thread started.")
|
||||
@ -5,7 +5,7 @@ from starlette.exceptions import HTTPException as StarletteHTTPException
|
||||
from typing import Dict
|
||||
from pydantic import BaseModel
|
||||
from validator import verify_api_key
|
||||
from db import get_db, create_connection_pool, close_connection_pool
|
||||
from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
|
||||
from logger_handler import setup_logger
|
||||
from rabbitmq_handler import send_message_to_rmq, create_connection, close_connection
|
||||
import uvicorn
|
||||
@ -33,6 +33,9 @@ async def lifespan(app: FastAPI):
|
||||
logger.info("Connecting to RabbitMQ...")
|
||||
app.state.rmq_connection = create_connection()
|
||||
|
||||
start_healthcheck_thread()
|
||||
logger.info("MySQL healthcheck thread started.")
|
||||
|
||||
yield
|
||||
logger.info("Closing RabbitMQ connection...")
|
||||
close_connection(app.state.rmq_connection)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user