From b272e069d41dddcecc73af55d4f9bf9079cf0a33 Mon Sep 17 00:00:00 2001
From: Florian
Date: Fri, 17 Oct 2025 10:58:37 +0200
Subject: [PATCH] Unified logging behaviour

- Logger no longer starts at log level DEBUG by default; the level is read
  from the LOG_LEVEL environment variable
- Added extensive debug logging
- Added a README
- Changed the database healthcheck loop to catch only pymysql errors
---
 README.md                | 68 ++++++++++++++++++++++++++++++++++++++++++++-
 src/db.py                | 17 +++++++++---
 src/logger_handler.py    |  8 ++++++-
 src/rabbitmq_handler.py  | 52 ++++++++++++++++++++++----------------
 src/secret_handler.py    | 19 +++++++++++---
 src/send_notification.py | 22 +++++++++---------
 6 files changed, 151 insertions(+), 35 deletions(-)

diff --git a/README.md b/README.md
index d4b8041..26452f6 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,68 @@
-# backend-push-notifications
+# Backend Push Notifications
 
+## Overview
+This service handles device push notifications for registered users. It consumes messages from RabbitMQ, looks up user device tokens in a MySQL database, and sends notifications via Expo's push API. Failed deliveries are retried or routed to a dead-letter queue.
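+
+An example message consumed from the notifications queue (field names are taken from the consumer code; the values are illustrative). The `uuid` field is optional: when it is absent, device tokens are looked up via the `notify.user.<id>` routing key instead.
+
+```json
+{
+  "uuid": "0b9f3c2e-8d41-4f6a-9c57-2f1d4e8a6b30",
+  "title": "New message",
+  "body": "You have a new message."
+}
+```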
+
+
+## Requirements
+- Python 3.12+
+- MySQL database
+- RabbitMQ
+- Python packages from `requirements.txt`
+
+
+## Configuration
+
+**Environment variables:**
+
+- `LOG_LEVEL` (DEBUG, INFO, WARNING, ERROR, CRITICAL)
+
+- `RABBITMQ_URL`
+
+- `MYSQL_HOST`, `MYSQL_USER`, `MYSQL_PASSWORD`, `MYSQL_DATABASE`
+
+- `API_ENDPOINT` (Expo push API)
+
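+An example shell configuration for local development (all values are illustrative):
+
+```bash
+export LOG_LEVEL=INFO
+export RABBITMQ_URL=amqp://guest:guest@localhost:5672/
+export MYSQL_HOST=localhost
+export MYSQL_USER=notifications
+export MYSQL_PASSWORD=changeme
+export MYSQL_DATABASE=notifications
+export API_ENDPOINT=https://exp.host/--/api/v2/push/send
+```
+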
logger.debug(f"[RabbitMQ] Retry count exceeded max_retries={self.max_retries}, sending to DLQ") await self.send_message_to_dlq(uuid, message) return await self.channel.default_exchange.publish( @@ -100,23 +108,25 @@ class RabbitMQConsumer: ), routing_key=self.retry_queue_name ) - logger.info(f"Message requeued for retry #{retry_count}") + logger.info(f"[RabbitMQ] Message requeued for retry #{retry_count}") async def handle_message(self, message: aio_pika.IncomingMessage): if self._closing: + logger.debug("[RabbitMQ] Skipping message because consumer is closing") return async with message.process(): try: data = json.loads(message.body.decode()) - logger.info(f"[RMQ] Received: {data}") + logger.debug(f"[RabbitMQ] Received message: {data}") uuid = data.get("uuid") + if uuid: encrypted_tokens = await database_lookup_by_uuid(uuid, db_manager) else: encrypted_tokens = await database_lookup_by_user_id(message.routing_key, db_manager) if not encrypted_tokens: - logger.warning(f"No push tokens found for user {message.routing_key}") + logger.warning(f"[RabbitMQ] No push tokens found for routing key {message.routing_key}") return token_map = {row["uuid"]: row["token"].decode() for row in encrypted_tokens} @@ -127,13 +137,13 @@ class RabbitMQConsumer: await self.validate_delivery(response,message) except json.JSONDecodeError as e: - logger.error(f"[RMQ] Bad message, discarding: {e}") + logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True) await message.nack(requeue=False) except AMQPException as e: - logger.error(f"[RMQ] RabbitMQ error: {e}") + logger.error(f"[RabbitMQ] RabbitMQ error: {e}", exc_info=True) await message.nack(requeue=True) except Exception as e: - logger.critical(f"[RMQ] Fatal error: {e}") + logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True) raise async def validate_delivery(self,response,message: aio_pika.IncomingMessage): @@ -143,45 +153,53 @@ class RabbitMQConsumer: api_status = data_list[0].get("status") if data_list else None if status == "ok" and api_status == "ok": - logger.info(f"Notification delivered successfully to {uuid}") + logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}") metrics.MSG_PUBLISHED.inc() + logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}") + if status == "ok" and api_status == "error": api_error = data_list[0].get("details", {}).get("error") if api_error == "DeviceNotRegistered": expired = await remove_inactive_push_token(uuid, self.db_manager) if expired: - logger.info(f"Device no longer registered for uuid {uuid}, setting as expired") + logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired") else: - logger.error(f"Failed expiring token for uuid: {uuid}") + logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}") else: await self.send_message_to_dlq(uuid, message) + logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}") + if status == "error": await self.send_message_to_dlq(uuid, message) + logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}") if status == "failure": await self.send_message_to_retry_queue(uuid, message) + logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}") + async def consume(self): if self._closing: + logger.debug("[RabbitMQ] Consumer is closing, not starting consume") return if not self.queue: - raise RuntimeError("Queue not initialized") + raise RuntimeError("[RabbitMQ] Queue not initialized") await 
diff --git a/src/db.py b/src/db.py
index f6c67d0..25fd2e5 100644
--- a/src/db.py
+++ b/src/db.py
@@ -4,7 +4,7 @@ import asyncio
 from secret_handler import return_credentials
 import os
 from logger_handler import setup_logger
-
+import pymysql.err
 
 db_username = return_credentials("/etc/secrets/db_username")
 db_password = return_credentials("/etc/secrets/db_password")
@@ -26,6 +26,7 @@ class DBManager:
         self._health_interval = health_interval
         self._health_task: asyncio.Task | None = None
         self._closing = False
+        logger.debug(f"[DB] Initialized DBManager: host={host}, db={db}, port={port}, pool_size={pool_size}")
 
     async def connect(self):
         self._pool = await aiomysql.create_pool(
@@ -40,38 +41,48 @@
             cursorclass=aiomysql.DictCursor
         )
         logger.info("[DB] Connection pool created")
+        logger.debug(f"[DB] Pool object: {self._pool}")
         self._health_task = asyncio.create_task(self._healthcheck_loop())
+        logger.debug("[DB] Healthcheck task started")
 
     async def _healthcheck_loop(self):
+        logger.debug("[DB] Healthcheck loop running")
         while not self._closing:
             await asyncio.sleep(self._health_interval)
             try:
                 async with self.acquire() as conn:
                     async with conn.cursor() as cur:
                         await cur.execute("SELECT 1")
-            except Exception as e:
+                logger.debug("[DB] Pool healthcheck succeeded")
+            except pymysql.err.Error as e:
                 logger.warning(f"[DB] Healthcheck failed: {e}")
 
     @asynccontextmanager
     async def acquire(self):
+        logger.debug("[DB] Acquiring connection from pool")
         conn = await self._pool.acquire()
+        logger.debug(f"[DB] Connection acquired: {conn}")
         try:
             yield conn
         finally:
             self._pool.release(conn)
+            logger.debug(f"[DB] Connection released: {conn}")
 
     async def release(self, conn):
         if self._pool:
             self._pool.release(conn)
+            logger.debug(f"[DB] Connection manually released: {conn}")
 
     async def close(self):
+        logger.info("[DB] Closing connection pool...")
         self._closing = True
         if self._health_task and not self._health_task.done():
             self._health_task.cancel()
             try:
                 await self._health_task
+                logger.debug("[DB] Healthcheck task cancelled")
             except asyncio.CancelledError:
-                pass
+                logger.debug("[DB] Healthcheck task cancellation caught")
         if self._pool:
             self._pool.close()
             await self._pool.wait_closed()
diff --git a/src/logger_handler.py b/src/logger_handler.py
index 25c121d..3911736 100644
--- a/src/logger_handler.py
+++ b/src/logger_handler.py
@@ -1,4 +1,9 @@
 import logging
+import os
+
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
+    LOG_LEVEL = "INFO"
 
 def setup_logger(name: str) -> logging.Logger:
     logger = logging.getLogger(name)
@@ -9,5 +14,6 @@ def setup_logger(name: str) -> logging.Logger:
     )
     handler.setFormatter(formatter)
     logger.addHandler(handler)
-    logger.setLevel(logging.DEBUG)
+    logger.setLevel(getattr(logging, LOG_LEVEL))
+    logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
     return logger
diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py
index 72de00a..535cb86 100644
--- a/src/rabbitmq_handler.py
+++ b/src/rabbitmq_handler.py
@@ -23,8 +23,9 @@
 RABBITMQ_URL = f"amqp://{rmq_username}:{rmq_password}@{rmq_host}/{rmq_vhost}"
 
 class RabbitMQConsumer:
-    def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange):
+    def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange, rmq_host=rmq_host):
         self.url = url
+        self.rmq_host = rmq_host
         self.exchange_name = exchange_name
         self.connection: aio_pika.RobustConnection | None = None
         self.channel: aio_pika.RobustChannel | None = None
@@ -37,8 +38,11 @@
         self.dlq_queue_name = "notifications_dlq"
         self.max_retries = 5
         self.retry_ttl = 5 * 60 * 10000
+        logger.debug(f"[RabbitMQ] Initialized consumer for exchange {self.exchange_name}")
+
 
     async def connect(self):
+        logger.info(f"[RabbitMQ] Connecting to {self.rmq_host}...")
         self.connection = await aio_pika.connect_robust(self.url)
         self.channel = await self.connection.channel()
         self.exchange = await self.channel.declare_exchange(
@@ -46,6 +50,8 @@
         )
         self.queue = await self.channel.declare_queue(self.queue_name, durable=True)
         await self.queue.bind(self.exchange, routing_key="notify.user.*")
+        logger.debug(f"[RabbitMQ] Bound queue '{self.queue_name}' to exchange '{self.exchange_name}'")
+
         retry_queue_args = {
             "x-message-ttl": self.retry_ttl,
             "x-dead-letter-exchange": self.exchange_name,
@@ -61,7 +67,7 @@
         asyncio.create_task(metrics.update_queue_gauge(self.channel, self.retry_queue_name))
         asyncio.create_task(metrics.update_queue_gauge(self.channel, self.dlq_queue_name))
 
-        logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.")
+        logger.info("[RabbitMQ] Connected, exchange, retry, and DLQ queues ready.")
 
     async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
         data = json.loads(message.body.decode())
@@ -74,7 +80,7 @@
             ),
             routing_key=self.dlq_queue_name
         )
-        logger.warning(f"Message sent to DLQ: {data['uuid']}")
+        logger.warning(f"[RabbitMQ] Message sent to DLQ: {data['uuid']}")
         metrics.MSG_FAILED.inc()
 
     async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
@@ -88,8 +94,10 @@
             uuid=uuid,
             retry_count=retry_count
         ).inc()
+        logger.debug(f"[RabbitMQ] Metrics updated for retry: uuid={uuid}, retry_count={retry_count}")
 
         if retry_count > self.max_retries:
+            logger.debug(f"[RabbitMQ] Retry count exceeded max_retries={self.max_retries}, sending to DLQ")
             await self.send_message_to_dlq(uuid, message)
             return
         await self.channel.default_exchange.publish(
@@ -100,23 +108,25 @@
             ),
             routing_key=self.retry_queue_name
         )
-        logger.info(f"Message requeued for retry #{retry_count}")
+        logger.info(f"[RabbitMQ] Message requeued for retry #{retry_count}")
 
     async def handle_message(self, message: aio_pika.IncomingMessage):
         if self._closing:
+            logger.debug("[RabbitMQ] Skipping message because consumer is closing")
             return
         async with message.process():
             try:
                 data = json.loads(message.body.decode())
-                logger.info(f"[RMQ] Received: {data}")
+                logger.debug(f"[RabbitMQ] Received message: {data}")
                 uuid = data.get("uuid")
+
                 if uuid:
                     encrypted_tokens = await database_lookup_by_uuid(uuid, db_manager)
                 else:
                     encrypted_tokens = await database_lookup_by_user_id(message.routing_key, db_manager)
                 if not encrypted_tokens:
-                    logger.warning(f"No push tokens found for user {message.routing_key}")
+                    logger.warning(f"[RabbitMQ] No push tokens found for routing key {message.routing_key}")
                     return
 
                 token_map = {row["uuid"]: row["token"].decode() for row in encrypted_tokens}
@@ -127,13 +137,13 @@
                 await self.validate_delivery(response,message)
 
             except json.JSONDecodeError as e:
-                logger.error(f"[RMQ] Bad message, discarding: {e}")
+                logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True)
                 await message.nack(requeue=False)
             except AMQPException as e:
-                logger.error(f"[RMQ] RabbitMQ error: {e}")
+                logger.error(f"[RabbitMQ] RabbitMQ error: {e}", exc_info=True)
                 await message.nack(requeue=True)
             except Exception as e:
-                logger.critical(f"[RMQ] Fatal error: {e}")
+                logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True)
                 raise
 
     async def validate_delivery(self,response,message: aio_pika.IncomingMessage):
@@ -143,45 +153,53 @@
         api_status = data_list[0].get("status") if data_list else None
 
         if status == "ok" and api_status == "ok":
-            logger.info(f"Notification delivered successfully to {uuid}")
+            logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}")
             metrics.MSG_PUBLISHED.inc()
+            logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}")
+
         if status == "ok" and api_status == "error":
             api_error = data_list[0].get("details", {}).get("error")
             if api_error == "DeviceNotRegistered":
                 expired = await remove_inactive_push_token(uuid, self.db_manager)
                 if expired:
-                    logger.info(f"Device no longer registered for uuid {uuid}, setting as expired")
+                    logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired")
                 else:
-                    logger.error(f"Failed expiring token for uuid: {uuid}")
+                    logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}")
             else:
                 await self.send_message_to_dlq(uuid, message)
+                logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}")
+
         if status == "error":
             await self.send_message_to_dlq(uuid, message)
+            logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}")
         if status == "failure":
             await self.send_message_to_retry_queue(uuid, message)
+            logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}")
+
     async def consume(self):
         if self._closing:
+            logger.debug("[RabbitMQ] Consumer is closing, not starting consume")
             return
         if not self.queue:
-            raise RuntimeError("Queue not initialized")
+            raise RuntimeError("[RabbitMQ] Queue not initialized")
         await self.queue.consume(self.handle_message, no_ack=False)
-        logger.info("[RMQ] Consuming messages...")
+        logger.info("[RabbitMQ] Consuming messages...")
 
     async def close(self):
         self._closing = True
         if self.connection:
             await self.connection.close()
-            logger.info("[aio-pika] Connection closed.")
+            logger.info("[RabbitMQ] Connection closed")
 
 async def main():
-    logger.info("Starting application")
+    logger.info("[APP] Starting application")
     await db_manager.connect()
     consumer = RabbitMQConsumer(db_manager=db_manager)
     await consumer.connect()
@@ -192,7 +210,7 @@
         asyncio.get_running_loop().add_signal_handler(sig, stop_event.set)
 
     await stop_event.wait()
-    logger.info("Stopping application")
+    logger.info("[APP] Stopping application")
     await consumer.close()
     await db_manager.close()
 
diff --git a/src/secret_handler.py b/src/secret_handler.py
index 311e358..8b22319 100644
--- a/src/secret_handler.py
+++ b/src/secret_handler.py
@@ -35,6 +35,7 @@ def return_credentials(path: str)->str:
 async def database_lookup_by_user_id(routing_key: str, db_manager):
     try:
         user_id = int(routing_key.split('.')[-1])
+        logger.debug(f"[DB] Looking up tokens for user_id={user_id}")
     except ValueError:
         logger.error(f"[DB] Invalid user id supplied:{routing_key}")
         return []
@@ -43,22 +44,34 @@
         async with conn.cursor() as cur:
             await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE user_id=%s", (user_id,))
+            logger.debug(f"[DB] Executed query for user_id={user_id}")
             if cur.description:
-                return await cur.fetchall()
+                rows = await cur.fetchall()
+                logger.debug(f"[DB] Retrieved {len(rows)} tokens for user_id={user_id}")
+                return rows
+            logger.debug(f"[DB] No tokens found for user_id={user_id}")
             return []
 
 async def database_lookup_by_uuid(uuid: str, db_manager):
+    logger.debug(f"[DB] Looking up token for uuid={uuid}")
     async with db_manager.acquire() as conn:
         async with conn.cursor() as cur:
             await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE token_id=%s", (uuid,))
+            logger.debug(f"[DB] Executed query for uuid={uuid}")
             if cur.description:
-                return await cur.fetchall()
+                rows = await cur.fetchall()
+                logger.debug(f"[DB] Retrieved {len(rows)} tokens for uuid={uuid}")
+                return rows
+            logger.debug(f"[DB] No token found for uuid={uuid}")
             return []
 
 async def remove_inactive_push_token(uuid :str, db_manager):
+    logger.debug(f"[DB] Expiring token for uuid={uuid}")
     async with db_manager.acquire() as conn:
         async with conn.cursor() as cur:
             await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s", (uuid,))
-            return cur.rowcount > 0
+            success = cur.rowcount > 0
+            logger.debug(f"[DB] Token expiration for uuid={uuid} success={success}")
+            return success
diff --git a/src/send_notification.py b/src/send_notification.py
index 619ddcb..038d06f 100644
--- a/src/send_notification.py
+++ b/src/send_notification.py
@@ -15,20 +15,18 @@ retryable_exceptions = (
 )
 
 
-async def send_notification(
-    message: dict,
-    push_tokens,
-    max_retries: int = 5,
-    timeout: int = 5,
-):
-
+async def send_notification(message: dict, push_tokens, max_retries: int = 5, timeout: int = 5):
+    logger.debug(f"[Notification] Sending to {len(push_tokens)} push tokens")
     results = {}
     for uuid, token in push_tokens.items():
+        logger.debug(f"[Notification] Preparing to send to uuid={uuid}")
         results[uuid] = await _send_to_token(token, uuid, message, max_retries, timeout)
     return results
 
 
 async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int):
     payload = create_payload(token, message)
+    logger.debug(f"[Notification] Payload for uuid={uuid}: {payload}")
+
     resolver = aiohttp.AsyncResolver()
     connector = aiohttp.TCPConnector(resolver=resolver)
 
@@ -47,18 +45,20 @@
                     return {"status":"ok","data":data}
 
         except retryable_exceptions as e:
-            logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
+            logger.warning(f"[Notification] Attempt {attempt}/{max_retries} failed for uuid={uuid}: {type(e).__name__}: {e}", exc_info=True)
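+            # Back off exponentially before the next attempt: 1s, 2s, 4s, ...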
             await asyncio.sleep(2 ** (attempt - 1))
+            logger.debug(f"[Notification] Retrying uuid={uuid}, attempt {attempt + 1}")
         except Exception as e:
-            logger.error(f"Unexpected failure for uuid {uuid}: {e}")
+            logger.error(f"[Notification] Unexpected failure for uuid={uuid}: {e}", exc_info=True)
             return {"status": "error", "exception": str(e)}
 
     logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts")
     return {"status": "failure"}
 
 
 def create_payload(push_token: str, message: dict) -> dict:
-    return {
+    payload = {
         "to": push_token,
         "title": message.get("title"),
         "body": message.get("body"),
@@ -70,3 +69,5 @@
         "sound": "default",
         "priority": "high"
     }
+    logger.debug(f"[Notification] Created payload: {payload}")
+    return payload
\ No newline at end of file
-- 
2.43.0