81 lines
1.8 KiB
Python

import mysql.connector
import threading
from secret_handler import return_credentials
import os
import time
import sys
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("BACKEND_API_DB_HOST","localhost")
db_database = os.getenv("BACKEND_API_DB_DATABASE","app")
MAX_RETRIES = 5
RETRY_DELAY = 5
MYSQL_CONFIG = {
"host": db_host,
"user": db_username,
"password": db_password,
"database": db_database
}
_pool_lock = threading.Lock()
_connection_pool = None
def create_connection_pool():
global _connection_pool
for attempt in range(1, MAX_RETRIES+1):
try:
print(f"[MySQL] Attempt {attempt} to connect...")
pool = mysql.connector.pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
pool_reset_session=True,
**MYSQL_CONFIG
)
with _pool_lock:
_connection_pool = pool
print("[MySQL] Connection pool created successfully.")
return
except mysql.connector.Error as e:
print(f"[MySQL] Attempt {attempt} failed: {e}")
if attempt < MAX_RETRIES:
time.sleep(RETRY_DELAY)
print(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts — exiting.")
sys.exit(1)
def close_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool:
_connection_pool = None
print("[MySQL] Connection pool closed.")
def get_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool is None:
create_connection_pool()
return _connection_pool
def get_db():
pool = get_connection_pool()
try:
conn = pool.get_connection()
if not conn.is_connected():
conn.reconnect(attempts=MAX_RETRIES, delay=RETRY_DELAY)
except Exception:
create_connection_pool()
pool = get_connection_pool()
conn = pool.get_connection()
try:
yield conn
finally:
conn.close()