From 0ad61950c899efa4183e5c9e681dcea0e024d36b Mon Sep 17 00:00:00 2001 From: Florian Date: Tue, 14 Oct 2025 12:07:27 +0200 Subject: [PATCH] Added logic to handle the Expo API response --- src/db.py | 1 - src/rabbitmq_handler.py | 51 +++++++++++++++++++++++++++++++++------- src/secret_handler.py | 8 ++++++- src/send_notification.py | 4 ++-- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/db.py b/src/db.py index f598269..19f063c 100644 --- a/src/db.py +++ b/src/db.py @@ -49,7 +49,6 @@ class DBManager: async with self.acquire() as conn: async with conn.cursor() as cur: await cur.execute("SELECT 1") - logger.debug("[DB] Healthcheck OK") except Exception as e: logger.warning(f"[DB] Healthcheck failed: {e}") diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py index fda7064..e60d1fb 100644 --- a/src/rabbitmq_handler.py +++ b/src/rabbitmq_handler.py @@ -1,7 +1,7 @@ import asyncio import aio_pika from aio_pika.exceptions import AMQPException -from secret_handler import return_credentials, database_lookup, decrypt_token +from secret_handler import return_credentials, database_lookup, decrypt_token, remove_inactive_push_token import os from logger_handler import setup_logger import json @@ -40,7 +40,13 @@ class RabbitMQConsumer: ) self.queue = await self.channel.declare_queue("notifications", durable=True) await self.queue.bind(self.exchange, routing_key="notify.user.*") - logger.info("[Consumer] Connected, queue bound to notify.user.*") + logger.info("[RMQ] Connected, queue bound to notify.user.*") + + async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage): + return + + async def send_message_to_retry_queue(self): + return async def handle_message(self, message: aio_pika.IncomingMessage): if self._closing: @@ -48,7 +54,7 @@ class RabbitMQConsumer: async with message.process(): try: data = json.loads(message.body.decode()) - logger.info(f"[Consumer] Received: {data}") + logger.info(f"[RMQ] Received: {data}") logger.info(message.routing_key) encrypted_tokens = await database_lookup(message.routing_key, db_manager) if not encrypted_tokens: @@ -59,25 +65,50 @@ class RabbitMQConsumer: for uuid, token in token_map.items(): decrypted_token = decrypt_token(token) token_map[uuid] = decrypted_token - await send_notification(message=data,push_tokens=token_map) + response = await send_notification(message=data,push_tokens=token_map) + await self.validate_delivery(response,message) + logger.debug(response) except json.JSONDecodeError as e: - logger.error(f"[Consumer] Bad message, discarding: {e}") + logger.error(f"[RMQ] Bad message, discarding: {e}") await message.nack(requeue=False) except AMQPException as e: - logger.error(f"[Consumer] RabbitMQ error: {e}") + logger.error(f"[RMQ] RabbitMQ error: {e}") await message.nack(requeue=True) except Exception as e: - logger.critical(f"[Consumer] Fatal error: {e}") + logger.critical(f"[RMQ] Fatal error: {e}") raise + async def validate_delivery(self,response,message: aio_pika.IncomingMessage): + for uuid, result in response.items(): + status = result.get("status") + data_list = result.get("data", {}).get("data", []) + api_status = data_list[0].get("status") if data_list else None + + if status == "ok" and api_status == "ok": + logger.info(f"Notification delivered successfully to {uuid}") + + if status == "ok" and api_status == "error": + api_error = data_list[0].get("details", {}).get("error") + if api_error == "DeviceNotRegistered": + logger.info(f"Device no longer registered for uuid {uuid}, setting as expired") + await remove_inactive_push_token(uuid, self.db_manager) + else: + await self.send_message_to_dlq(uuid, message) + + if status == "error": + await self.send_message_to_dlq(uuid, message) + if status == "failure": + await self.send_message_to_retry_queue(uuid, message) + + async def consume(self): if self._closing: return if not self.queue: raise RuntimeError("Queue not initialized") await self.queue.consume(self.handle_message, no_ack=False) - logger.info("[Consumer] Consuming messages...") + logger.info("[RMQ] Consuming messages...") async def close(self): self._closing = True @@ -88,14 +119,16 @@ class RabbitMQConsumer: async def main(): + logger.info("Starting application") await db_manager.connect() consumer = RabbitMQConsumer(db_manager=db_manager) await consumer.connect() await consumer.consume() stop_event = asyncio.Event() + for sig in (signal.SIGINT, signal.SIGTERM): asyncio.get_running_loop().add_signal_handler(sig, stop_event.set) - + logger.info("Stopping application") await stop_event.wait() await consumer.close() await db_manager.close() diff --git a/src/secret_handler.py b/src/secret_handler.py index d9cd8f4..005d809 100644 --- a/src/secret_handler.py +++ b/src/secret_handler.py @@ -47,4 +47,10 @@ async def database_lookup(routing_key: str, db_manager): (user_id,)) if cur.description: return await cur.fetchall() - return [] \ No newline at end of file + return [] + +async def remove_inactive_push_token(uuid :str, db_manager): + async with db_manager.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s", + (uuid,)) diff --git a/src/send_notification.py b/src/send_notification.py index 27dd8d6..aab0290 100644 --- a/src/send_notification.py +++ b/src/send_notification.py @@ -16,7 +16,6 @@ async def send_notification( results = {} for uuid, token in push_tokens.items(): results[uuid] = await _send_to_token(token, uuid, message, max_retries, timeout) - return results async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int): @@ -32,8 +31,9 @@ async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout=timeout ) as response: response.raise_for_status() + data = await response.json() logger.info(f"Notification sent successfully to uuid {uuid}") - return {"status": "ok"} + return {"status":"ok","data":data} except (aiohttp.ClientError, asyncio.TimeoutError) as e: logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")