From c58370905497f54b9304a6de4126e2a43a623443 Mon Sep 17 00:00:00 2001 From: Florian Date: Fri, 17 Oct 2025 14:24:46 +0200 Subject: [PATCH] RabbitMQ: Added detailed descriptions --- src/rabbitmq_handler.py | 60 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py index 535cb86..2581a0d 100644 --- a/src/rabbitmq_handler.py +++ b/src/rabbitmq_handler.py @@ -84,6 +84,18 @@ class RabbitMQConsumer: metrics.MSG_FAILED.inc() async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage): + """ + Publishes a message to the retry queue with an incremented retry counter. + + Behaviour: + - Each message is retried up to `max_retries` times. + - After exceeding the limit, the message is forwarded to the DLQ. + - The UUID of the intended device is embedded in the message payload. + + Args: + uuid: Unique identifier for the device or token associated with the message. + message: The original RabbitMQ message being retried. + """ data = json.loads(message.body.decode()) data["uuid"] = uuid retry_count = data.get("retry_count", 0) + 1 @@ -111,6 +123,24 @@ class RabbitMQConsumer: logger.info(f"[RabbitMQ] Message requeued for retry #{retry_count}") async def handle_message(self, message: aio_pika.IncomingMessage): + """ + Processes a single RabbitMQ message containing push notification data. + + Message routing: + - Standard messages use the routing key: notify.user. + - Retry messages use the routing key: notify.user.retry and include a device UUID + + Behaviour: + 1. Parses and validates the message payload. + 2. Fetches encrypted push tokens from the database by UUID or user ID. + 3. Decrypts tokens and sends notifications via the Expo Push API. + 4. Delegates delivery result handling to `validate_delivery`. + + Error handling: + - Invalid JSON → message discarded (not requeued) + - RabbitMQ transport errors → message requeued + - Unexpected exceptions → raised for shutdown or external handling + """ if self._closing: logger.debug("[RabbitMQ] Skipping message because consumer is closing") return @@ -147,6 +177,36 @@ class RabbitMQConsumer: raise async def validate_delivery(self,response,message: aio_pika.IncomingMessage): + """ + Validates delivery results returned by the Expo Push API after a message is sent. + + This function inspects both the transport-level and API-level delivery statuses and + takes appropriate follow-up actions: + + - Successful deliveries are logged and counted as published. + - API-level errors trigger token expiration or redirection to the DLQ. + - Failures (e.g., network or unknown issues) are requeued for retry. + + Right now the is not stored for further validation but needs to be implemented if the userbase grows. + + Expected response structure: + { + "": { + "status": "ok" | "error" | "failure", + "data": { + "data": [ + { + "status": "ok" | "error", + "id": "", + "details": { "error": "" } #Only exists if there is an error + } + ] + } + } + } + + See: https://docs.expo.dev/push-notifications/sending-notifications/ + """ for uuid, result in response.items(): status = result.get("status") data_list = result.get("data", {}).get("data", [])