diff --git a/src/db.py b/src/db.py index 336001a..1f32850 100644 --- a/src/db.py +++ b/src/db.py @@ -65,7 +65,14 @@ def get_connection_pool(): def get_db(): pool = get_connection_pool() - conn = pool.get_connection() + try: + conn = pool.get_connection() + if not conn.is_connected(): + conn.reconnect(attempts=MAX_RETRIES, delay=RETRY_DELAY) + except Exception: + create_connection_pool() + pool = get_connection_pool() + conn = pool.get_connection() try: yield conn finally: diff --git a/src/main.py b/src/main.py index 27a8767..347dafc 100644 --- a/src/main.py +++ b/src/main.py @@ -37,6 +37,11 @@ async def lifespan(app: FastAPI): logger.info("Closing MySQL connection pool...") close_connection_pool() +def get_rmq_connection(app: FastAPI): + connection = getattr(app.state, "rmq_connection", None) + if not connection or not connection.is_open: + app.state.rmq_connection = create_connection() + return app.state.rmq_connection api = FastAPI( title="Internal Notifier API", @@ -73,8 +78,9 @@ def receive_notifications( db = Depends(get_db), program_name: str = Depends(verify_api_key_dependency_internal) ): + rmq_connection = get_rmq_connection(request.app) logger.info(f"Received notifcation data from {program_name} for RMQ") - send_message_to_rmq(request.app.state.rmq_connection,notification_data.receipent_user_id,notification_data.message) + send_message_to_rmq(rmq_connection,notification_data.receipent_user_id,notification_data.message) logger.info("Successfully delivered message to RMQ") return {"status": "queued"} diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py index c6ee28a..68ea583 100644 --- a/src/rabbitmq_handler.py +++ b/src/rabbitmq_handler.py @@ -41,7 +41,8 @@ def create_connection(): def send_message_to_rmq(connection, user_id: int, message: Dict): if not connection or not connection.is_open: - raise RuntimeError("RabbitMQ connection is not open") + print("[RMQ] Connection lost, reconnecting...") + connection = create_connection() channel = connection.channel() channel.exchange_declare(exchange=rmq_exchange, exchange_type="topic", durable=True)