From 37476c814edfb6a46a780640ff7af46627c2f2cf Mon Sep 17 00:00:00 2001
From: Florian
Date: Tue, 14 Oct 2025 21:54:23 +0200
Subject: [PATCH] RMQ: Added Prometheus metrics and cleaned up code for deployment

---
 requirements.txt         |  4 ++--
 src/db.py                |  4 ++--
 src/metrics.py           | 39 +++++++++++++++++++++++++++++++++++++++
 src/rabbitmq_handler.py  | 20 +++++++++++++++++---
 src/send_notification.py | 22 +++++++++++++++++-----
 5 files changed, 77 insertions(+), 12 deletions(-)
 create mode 100644 src/metrics.py

diff --git a/requirements.txt b/requirements.txt
index f5290a2..a817154 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,16 +6,16 @@ aiomysql==0.2.0
 aiormq==6.9.0
 aiosignal==1.4.0
 attrs==25.4.0
-Brotli==1.1.0
 cffi==2.0.0
+cryptography==46.0.2
 frozenlist==1.8.0
 idna==3.11
 multidict==6.7.0
 pamqp==3.3.0
+prometheus_client==0.23.1
 propcache==0.4.1
 pycares==4.11.0
 pycparser==2.23
 PyMySQL==1.1.2
 typing_extensions==4.15.0
 yarl==1.22.0
-zstandard==0.25.0
diff --git a/src/db.py b/src/db.py
index 19f063c..f6c67d0 100644
--- a/src/db.py
+++ b/src/db.py
@@ -77,6 +77,6 @@ class DBManager:
         await self._pool.wait_closed()
         logger.info("[DB] Connection pool closed")
 
-#db_manager = DBManager(host=db_host, port=3306, user=db_username, password=db_password, db=db_database)
-db_manager = DBManager(host=db_host, port=30006, user=db_username, password=db_password, db=db_database)
+db_manager = DBManager(host=db_host, port=3306, user=db_username, password=db_password, db=db_database)
 
+
diff --git a/src/metrics.py b/src/metrics.py
new file mode 100644
index 0000000..12f48fc
--- /dev/null
+++ b/src/metrics.py
@@ -0,0 +1,39 @@
+from prometheus_client import Counter, Gauge, start_http_server
+from logger_handler import setup_logger
+import asyncio
+
+logger = setup_logger(__name__)
+
+MSG_PUBLISHED = Counter(
+    "rmq_messages_published_total",
+    "Total number of messages successfully published to RabbitMQ"
+)
+
+MSG_FAILED = Counter(
+    "rmq_messages_failed_total",
+    "Total number of messages sent to the dead-letter queue"
+)
+
+MSG_RETRY = Counter(
+    "rmq_messages_retry_total",
+    "Total number of messages retried via retry queue",
+    ["queue_name", "uuid", "retry_count"]
+)
+
+QUEUE_MESSAGES = Gauge(
+    "rmq_queue_messages",
+    "Current number of messages pending in the queue",
+    ["queue_name"]
+)
+
+def start_metrics_server(port: int = 9000):
+    start_http_server(port)
+    logger.info(f"Prometheus metrics exposed at http://0.0.0.0:{port}/metrics")
+
+async def update_queue_gauge(channel, queue_name: str):
+    """Periodically update queue depth"""
+    queue = await channel.declare_queue(queue_name, passive=True)
+    while True:
+        info = await queue.declare()
+        QUEUE_MESSAGES.labels(queue_name=queue_name).set(info.message_count)
+        await asyncio.sleep(10)
diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py
index 1d0f309..72de00a 100644
--- a/src/rabbitmq_handler.py
+++ b/src/rabbitmq_handler.py
@@ -8,6 +8,7 @@ import json
 from db import db_manager
 from send_notification import send_notification
 import signal
+import metrics
 
 logger = setup_logger(__name__)
 
@@ -31,6 +32,7 @@ class RabbitMQConsumer:
         self.queue: aio_pika.Queue | None = None
         self._closing = False
         self.db_manager = db_manager
+        self.queue_name = "notifications"
         self.retry_queue_name = "notifications_retry"
         self.dlq_queue_name = "notifications_dlq"
         self.max_retries = 5
@@ -42,7 +44,7 @@
         self.exchange = await self.channel.declare_exchange(
             self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
         )
-        self.queue = await self.channel.declare_queue("notifications", durable=True)
+        self.queue = await self.channel.declare_queue(self.queue_name, durable=True)
         await self.queue.bind(self.exchange, routing_key="notify.user.*")
         retry_queue_args = {
             "x-message-ttl": self.retry_ttl,
@@ -54,6 +56,11 @@
         )
         await self.channel.declare_queue(self.dlq_queue_name, durable=True)
 
+        metrics.start_metrics_server(port=9000)
+        asyncio.create_task(metrics.update_queue_gauge(self.channel, self.queue_name))
+        asyncio.create_task(metrics.update_queue_gauge(self.channel, self.retry_queue_name))
+        asyncio.create_task(metrics.update_queue_gauge(self.channel, self.dlq_queue_name))
+
         logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.")
 
     async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
@@ -68,6 +75,7 @@
             routing_key=self.dlq_queue_name
         )
         logger.warning(f"Message sent to DLQ: {data['uuid']}")
+        metrics.MSG_FAILED.inc()
 
     async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
         data = json.loads(message.body.decode())
@@ -75,6 +83,12 @@
         retry_count = data.get("retry_count", 0) + 1
         data["retry_count"] = retry_count
 
+        metrics.MSG_RETRY.labels(
+            queue_name=self.retry_queue_name,
+            uuid=uuid,
+            retry_count=retry_count
+        ).inc()
+
         if retry_count > self.max_retries:
             await self.send_message_to_dlq(uuid, message)
             return
@@ -111,8 +125,7 @@
                    token_map[uuid] = decrypted_token
            response = await send_notification(message=data,push_tokens=token_map)
            await self.validate_delivery(response,message)
-           logger.debug(response)
-
+
        except json.JSONDecodeError as e:
            logger.error(f"[RMQ] Bad message, discarding: {e}")
            await message.nack(requeue=False)
@@ -131,6 +144,7 @@
 
         if status == "ok" and api_status == "ok":
             logger.info(f"Notification delivered successfully to {uuid}")
+            metrics.MSG_PUBLISHED.inc()
 
         if status == "ok" and api_status == "error":
             api_error = data_list[0].get("details", {}).get("error")
diff --git a/src/send_notification.py b/src/send_notification.py
index aff26c0..619ddcb 100644
--- a/src/send_notification.py
+++ b/src/send_notification.py
@@ -1,11 +1,20 @@
 import aiohttp
+import aiodns
 import asyncio
 from logger_handler import setup_logger
-#API_ENDPOINT="https://exp.host/fakeUSer/api/v2/push/send"
-API_ENDPOINT="http://127.0.0.1:8000/honk"
+API_ENDPOINT="https://exp.host/--/api/v2/push/send"
 
 logger = setup_logger(__name__)
 
+retryable_exceptions = (
+    aiohttp.ClientConnectionError,
+    aiohttp.ServerDisconnectedError,
+    aiohttp.ClientOSError,
+    aiohttp.ServerTimeoutError,
+    asyncio.TimeoutError,
+)
+
+
 async def send_notification(
     message: dict,
     push_tokens,
@@ -20,10 +29,12 @@
 
 async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int):
     payload = create_payload(token, message)
+    resolver = aiohttp.AsyncResolver()
+    connector = aiohttp.TCPConnector(resolver=resolver)
 
     for attempt in range(1, max_retries + 1):
         try:
-            async with aiohttp.ClientSession() as session:
+            async with aiohttp.ClientSession(connector=connector) as session:
                 async with session.post(
                     url=API_ENDPOINT,
                     json=payload,
@@ -35,7 +46,7 @@
                     logger.info(f"Notification sent successfully to uuid {uuid}")
                     return {"status":"ok","data":data}
 
-        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
+        except retryable_exceptions as e:
             logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
             await asyncio.sleep(2 ** (attempt - 1))
 
@@ -53,7 +64,8 @@ def create_payload(push_token: str, message: dict) -> dict:
         "body": message.get("body"),
         "data": {
             "link": message.get("link"),
-            "category": message.get("category")
+            "category": message.get("category"),
+            "timestamp": message.get("timestamp")
         },
         "sound": "default",
         "priority": "high"