fix/dlq #4

Merged
florian merged 3 commits from fix/dlq into main 2025-10-17 22:33:46 +02:00
Showing only changes of commit 5bbdc979d4 - Show all commits

View File

@ -144,10 +144,16 @@ class RabbitMQConsumer:
if self._closing: if self._closing:
logger.debug("[RabbitMQ] Skipping message because consumer is closing") logger.debug("[RabbitMQ] Skipping message because consumer is closing")
return return
async with message.process(): async with message.process():
try: try:
data = json.loads(message.body.decode()) data = json.loads(message.body.decode())
logger.debug(f"[RabbitMQ] Received message: {data}") logger.debug(f"[RabbitMQ] Received message: {data}")
if message.routing_key in [self.dlq_queue_name, "notify.dlq"]:
logger.info(f"[RabbitMQ] Message in DLQ, skipping processing: {data}")
return
uuid = data.get("uuid") uuid = data.get("uuid")
if uuid: if uuid:
@ -164,7 +170,15 @@ class RabbitMQConsumer:
decrypted_token = decrypt_token(token) decrypted_token = decrypt_token(token)
token_map[uuid] = decrypted_token token_map[uuid] = decrypted_token
response = await send_notification(message=data,push_tokens=token_map) response = await send_notification(message=data,push_tokens=token_map)
await self.validate_delivery(response,message)
# Validate delivery - any errors here should NOT prevent acknowledgment
# because notifications were already sent. Instead, we handle retries/DLQ
# by publishing new messages.
try:
await self.validate_delivery(response,message)
except Exception as validation_error:
logger.error(f"[RabbitMQ] Error during delivery validation: {validation_error}", exc_info=True)
metrics.MSG_FAILED.inc()
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True) logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True)
@ -174,7 +188,7 @@ class RabbitMQConsumer:
await message.nack(requeue=True) await message.nack(requeue=True)
except Exception as e: except Exception as e:
logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True) logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True)
raise metrics.MSG_FAILED.inc()
async def validate_delivery(self,response,message: aio_pika.IncomingMessage): async def validate_delivery(self,response,message: aio_pika.IncomingMessage):
""" """
@ -208,37 +222,52 @@ class RabbitMQConsumer:
See: https://docs.expo.dev/push-notifications/sending-notifications/ See: https://docs.expo.dev/push-notifications/sending-notifications/
""" """
for uuid, result in response.items(): for uuid, result in response.items():
status = result.get("status") try:
data_list = result.get("data", {}).get("data", []) status = result.get("status")
api_status = data_list[0].get("status") if data_list else None data_list = result.get("data", {}).get("data", [])
if status == "ok" and api_status == "ok": api_status = None
logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}") api_error = None
metrics.MSG_PUBLISHED.inc() if data_list and isinstance(data_list, list) and len(data_list) > 0:
logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}") first_item = data_list[0]
if isinstance(first_item, dict):
api_status = first_item.get("status")
api_error = first_item.get("details", {}).get("error")
logger.debug(f"[RabbitMQ] Processing delivery for uuid={uuid}, status={status}, api_status={api_status}")
if status == "ok" and api_status == "ok":
logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}")
metrics.MSG_PUBLISHED.inc()
logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}")
if status == "ok" and api_status == "error": elif status == "ok" and api_status == "error":
api_error = data_list[0].get("details", {}).get("error") if api_error == "DeviceNotRegistered":
if api_error == "DeviceNotRegistered": expired = await remove_inactive_push_token(uuid, self.db_manager)
expired = await remove_inactive_push_token(uuid, self.db_manager) if expired:
if expired: logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired")
logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired") else:
logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}")
else: else:
logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}") await self.send_message_to_dlq(uuid, message)
else: logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}, error={api_error}")
elif status == "error":
await self.send_message_to_dlq(uuid, message) await self.send_message_to_dlq(uuid, message)
logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}") logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}")
elif status == "failure":
await self.send_message_to_retry_queue(uuid, message)
logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}")
if status == "error": except Exception as e:
await self.send_message_to_dlq(uuid, message) logger.error(f"[RabbitMQ] Error processing delivery validation for uuid={uuid}: {e}", exc_info=True)
logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}") try:
await self.send_message_to_dlq(uuid,message)
if status == "failure": except Exception as dlQ_error:
await self.send_message_to_retry_queue(uuid, message) logger.error(f"[RabbitMQ] Failed to send to DLQ for uuid={uuid}: {dlq_error}")
logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}")
async def consume(self): async def consume(self):