From f8b66d742f35f59c20baa7714718a9cfca6aa778 Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 14 Oct 2025 17:22:25 +0200 Subject: [PATCH] RMQ: Added retrying messages sent to a retry queue and eventual parking in the DLQ --- src/rabbitmq_handler.py | 71 +++++++++++++++++++++++++++++++++------- src/secret_handler.py | 22 +++++++++---- src/send_notification.py | 2 +- 3 files changed, 76 insertions(+), 19 deletions(-) diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py index e60d1fb..1d0f309 100644 --- a/src/rabbitmq_handler.py +++ b/src/rabbitmq_handler.py @@ -1,7 +1,7 @@ import asyncio import aio_pika from aio_pika.exceptions import AMQPException -from secret_handler import return_credentials, database_lookup, decrypt_token, remove_inactive_push_token +from secret_handler import return_credentials, database_lookup_by_user_id, decrypt_token, remove_inactive_push_token, database_lookup_by_uuid import os from logger_handler import setup_logger import json @@ -31,6 +31,10 @@ class RabbitMQConsumer: self.queue: aio_pika.Queue | None = None self._closing = False self.db_manager = db_manager + self.retry_queue_name = "notifications_retry" + self.dlq_queue_name = "notifications_dlq" + self.max_retries = 5 + self.retry_ttl = 5 * 60 * 10000 async def connect(self): self.connection = await aio_pika.connect_robust(self.url) @@ -40,13 +44,49 @@ class RabbitMQConsumer: ) self.queue = await self.channel.declare_queue("notifications", durable=True) await self.queue.bind(self.exchange, routing_key="notify.user.*") - logger.info("[RMQ] Connected, queue bound to notify.user.*") + retry_queue_args = { + "x-message-ttl": self.retry_ttl, + "x-dead-letter-exchange": self.exchange_name, + "x-dead-letter-routing-key": "notify.user.retry", + } + self.retry_queue = await self.channel.declare_queue( + self.retry_queue_name, durable=True, arguments=retry_queue_args + ) + await self.channel.declare_queue(self.dlq_queue_name, durable=True) + + logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.") async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage): - return + data = json.loads(message.body.decode()) + data["uuid"] = uuid + await self.channel.default_exchange.publish( + aio_pika.Message( + body=json.dumps(data).encode(), + content_type="application/json", + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ), + routing_key=self.dlq_queue_name + ) + logger.warning(f"Message sent to DLQ: {data['uuid']}") - async def send_message_to_retry_queue(self): - return + async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage): + data = json.loads(message.body.decode()) + data["uuid"] = uuid + retry_count = data.get("retry_count", 0) + 1 + data["retry_count"] = retry_count + + if retry_count > self.max_retries: + await self.send_message_to_dlq(uuid, message) + return + await self.channel.default_exchange.publish( + aio_pika.Message( + body=json.dumps(data).encode(), + content_type="application/json", + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ), + routing_key=self.retry_queue_name + ) + logger.info(f"Message requeued for retry #{retry_count}") async def handle_message(self, message: aio_pika.IncomingMessage): if self._closing: @@ -55,12 +95,16 @@ class RabbitMQConsumer: try: data = json.loads(message.body.decode()) logger.info(f"[RMQ] Received: {data}") - logger.info(message.routing_key) - encrypted_tokens = await database_lookup(message.routing_key, db_manager) + uuid = data.get("uuid") + if uuid: + encrypted_tokens = await database_lookup_by_uuid(uuid, db_manager) + else: + encrypted_tokens = await database_lookup_by_user_id(message.routing_key, db_manager) + if not encrypted_tokens: logger.warning(f"No push tokens found for user {message.routing_key}") return - + token_map = {row["uuid"]: row["token"].decode() for row in encrypted_tokens} for uuid, token in token_map.items(): decrypted_token = decrypt_token(token) @@ -91,13 +135,17 @@ class RabbitMQConsumer: if status == "ok" and api_status == "error": api_error = data_list[0].get("details", {}).get("error") if api_error == "DeviceNotRegistered": - logger.info(f"Device no longer registered for uuid {uuid}, setting as expired") - await remove_inactive_push_token(uuid, self.db_manager) + expired = await remove_inactive_push_token(uuid, self.db_manager) + if expired: + logger.info(f"Device no longer registered for uuid {uuid}, setting as expired") + else: + logger.error(f"Failed expiring token for uuid: {uuid}") else: await self.send_message_to_dlq(uuid, message) if status == "error": await self.send_message_to_dlq(uuid, message) + if status == "failure": await self.send_message_to_retry_queue(uuid, message) @@ -128,8 +176,9 @@ async def main(): for sig in (signal.SIGINT, signal.SIGTERM): asyncio.get_running_loop().add_signal_handler(sig, stop_event.set) - logger.info("Stopping application") + await stop_event.wait() + logger.info("Stopping application") await consumer.close() await db_manager.close() diff --git a/src/secret_handler.py b/src/secret_handler.py index 005d809..311e358 100644 --- a/src/secret_handler.py +++ b/src/secret_handler.py @@ -1,6 +1,4 @@ from cryptography.fernet import Fernet -import sys -import asyncio from logger_handler import setup_logger logger = setup_logger(__name__) @@ -10,10 +8,10 @@ try: encryption_key = file.read() except FileNotFoundError: logger.fatal("[Secret Handler] Encryption key not found") - sys.exit(1) + raise except Exception as e: logger.fatal(f"[Secret Handler] Failed to read encryption key: {e}") - sys.exit(1) + raise fernet = Fernet(encryption_key) @@ -29,12 +27,12 @@ def return_credentials(path: str)->str: return file.read().strip() except FileNotFoundError: logger.fatal(f"[Secret Handler] Secret file not found: {path}") - sys.exit(1) + raise except Exception as e: logger.fatal(f"[Secret Handler] Failed to read secret file {path}: {e}") - sys.exit(1) + raise -async def database_lookup(routing_key: str, db_manager): +async def database_lookup_by_user_id(routing_key: str, db_manager): try: user_id = int(routing_key.split('.')[-1]) except ValueError: @@ -49,8 +47,18 @@ async def database_lookup(routing_key: str, db_manager): return await cur.fetchall() return [] +async def database_lookup_by_uuid(uuid: str, db_manager): + async with db_manager.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE token_id=%s", + (uuid,)) + if cur.description: + return await cur.fetchall() + return [] + async def remove_inactive_push_token(uuid :str, db_manager): async with db_manager.acquire() as conn: async with conn.cursor() as cur: await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s", (uuid,)) + return cur.rowcount > 0 diff --git a/src/send_notification.py b/src/send_notification.py index aab0290..aff26c0 100644 --- a/src/send_notification.py +++ b/src/send_notification.py @@ -44,7 +44,7 @@ async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, return {"status": "error", "exception": str(e)} logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts") - return {"status": "failed"} + return {"status": "failure"} def create_payload(push_token: str, message: dict) -> dict: return {