diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py index 2581a0d..a29c8bd 100644 --- a/src/rabbitmq_handler.py +++ b/src/rabbitmq_handler.py @@ -144,10 +144,16 @@ class RabbitMQConsumer: if self._closing: logger.debug("[RabbitMQ] Skipping message because consumer is closing") return + async with message.process(): try: data = json.loads(message.body.decode()) logger.debug(f"[RabbitMQ] Received message: {data}") + + if message.routing_key in [self.dlq_queue_name, "notify.dlq"]: + logger.info(f"[RabbitMQ] Message in DLQ, skipping processing: {data}") + return + uuid = data.get("uuid") if uuid: @@ -164,7 +170,15 @@ class RabbitMQConsumer: decrypted_token = decrypt_token(token) token_map[uuid] = decrypted_token response = await send_notification(message=data,push_tokens=token_map) - await self.validate_delivery(response,message) + + # Validate delivery - any errors here should NOT prevent acknowledgment + # because notifications were already sent. Instead, we handle retries/DLQ + # by publishing new messages. + try: + await self.validate_delivery(response,message) + except Exception as validation_error: + logger.error(f"[RabbitMQ] Error during delivery validation: {validation_error}", exc_info=True) + metrics.MSG_FAILED.inc() except json.JSONDecodeError as e: logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True) @@ -174,7 +188,7 @@ class RabbitMQConsumer: await message.nack(requeue=True) except Exception as e: logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True) - raise + metrics.MSG_FAILED.inc() async def validate_delivery(self,response,message: aio_pika.IncomingMessage): """ @@ -208,37 +222,52 @@ class RabbitMQConsumer: See: https://docs.expo.dev/push-notifications/sending-notifications/ """ for uuid, result in response.items(): - status = result.get("status") - data_list = result.get("data", {}).get("data", []) - api_status = data_list[0].get("status") if data_list else None + try: + status = result.get("status") + data_list = result.get("data", {}).get("data", []) - if status == "ok" and api_status == "ok": - logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}") - metrics.MSG_PUBLISHED.inc() - logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}") + api_status = None + api_error = None + if data_list and isinstance(data_list, list) and len(data_list) > 0: + first_item = data_list[0] + if isinstance(first_item, dict): + api_status = first_item.get("status") + api_error = first_item.get("details", {}).get("error") + + logger.debug(f"[RabbitMQ] Processing delivery for uuid={uuid}, status={status}, api_status={api_status}") + + if status == "ok" and api_status == "ok": + logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}") + metrics.MSG_PUBLISHED.inc() + logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}") - if status == "ok" and api_status == "error": - api_error = data_list[0].get("details", {}).get("error") - if api_error == "DeviceNotRegistered": - expired = await remove_inactive_push_token(uuid, self.db_manager) - if expired: - logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired") + elif status == "ok" and api_status == "error": + if api_error == "DeviceNotRegistered": + expired = await remove_inactive_push_token(uuid, self.db_manager) + if expired: + logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired") + else: + logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}") else: - logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}") - else: + await self.send_message_to_dlq(uuid, message) + logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}, error={api_error}") + + + elif status == "error": await self.send_message_to_dlq(uuid, message) - logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}") + logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}") + elif status == "failure": + await self.send_message_to_retry_queue(uuid, message) + logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}") - if status == "error": - await self.send_message_to_dlq(uuid, message) - logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}") - - if status == "failure": - await self.send_message_to_retry_queue(uuid, message) - logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}") - + except Exception as e: + logger.error(f"[RabbitMQ] Error processing delivery validation for uuid={uuid}: {e}", exc_info=True) + try: + await self.send_message_to_dlq(uuid,message) + except Exception as dlQ_error: + logger.error(f"[RabbitMQ] Failed to send to DLQ for uuid={uuid}: {dlq_error}") async def consume(self):