RabbitMQ: Added detailed descriptions

This commit is contained in:
Florian 2025-10-17 14:24:46 +02:00
parent 038e3590dd
commit c583709054

View File

@ -84,6 +84,18 @@ class RabbitMQConsumer:
metrics.MSG_FAILED.inc()
async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
"""
Publishes a message to the retry queue with an incremented retry counter.
Behaviour:
- Each message is retried up to `max_retries` times.
- After exceeding the limit, the message is forwarded to the DLQ.
- The UUID of the intended device is embedded in the message payload.
Args:
uuid: Unique identifier for the device or token associated with the message.
message: The original RabbitMQ message being retried.
"""
data = json.loads(message.body.decode())
data["uuid"] = uuid
retry_count = data.get("retry_count", 0) + 1
@ -111,6 +123,24 @@ class RabbitMQConsumer:
logger.info(f"[RabbitMQ] Message requeued for retry #{retry_count}")
async def handle_message(self, message: aio_pika.IncomingMessage):
"""
Processes a single RabbitMQ message containing push notification data.
Message routing:
- Standard messages use the routing key: notify.user.<user_id>
- Retry messages use the routing key: notify.user.retry and include a device UUID
Behaviour:
1. Parses and validates the message payload.
2. Fetches encrypted push tokens from the database by UUID or user ID.
3. Decrypts tokens and sends notifications via the Expo Push API.
4. Delegates delivery result handling to `validate_delivery`.
Error handling:
- Invalid JSON message discarded (not requeued)
- RabbitMQ transport errors message requeued
- Unexpected exceptions raised for shutdown or external handling
"""
if self._closing:
logger.debug("[RabbitMQ] Skipping message because consumer is closing")
return
@ -147,6 +177,36 @@ class RabbitMQConsumer:
raise
async def validate_delivery(self,response,message: aio_pika.IncomingMessage):
"""
Validates delivery results returned by the Expo Push API after a message is sent.
This function inspects both the transport-level and API-level delivery statuses and
takes appropriate follow-up actions:
- Successful deliveries are logged and counted as published.
- API-level errors trigger token expiration or redirection to the DLQ.
- Failures (e.g., network or unknown issues) are requeued for retry.
Right now the <push-receipt-id> is not stored for further validation but needs to be implemented if the userbase grows.
Expected response structure:
{
"<uuid>": {
"status": "ok" | "error" | "failure",
"data": {
"data": [
{
"status": "ok" | "error",
"id": "<push-receipt-id>",
"details": { "error": "<ErrorMessage>" } #Only exists if there is an error
}
]
}
}
}
See: https://docs.expo.dev/push-notifications/sending-notifications/
"""
for uuid, result in response.items():
status = result.get("status")
data_list = result.get("data", {}).get("data", [])