backend-api-internal/src/rabbitmq_handler.py

60 lines
1.8 KiB
Python

import pika
from typing import Dict
from secret_handler import return_credentials
import ssl
import json
import time
import sys
import os
# Broker credentials come from the project's secret_handler helper.
# NOTE(review): presumably this returns the contents of the given secret
# file -- confirm against secret_handler.return_credentials.
rmq_username = return_credentials("/etc/secrets/rmq_username")
rmq_password = return_credentials("/etc/secrets/rmq_password")

# Connection target; each value can be overridden through the environment
# and otherwise falls back to the default given here.
rmq_host = os.environ.get("BACKEND_API_INTERNAL_RMQ_HOST", "localhost")
rmq_vhost = os.environ.get("BACKEND_API_INTERNAL_RMQ_VHOST", "app_notifications")
rmq_exchange = os.environ.get("BACKEND_API_INTERNAL_RMQ_EXCHANGE", "app_notifications")

# Retry policy used by send_message_to_rmq: total number of attempts and the
# pause (in seconds, passed to time.sleep) between two attempts.
MAX_RETRIES = 5
RETRY_DELAY = 5
def send_message_to_rmq(user_id: int, message: Dict):
    """Publish ``message`` as JSON to the notifications topic exchange.

    A fresh TLS connection (port 5671) is opened for every attempt. The
    exchange named by ``rmq_exchange`` is declared as a durable topic
    exchange, publisher confirms are enabled, and the message is published
    with routing key ``notify.user.<user_id>`` as a persistent
    (``delivery_mode=2``), mandatory message.

    Any exception raised while connecting or publishing counts as a failed
    attempt. Up to ``MAX_RETRIES`` attempts are made, sleeping
    ``RETRY_DELAY`` seconds between them.

    Args:
        user_id: Identifier interpolated into the routing key.
        message: JSON-serializable mapping used as the message body.

    Returns:
        None, as soon as one attempt completes without raising.

    Raises:
        SystemExit: With exit code 1 once every attempt has failed.
            NOTE(review): this terminates the calling process; kept as-is
            because callers may rely on it.
    """
    credentials = pika.PlainCredentials(username=rmq_username, password=rmq_password)

    context = ssl.create_default_context()
    # NOTE(review): hostname verification is disabled here, so the broker
    # certificate is validated against the trust store but not matched to
    # rmq_host. Left unchanged because the deployed certificate may not carry
    # the broker's hostname -- confirm and re-enable if possible.
    context.check_hostname = False

    conn_params = pika.ConnectionParameters(
        host=rmq_host,
        port=5671,
        ssl_options=pika.SSLOptions(context),
        credentials=credentials,
        virtual_host=rmq_vhost,
    )

    # Loop-invariant pieces of the publish call, built once.
    routing_key = f"notify.user.{user_id}"
    properties = pika.BasicProperties(
        content_type="application/json",
        delivery_mode=2,
    )

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            connection = pika.BlockingConnection(conn_params)
            try:
                channel = connection.channel()
                channel.exchange_declare(
                    exchange=rmq_exchange, exchange_type="topic", durable=True
                )
                # With confirms enabled, a publish the broker rejects or
                # cannot route raises and is handled by the retry logic below.
                channel.confirm_delivery()
                channel.basic_publish(
                    exchange=rmq_exchange,
                    routing_key=routing_key,
                    # Serialized inside the try so that a serialization error
                    # follows the same retry/exit path as before.
                    body=json.dumps(message),
                    properties=properties,
                    mandatory=True,
                )
            finally:
                # Always release the connection, including when declaring or
                # publishing failed; previously a failed attempt leaked it.
                if connection.is_open:
                    connection.close()
            return
        except Exception as e:
            print(f"[RMQ] Attempt {attempt} failed: {e}")
            if attempt < MAX_RETRIES:
                time.sleep(RETRY_DELAY)
            else:
                print("[RMQ] Failed to connect after maximum retries — exiting.")
                sys.exit(1)
if __name__ == "__main__":
    # Manual smoke test: when run as a script, publish one sample
    # notification addressed to user 1.
    send_message_to_rmq(
        user_id=1,
        message={
            "type": "notification",
            "content": "Vault TLS cert reloaded successfully.",
        },
    )