Unified logging behaviour #1

Merged
florian merged 1 commit from feature/extended-logging into main 2025-10-17 11:02:22 +02:00
6 changed files with 120 additions and 35 deletions

View File

@@ -1,2 +1,38 @@
# backend-push-notifications
# Backend Push Notifications
## Overview
This part of the backend handles device push notifications for registered users. It consumes messages from RabbitMQ, looks up user device tokens in a MySQL database, and sends notifications via Expo's push API. Failed deliveries are retried or routed to a dead-letter queue.
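
A message body is JSON; a representative example, inferred from the consumer code in this PR (all values are placeholders, and `uuid` is optional; without it the user id is parsed from the routing key `notify.user.<id>`):

```python
# Hypothetical message published to the configured exchange; field names come
# from the handler code, values are placeholders.
example_message = {
    "uuid": "some-device-token-id",  # optional: direct device-token lookup
    "title": "Hello",
    "body": "You have a new notification",
}
```
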
## Requirements
- Python 3.12+
- MySQL database
- RabbitMQ
- Python packages from requirements.txt
## Configuration
**Environment variables:**
- `LOG_LEVEL` (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- `RABBITMQ_URL`
- `MYSQL_HOST`, `MYSQL_USER`, `MYSQL_PASSWORD`, `MYSQL_DATABASE`
- `API_ENDPOINT` (Expo push API)
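
A minimal sketch of reading this configuration at startup; the defaults shown here are illustrative assumptions, not the service's actual defaults:

```python
import os

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()  # falls back to INFO
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
MYSQL_HOST = os.getenv("MYSQL_HOST", "localhost")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
API_ENDPOINT = os.getenv("API_ENDPOINT", "https://exp.host/--/api/v2/push/send")  # Expo's public push endpoint
```
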
## Metrics
Metrics are exposed on port `9000` for Prometheus, including:
- `MSG_PUBLISHED`
- `MSG_FAILED`
- `MSG_RETRY`
- Queue sizes
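
A sketch of how these might be defined and exposed with `prometheus_client`; the exact metric names, labels, and helpers in the real `metrics` module are assumptions:

```python
from prometheus_client import Counter, Gauge, start_http_server

MSG_PUBLISHED = Counter("msg_published", "Notifications delivered successfully")
MSG_FAILED = Counter("msg_failed", "Messages routed to the dead-letter queue")
# Labels mirror the metrics.MSG_RETRY.labels(...) call in the consumer diff.
MSG_RETRY = Counter("msg_retry", "Messages requeued for retry", ["uuid", "retry_count"])
QUEUE_SIZE = Gauge("queue_size", "Current queue depth", ["queue"])

start_http_server(9000)  # matches the port documented above
```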

View File

@@ -4,7 +4,7 @@ import asyncio
from secret_handler import return_credentials
import os
from logger_handler import setup_logger
import pymysql.err
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
@@ -26,6 +26,7 @@ class DBManager:
self._health_interval = health_interval
self._health_task: asyncio.Task | None = None
self._closing = False
logger.debug(f"[DB] Initialized DBManager: host={host}, db={db}, port={port}, pool_size={pool_size}")
async def connect(self):
self._pool = await aiomysql.create_pool(
@@ -40,38 +41,48 @@ class DBManager:
cursorclass=aiomysql.DictCursor
)
logger.info("[DB] Connection pool created")
logger.debug(f"[DB] Pool object: {self._pool}")
self._health_task = asyncio.create_task(self._healthcheck_loop())
logger.debug("[DB] Healthcheck task started")
async def _healthcheck_loop(self):
logger.debug("[DB] Healthcheck loop running")
while not self._closing:
await asyncio.sleep(self._health_interval)
try:
async with self.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
except Exception as e:
logger.debug("[DB] Pool healthcheck succeeded")
except pymysql.err.Error as e:
logger.warning(f"[DB] Healthcheck failed: {e}")
@asynccontextmanager
async def acquire(self):
logger.debug("[DB] Acquiring connection from pool")
conn = await self._pool.acquire()
logger.debug(f"[DB] Connection acquired: {conn}")
try:
yield conn
finally:
self._pool.release(conn)
logger.debug(f"[DB] Connection released: {conn}")
async def release(self, conn):
if self._pool:
self._pool.release(conn)
logger.debug(f"[DB] Connection manually released: {conn}")
async def close(self):
logger.info("[DB] Closing connection pool...")
self._closing = True
if self._health_task and not self._health_task.done():
self._health_task.cancel()
try:
await self._health_task
logger.debug("[DB] Healthcheck task cancelled")
except asyncio.CancelledError:
pass
logger.debug("[DB] Healthcheck task cancellation caught")
if self._pool:
self._pool.close()
await self._pool.wait_closed()
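
A hedged usage sketch for the pool manager above; the constructor keywords mirror the `__init__` debug line, credential handling is omitted, and all values are placeholders:

```python
import asyncio

async def main():
    db = DBManager(host="127.0.0.1", db="notifications", port=3306, pool_size=10)
    await db.connect()                # creates the pool and starts the healthcheck task
    async with db.acquire() as conn:  # released automatically on exit
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
    await db.close()                  # cancels the healthcheck and drains the pool

asyncio.run(main())
```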

View File

@@ -1,4 +1,9 @@
import logging
import os
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
LOG_LEVEL = "INFO"
def setup_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
@@ -9,5 +14,6 @@ def setup_logger(name: str) -> logging.Logger:
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.setLevel(getattr(logging, LOG_LEVEL))
logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
return logger
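
Usage is one line per module; with `LOG_LEVEL` unset or invalid, the level falls back to `INFO`:

```python
from logger_handler import setup_logger

logger = setup_logger(__name__)
logger.info("module initialized")  # visible when LOG_LEVEL is DEBUG or INFO
```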

View File

@@ -23,8 +23,9 @@ RABBITMQ_URL = f"amqp://{rmq_username}:{rmq_password}@{rmq_host}/{rmq_vhost}"
class RabbitMQConsumer:
def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange):
def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange, rmq_host=rmq_host):
self.url = url
self.rmq_host = rmq_host
self.exchange_name = exchange_name
self.connection: aio_pika.RobustConnection | None = None
self.channel: aio_pika.RobustChannel | None = None
@@ -37,8 +38,11 @@ class RabbitMQConsumer:
self.dlq_queue_name = "notifications_dlq"
self.max_retries = 5
self.retry_ttl = 5 * 60 * 1000  # 5 minutes, in milliseconds
logger.debug(f"[RabbitMQ] Initialized consumer for exchange {self.exchange_name}")
async def connect(self):
logger.info(f"[RabbitMQ] Connecting to {self.rmq_host}...")
self.connection = await aio_pika.connect_robust(self.url)
self.channel = await self.connection.channel()
self.exchange = await self.channel.declare_exchange(
@@ -46,6 +50,8 @@
)
self.queue = await self.channel.declare_queue(self.queue_name, durable=True)
await self.queue.bind(self.exchange, routing_key="notify.user.*")
logger.debug(f"[RabbitMQ] Bound queue '{self.queue_name}' to exchange '{self.exchange_name}'")
retry_queue_args = {
"x-message-ttl": self.retry_ttl,
"x-dead-letter-exchange": self.exchange_name,
@@ -61,7 +67,7 @@
asyncio.create_task(metrics.update_queue_gauge(self.channel, self.retry_queue_name))
asyncio.create_task(metrics.update_queue_gauge(self.channel, self.dlq_queue_name))
logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.")
logger.info("[RabbitMQ] Connected, exchange, retry, and DLQ queues ready.")
async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
data = json.loads(message.body.decode())
@@ -74,7 +80,7 @@
),
routing_key=self.dlq_queue_name
)
logger.warning(f"Message sent to DLQ: {data['uuid']}")
logger.warning(f"[RabbitMQ] Message sent to DLQ: {data['uuid']}")
metrics.MSG_FAILED.inc()
async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
@@ -88,8 +94,10 @@
uuid=uuid,
retry_count=retry_count
).inc()
logger.debug(f"[RabbitMQ] Metrics updated for retry: uuid={uuid}, retry_count={retry_count}")
if retry_count > self.max_retries:
logger.debug(f"[RabbitMQ] Retry count exceeded max_retries={self.max_retries}, sending to DLQ")
await self.send_message_to_dlq(uuid, message)
return
await self.channel.default_exchange.publish(
@@ -100,23 +108,25 @@
),
routing_key=self.retry_queue_name
)
logger.info(f"Message requeued for retry #{retry_count}")
logger.info(f"[RabbitMQ] Message requeued for retry #{retry_count}")
async def handle_message(self, message: aio_pika.IncomingMessage):
if self._closing:
logger.debug("[RabbitMQ] Skipping message because consumer is closing")
return
async with message.process():
try:
data = json.loads(message.body.decode())
logger.info(f"[RMQ] Received: {data}")
logger.debug(f"[RabbitMQ] Received message: {data}")
uuid = data.get("uuid")
if uuid:
encrypted_tokens = await database_lookup_by_uuid(uuid, db_manager)
else:
encrypted_tokens = await database_lookup_by_user_id(message.routing_key, db_manager)
if not encrypted_tokens:
logger.warning(f"No push tokens found for user {message.routing_key}")
logger.warning(f"[RabbitMQ] No push tokens found for routing key {message.routing_key}")
return
token_map = {row["uuid"]: row["token"].decode() for row in encrypted_tokens}
@@ -127,13 +137,13 @@ class RabbitMQConsumer:
await self.validate_delivery(response, message)
except json.JSONDecodeError as e:
logger.error(f"[RMQ] Bad message, discarding: {e}")
logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True)
await message.nack(requeue=False)
except AMQPException as e:
logger.error(f"[RMQ] RabbitMQ error: {e}")
logger.error(f"[RabbitMQ] RabbitMQ error: {e}", exc_info=True)
await message.nack(requeue=True)
except Exception as e:
logger.critical(f"[RMQ] Fatal error: {e}")
logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True)
raise
async def validate_delivery(self, response, message: aio_pika.IncomingMessage):
@@ -143,45 +153,53 @@
api_status = data_list[0].get("status") if data_list else None
if status == "ok" and api_status == "ok":
logger.info(f"Notification delivered successfully to {uuid}")
logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}")
metrics.MSG_PUBLISHED.inc()
logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}")
if status == "ok" and api_status == "error":
api_error = data_list[0].get("details", {}).get("error")
if api_error == "DeviceNotRegistered":
expired = await remove_inactive_push_token(uuid, self.db_manager)
if expired:
logger.info(f"Device no longer registered for uuid {uuid}, setting as expired")
logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired")
else:
logger.error(f"Failed expiring token for uuid: {uuid}")
logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}")
else:
await self.send_message_to_dlq(uuid, message)
logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}")
if status == "error":
await self.send_message_to_dlq(uuid, message)
logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}")
if status == "failure":
await self.send_message_to_retry_queue(uuid, message)
logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}")
async def consume(self):
if self._closing:
logger.debug("[RabbitMQ] Consumer is closing, not starting consume")
return
if not self.queue:
raise RuntimeError("Queue not initialized")
raise RuntimeError("[RabbitMQ] Queue not initialized")
await self.queue.consume(self.handle_message, no_ack=False)
logger.info("[RMQ] Consuming messages...")
logger.info("[RabbitMQ] Consuming messages...")
async def close(self):
self._closing = True
if self.connection:
await self.connection.close()
logger.info("[aio-pika] Connection closed.")
logger.info("[RabbitMQ] Connection closed")
async def main():
logger.info("Starting application")
logger.info("[APP] Starting application")
await db_manager.connect()
consumer = RabbitMQConsumer(db_manager=db_manager)
await consumer.connect()
@@ -192,7 +210,7 @@ async def main():
asyncio.get_running_loop().add_signal_handler(sig, stop_event.set)
await stop_event.wait()
logger.info("Stopping application")
logger.info("[APP] Stopping application")
await consumer.close()
await db_manager.close()
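
The retry path relies on standard AMQP dead-lettering: a message published to the retry queue waits out `x-message-ttl`, then is dead-lettered back to the main exchange for another delivery attempt. A sketch of that declaration with aio_pika, assuming a queue name of `notifications_retry` and an exchange name of `notifications` (neither literal appears in this diff):

```python
import aio_pika

async def declare_retry_queue(channel: aio_pika.abc.AbstractChannel) -> aio_pika.abc.AbstractQueue:
    # Messages parked here return to the main exchange once the TTL expires.
    return await channel.declare_queue(
        "notifications_retry",  # assumed queue name
        durable=True,
        arguments={
            "x-message-ttl": 5 * 60 * 1000,             # 5 minutes, in milliseconds
            "x-dead-letter-exchange": "notifications",  # assumed exchange name
        },
    )
```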

View File

@@ -35,6 +35,7 @@ def return_credentials(path: str) -> str:
async def database_lookup_by_user_id(routing_key: str, db_manager):
try:
user_id = int(routing_key.split('.')[-1])
logger.debug(f"[DB] Looking up tokens for user_id={user_id}")
except ValueError:
logger.error(f"[DB] Invalid user id supplied:{routing_key}")
return []
@@ -43,22 +44,34 @@ async def database_lookup_by_user_id(routing_key: str, db_manager):
async with conn.cursor() as cur:
await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE user_id=%s",
(user_id,))
logger.debug(f"[DB] Executed query for user_id={user_id}")
if cur.description:
return await cur.fetchall()
rows = await cur.fetchall()
logger.debug(f"[DB] Retrieved {len(rows)} tokens for user_id={user_id}")
return rows
logger.debug(f"[DB] No tokens found for user_id={user_id}")
return []
async def database_lookup_by_uuid(uuid: str, db_manager):
logger.debug(f"[DB] Looking up token for uuid={uuid}")
async with db_manager.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE token_id=%s",
(uuid,))
logger.debug(f"[DB] Executed query for uuid={uuid}")
if cur.description:
return await cur.fetchall()
rows = await cur.fetchall()
logger.debug(f"[DB] Retrieved {len(rows)} tokens for uuid={uuid}")
return rows
logger.debug(f"[DB] No token found for uuid={uuid}")
return []
async def remove_inactive_push_token(uuid: str, db_manager):
logger.debug(f"[DB] Expiring token for uuid={uuid}")
async with db_manager.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s",
(uuid,))
return cur.rowcount > 0
success = cur.rowcount > 0
logger.debug(f"[DB] Token expiration for uuid={uuid} success={success}")
return success
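
A sketch of how the consumer drives these helpers; the routing-key format follows the `notify.user.*` binding shown earlier, and the identifiers are placeholders:

```python
async def lookup_example(db_manager):
    # Direct lookup when the message body carries a device-token uuid...
    rows = await database_lookup_by_uuid("some-token-uuid", db_manager)
    if not rows:
        # ...otherwise fall back to the user id parsed from the routing key.
        rows = await database_lookup_by_user_id("notify.user.42", db_manager)
    for row in rows:
        print(row["uuid"], row["token"])  # DictCursor rows: token_id AS uuid, token
```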

View File

@@ -15,20 +15,18 @@ retryable_exceptions = (
)
async def send_notification(
message: dict,
push_tokens,
max_retries: int = 5,
timeout: int = 5,
):
async def send_notification(message: dict, push_tokens, max_retries: int = 5, timeout: int = 5):
logger.debug(f"[Notification] Sending to {len(push_tokens)} push tokens")
results = {}
for uuid, token in push_tokens.items():
logger.debug(f"[Notification] Preparing to send to uuid={uuid}")
results[uuid] = await _send_to_token(token, uuid, message, max_retries, timeout)
return results
async def _send_to_token(token: str, uuid: str, message: dict, max_retries: int, timeout: int):
payload = create_payload(token, message)
logger.debug(f"[Notification] Payload for uuid={uuid}: {payload}")
resolver = aiohttp.AsyncResolver()
connector = aiohttp.TCPConnector(resolver=resolver)
@@ -47,18 +45,19 @@ async def _send_to_token(token: str, uuid: str, message: dict, max_retries: int,
return {"status":"ok","data":data}
except retryable_exceptions as e:
logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
logger.warning(f"[Notification] Attempt {attempt}/{max_retries} failed for uuid={uuid}: {type(e).__name__}: {e}", exc_info=True)
await asyncio.sleep(2 ** (attempt - 1))
logger.debug(f"[Notification] Retrying uuid={uuid}, attempt {attempt + 1}")
except Exception as e:
logger.error(f"Unexpected failure for uuid {uuid}: {e}")
logger.error(f"[Notification] Unexpected failure for uuid={uuid}: {e}", exc_info=True)
return {"status": "error", "exception": str(e)}
logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts")
return {"status": "failure"}
def create_payload(push_token: str, message: dict) -> dict:
return {
payload = {
"to": push_token,
"title": message.get("title"),
"body": message.get("body"),
@@ -70,3 +69,5 @@ def create_payload(push_token: str, message: dict) -> dict:
"sound": "default",
"priority": "high"
}
logger.debug(f"[Notification] Created payload: {payload}")
return payload
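
With the default `max_retries=5`, the `2 ** (attempt - 1)` backoff sleeps 1, 2, 4, 8, and 16 seconds between attempts. A hedged usage sketch; the token is a placeholder in Expo's `ExponentPushToken[...]` format:

```python
import asyncio

async def demo():
    message = {"title": "Hello", "body": "You have a new notification"}
    push_tokens = {"some-token-uuid": "ExponentPushToken[xxxxxxxxxxxxxxxxxxxxxx]"}
    results = await send_notification(message, push_tokens)
    for uuid, result in results.items():
        print(uuid, result["status"])  # "ok", "error", or "failure"

asyncio.run(demo())
```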