RMQ and DB: Added reconnecting to either if the application goes down
This commit is contained in:
parent
482a90ae7b
commit
ac6ca4baf8
@ -64,6 +64,13 @@ def get_connection_pool():
|
|||||||
|
|
||||||
|
|
||||||
def get_db():
|
def get_db():
|
||||||
|
pool = get_connection_pool()
|
||||||
|
try:
|
||||||
|
conn = pool.get_connection()
|
||||||
|
if not conn.is_connected():
|
||||||
|
conn.reconnect(attempts=MAX_RETRIES, delay=RETRY_DELAY)
|
||||||
|
except Exception:
|
||||||
|
create_connection_pool()
|
||||||
pool = get_connection_pool()
|
pool = get_connection_pool()
|
||||||
conn = pool.get_connection()
|
conn = pool.get_connection()
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -37,6 +37,11 @@ async def lifespan(app: FastAPI):
|
|||||||
logger.info("Closing MySQL connection pool...")
|
logger.info("Closing MySQL connection pool...")
|
||||||
close_connection_pool()
|
close_connection_pool()
|
||||||
|
|
||||||
|
def get_rmq_connection(app: FastAPI):
|
||||||
|
connection = getattr(app.state, "rmq_connection", None)
|
||||||
|
if not connection or not connection.is_open:
|
||||||
|
app.state.rmq_connection = create_connection()
|
||||||
|
return app.state.rmq_connection
|
||||||
|
|
||||||
api = FastAPI(
|
api = FastAPI(
|
||||||
title="Internal Notifier API",
|
title="Internal Notifier API",
|
||||||
@ -73,8 +78,9 @@ def receive_notifications(
|
|||||||
db = Depends(get_db),
|
db = Depends(get_db),
|
||||||
program_name: str = Depends(verify_api_key_dependency_internal)
|
program_name: str = Depends(verify_api_key_dependency_internal)
|
||||||
):
|
):
|
||||||
|
rmq_connection = get_rmq_connection(request.app)
|
||||||
logger.info(f"Received notifcation data from {program_name} for RMQ")
|
logger.info(f"Received notifcation data from {program_name} for RMQ")
|
||||||
send_message_to_rmq(request.app.state.rmq_connection,notification_data.receipent_user_id,notification_data.message)
|
send_message_to_rmq(rmq_connection,notification_data.receipent_user_id,notification_data.message)
|
||||||
logger.info("Successfully delivered message to RMQ")
|
logger.info("Successfully delivered message to RMQ")
|
||||||
return {"status": "queued"}
|
return {"status": "queued"}
|
||||||
|
|
||||||
|
|||||||
@ -41,7 +41,8 @@ def create_connection():
|
|||||||
|
|
||||||
def send_message_to_rmq(connection, user_id: int, message: Dict):
|
def send_message_to_rmq(connection, user_id: int, message: Dict):
|
||||||
if not connection or not connection.is_open:
|
if not connection or not connection.is_open:
|
||||||
raise RuntimeError("RabbitMQ connection is not open")
|
print("[RMQ] Connection lost, reconnecting...")
|
||||||
|
connection = create_connection()
|
||||||
|
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
channel.exchange_declare(exchange=rmq_exchange, exchange_type="topic", durable=True)
|
channel.exchange_declare(exchange=rmq_exchange, exchange_type="topic", durable=True)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user