Unified logging behaviour
- Logger no longer defaults to log level DEBUG; the level is read from the LOG_LEVEL environment variable
- Added extensive debug logging
- Wrote a README
- Changed the database healthcheck loop to catch only pymysql errors
parent 37476c814e
commit b272e069d4
README.md | 38
@@ -1,2 +1,38 @@
-# backend-push-notifications
+# Backend Push Notifications
 
+## Overview
+
+This part of the backend handles device push notifications for registered users. It consumes messages from RabbitMQ, looks up user device tokens from a MySQL database, and sends notifications via Expo's API. Failed deliveries are retried or sent to a dead-letter queue.
+
+## Requirements
+
+- Python 3.12+
+- MySQL database
+- RabbitMQ
+- Python packages from requirements.txt
+
+## Configuration
+
+**Environment variables:**
+
+- `LOG_LEVEL` (DEBUG, INFO, WARNING, ERROR, CRITICAL)
+- `RABBITMQ_URL`
+- `MYSQL_HOST`, `MYSQL_USER`, `MYSQL_PASSWORD`, `MYSQL_DATABASE`
+- `API_ENDPOINT` (Expo push API)
+
+## Metrics
+
+Metrics are exposed on port `9000` for Prometheus, including:
+
+- `MSG_PUBLISHED`
+- `MSG_FAILED`
+- `MSG_RETRY`
+- Queue sizes
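For context on the Metrics section: the counters are exposed over HTTP for Prometheus to scrape. A minimal sketch of how such metrics are typically wired up with `prometheus_client`; the metric names, help strings, and labels below are illustrative, not taken from the repo's metrics module:

```python
from prometheus_client import Counter, Gauge, start_http_server

# Illustrative definitions; the real ones live in the service's metrics module.
MSG_PUBLISHED = Counter("msg_published_total", "Notifications delivered successfully")
MSG_FAILED = Counter("msg_failed_total", "Notifications routed to the dead-letter queue")
MSG_RETRY = Counter("msg_retry_total", "Notification retries", ["uuid", "retry_count"])
QUEUE_SIZE = Gauge("queue_size", "Messages currently waiting in a queue", ["queue"])

start_http_server(9000)  # serves /metrics on port 9000, as the README states
MSG_PUBLISHED.inc()      # counters are incremented from the consumer code paths
```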
src/db.py | 17
@@ -4,7 +4,7 @@ import asyncio
 from secret_handler import return_credentials
 import os
 from logger_handler import setup_logger
-
+import pymysql.err
 
 db_username = return_credentials("/etc/secrets/db_username")
 db_password = return_credentials("/etc/secrets/db_password")
@@ -26,6 +26,7 @@ class DBManager:
         self._health_interval = health_interval
         self._health_task: asyncio.Task | None = None
         self._closing = False
+        logger.debug(f"[DB] Initialized DBManager: host={host}, db={db}, port={port}, pool_size={pool_size}")
 
     async def connect(self):
         self._pool = await aiomysql.create_pool(
@@ -40,38 +41,48 @@ class DBManager:
             cursorclass=aiomysql.DictCursor
         )
         logger.info("[DB] Connection pool created")
+        logger.debug(f"[DB] Pool object: {self._pool}")
         self._health_task = asyncio.create_task(self._healthcheck_loop())
+        logger.debug("[DB] Healthcheck task started")
 
     async def _healthcheck_loop(self):
+        logger.debug("[DB] Healthcheck loop running")
         while not self._closing:
             await asyncio.sleep(self._health_interval)
             try:
                 async with self.acquire() as conn:
                     async with conn.cursor() as cur:
                         await cur.execute("SELECT 1")
-            except Exception as e:
+                logger.debug("[DB] Pool healthcheck succeeded")
+            except pymysql.err.Error as e:
                 logger.warning(f"[DB] Healthcheck failed: {e}")
 
     @asynccontextmanager
     async def acquire(self):
+        logger.debug("[DB] Acquiring connection from pool")
         conn = await self._pool.acquire()
+        logger.debug(f"[DB] Connection acquired: {conn}")
         try:
             yield conn
         finally:
             self._pool.release(conn)
+            logger.debug(f"[DB] Connection released: {conn}")
 
     async def release(self, conn):
         if self._pool:
             self._pool.release(conn)
+            logger.debug(f"[DB] Connection manually released: {conn}")
 
     async def close(self):
+        logger.info("[DB] Closing connection pool...")
         self._closing = True
         if self._health_task and not self._health_task.done():
             self._health_task.cancel()
             try:
                 await self._health_task
+                logger.debug("[DB] Healthcheck task cancelled")
             except asyncio.CancelledError:
-                pass
+                logger.debug("[DB] Healthcheck task cancellation caught")
         if self._pool:
             self._pool.close()
             await self._pool.wait_closed()
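A note on the narrowed healthcheck handler: `pymysql.err.Error` is the PEP 249 base class of pymysql's exception hierarchy, so one clause still covers `OperationalError`, `InterfaceError`, `IntegrityError`, and friends, while genuine bugs in the loop body (say, an `AttributeError`) now propagate instead of being logged as failed healthchecks. A standalone sketch:

```python
import pymysql.err

# All pymysql database exceptions derive from pymysql.err.Error (PEP 249).
print(issubclass(pymysql.err.OperationalError, pymysql.err.Error))  # True
print(issubclass(pymysql.err.InterfaceError, pymysql.err.Error))    # True

try:
    # Simulate the kind of error the healthcheck is meant to catch.
    raise pymysql.err.OperationalError(2006, "MySQL server has gone away")
except pymysql.err.Error as e:
    print(f"healthcheck-style handling caught: {e!r}")
```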
logger_handler.py
@@ -1,4 +1,9 @@
 import logging
+import os
 
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
+    LOG_LEVEL = "INFO"
+
 def setup_logger(name: str) -> logging.Logger:
     logger = logging.getLogger(name)
@@ -9,5 +14,6 @@ def setup_logger(name: str) -> logging.Logger:
     )
     handler.setFormatter(formatter)
     logger.addHandler(handler)
-    logger.setLevel(logging.DEBUG)
+    logger.setLevel(getattr(logging, LOG_LEVEL))
+    logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
     return logger
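Caller code is unchanged; only where the level comes from differs. Since `LOG_LEVEL` is read at module import time, it has to be present in the environment before `logger_handler` is first imported. A quick usage sketch (assuming the module is on the import path):

```python
import os

# Must happen before logger_handler is imported; the level is read at import time.
os.environ["LOG_LEVEL"] = "DEBUG"

from logger_handler import setup_logger

logger = setup_logger(__name__)
logger.debug("emitted only when LOG_LEVEL=DEBUG")
logger.info("emitted at INFO and more verbose levels")
# Unrecognized values (e.g. LOG_LEVEL=verbose) fall back to INFO rather than raising.
```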
@@ -23,8 +23,9 @@ RABBITMQ_URL = f"amqp://{rmq_username}:{rmq_password}@{rmq_host}/{rmq_vhost}"
 
 
 class RabbitMQConsumer:
-    def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange):
+    def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange, rmq_host=rmq_host):
         self.url = url
+        self.rmq_host = rmq_host
         self.exchange_name = exchange_name
         self.connection: aio_pika.RobustConnection | None = None
         self.channel: aio_pika.RobustChannel | None = None
@@ -37,8 +38,11 @@ class RabbitMQConsumer:
         self.dlq_queue_name = "notifications_dlq"
         self.max_retries = 5
         self.retry_ttl = 5 * 60 * 1000  # 5 minutes, in milliseconds
+        logger.debug(f"[RabbitMQ] Initialized consumer for exchange {self.exchange_name}")
+
 
     async def connect(self):
+        logger.info(f"[RabbitMQ] Connecting to {self.rmq_host}...")
         self.connection = await aio_pika.connect_robust(self.url)
         self.channel = await self.connection.channel()
         self.exchange = await self.channel.declare_exchange(
@@ -46,6 +50,8 @@ class RabbitMQConsumer:
         )
         self.queue = await self.channel.declare_queue(self.queue_name, durable=True)
         await self.queue.bind(self.exchange, routing_key="notify.user.*")
+        logger.debug(f"[RabbitMQ] Bound queue '{self.queue_name}' to exchange '{self.exchange_name}'")
+
         retry_queue_args = {
             "x-message-ttl": self.retry_ttl,
             "x-dead-letter-exchange": self.exchange_name,
@@ -61,7 +67,7 @@ class RabbitMQConsumer:
         asyncio.create_task(metrics.update_queue_gauge(self.channel, self.retry_queue_name))
         asyncio.create_task(metrics.update_queue_gauge(self.channel, self.dlq_queue_name))
 
-        logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.")
+        logger.info("[RabbitMQ] Connected, exchange, retry, and DLQ queues ready.")
 
     async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
         data = json.loads(message.body.decode())
@@ -74,7 +80,7 @@ class RabbitMQConsumer:
             ),
             routing_key=self.dlq_queue_name
         )
-        logger.warning(f"Message sent to DLQ: {data['uuid']}")
+        logger.warning(f"[RabbitMQ] Message sent to DLQ: {data['uuid']}")
         metrics.MSG_FAILED.inc()
 
     async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
@@ -88,8 +94,10 @@ class RabbitMQConsumer:
             uuid=uuid,
             retry_count=retry_count
         ).inc()
+        logger.debug(f"[RabbitMQ] Metrics updated for retry: uuid={uuid}, retry_count={retry_count}")
 
         if retry_count > self.max_retries:
+            logger.debug(f"[RabbitMQ] Retry count exceeded max_retries={self.max_retries}, sending to DLQ")
             await self.send_message_to_dlq(uuid, message)
             return
         await self.channel.default_exchange.publish(
@@ -100,23 +108,25 @@ class RabbitMQConsumer:
             ),
             routing_key=self.retry_queue_name
         )
-        logger.info(f"Message requeued for retry #{retry_count}")
+        logger.info(f"[RabbitMQ] Message requeued for retry #{retry_count}")
 
     async def handle_message(self, message: aio_pika.IncomingMessage):
         if self._closing:
+            logger.debug("[RabbitMQ] Skipping message because consumer is closing")
             return
         async with message.process():
             try:
                 data = json.loads(message.body.decode())
-                logger.info(f"[RMQ] Received: {data}")
+                logger.debug(f"[RabbitMQ] Received message: {data}")
                 uuid = data.get("uuid")
 
                 if uuid:
                     encrypted_tokens = await database_lookup_by_uuid(uuid, db_manager)
                 else:
                     encrypted_tokens = await database_lookup_by_user_id(message.routing_key, db_manager)
 
                 if not encrypted_tokens:
-                    logger.warning(f"No push tokens found for user {message.routing_key}")
+                    logger.warning(f"[RabbitMQ] No push tokens found for routing key {message.routing_key}")
                     return
 
                 token_map = {row["uuid"]: row["token"].decode() for row in encrypted_tokens}
@@ -127,13 +137,13 @@ class RabbitMQConsumer:
                 await self.validate_delivery(response, message)
 
             except json.JSONDecodeError as e:
-                logger.error(f"[RMQ] Bad message, discarding: {e}")
+                logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True)
                 await message.nack(requeue=False)
             except AMQPException as e:
-                logger.error(f"[RMQ] RabbitMQ error: {e}")
+                logger.error(f"[RabbitMQ] RabbitMQ error: {e}", exc_info=True)
                 await message.nack(requeue=True)
             except Exception as e:
-                logger.critical(f"[RMQ] Fatal error: {e}")
+                logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True)
                 raise
 
     async def validate_delivery(self, response, message: aio_pika.IncomingMessage):
@@ -143,45 +153,53 @@ class RabbitMQConsumer:
         api_status = data_list[0].get("status") if data_list else None
 
         if status == "ok" and api_status == "ok":
-            logger.info(f"Notification delivered successfully to {uuid}")
+            logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}")
             metrics.MSG_PUBLISHED.inc()
+            logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}")
 
         if status == "ok" and api_status == "error":
             api_error = data_list[0].get("details", {}).get("error")
             if api_error == "DeviceNotRegistered":
                 expired = await remove_inactive_push_token(uuid, self.db_manager)
                 if expired:
-                    logger.info(f"Device no longer registered for uuid {uuid}, setting as expired")
+                    logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired")
                 else:
-                    logger.error(f"Failed expiring token for uuid: {uuid}")
+                    logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}")
             else:
                 await self.send_message_to_dlq(uuid, message)
+                logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}")
 
         if status == "error":
             await self.send_message_to_dlq(uuid, message)
+            logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}")
 
         if status == "failure":
            await self.send_message_to_retry_queue(uuid, message)
+            logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}")
 
     async def consume(self):
         if self._closing:
+            logger.debug("[RabbitMQ] Consumer is closing, not starting consume")
             return
         if not self.queue:
-            raise RuntimeError("Queue not initialized")
+            raise RuntimeError("[RabbitMQ] Queue not initialized")
         await self.queue.consume(self.handle_message, no_ack=False)
-        logger.info("[RMQ] Consuming messages...")
+        logger.info("[RabbitMQ] Consuming messages...")
 
     async def close(self):
         self._closing = True
         if self.connection:
             await self.connection.close()
-            logger.info("[aio-pika] Connection closed.")
+            logger.info("[RabbitMQ] Connection closed")
 
 
 async def main():
-    logger.info("Starting application")
+    logger.info("[APP] Starting application")
     await db_manager.connect()
     consumer = RabbitMQConsumer(db_manager=db_manager)
     await consumer.connect()
@@ -192,7 +210,7 @@ async def main():
         asyncio.get_running_loop().add_signal_handler(sig, stop_event.set)
 
     await stop_event.wait()
-    logger.info("Stopping application")
+    logger.info("[APP] Stopping application")
     await consumer.close()
     await db_manager.close()
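For reference, the retry flow parks failed messages in `notifications_retry`, where the per-queue TTL expires them and the dead-letter exchange routes them back to the main exchange for redelivery. A self-contained sketch of that declaration with aio_pika; the broker URL and exchange name are illustrative, since the real values come from secrets:

```python
import asyncio
import aio_pika

async def declare_retry_queue() -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    channel = await connection.channel()
    await channel.declare_exchange("notifications", aio_pika.ExchangeType.TOPIC, durable=True)

    # Messages wait here until the TTL expires, then are dead-lettered back
    # to the main exchange, which routes them to the consumer again.
    await channel.declare_queue(
        "notifications_retry",
        durable=True,
        arguments={
            "x-message-ttl": 5 * 60 * 1000,  # 5 minutes, in milliseconds
            "x-dead-letter-exchange": "notifications",
        },
    )
    await connection.close()

asyncio.run(declare_retry_queue())
```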
@@ -35,6 +35,7 @@ def return_credentials(path: str) -> str:
 async def database_lookup_by_user_id(routing_key: str, db_manager):
     try:
         user_id = int(routing_key.split('.')[-1])
+        logger.debug(f"[DB] Looking up tokens for user_id={user_id}")
     except ValueError:
         logger.error(f"[DB] Invalid user id supplied: {routing_key}")
         return []
@@ -43,22 +44,34 @@ async def database_lookup_by_user_id(routing_key: str, db_manager):
         async with conn.cursor() as cur:
             await cur.execute("SELECT token_id AS uuid, token FROM device_tokens WHERE user_id=%s",
                               (user_id,))
+            logger.debug(f"[DB] Executed query for user_id={user_id}")
             if cur.description:
-                return await cur.fetchall()
+                rows = await cur.fetchall()
+                logger.debug(f"[DB] Retrieved {len(rows)} tokens for user_id={user_id}")
+                return rows
+            logger.debug(f"[DB] No tokens found for user_id={user_id}")
             return []
 
 async def database_lookup_by_uuid(uuid: str, db_manager):
+    logger.debug(f"[DB] Looking up token for uuid={uuid}")
     async with db_manager.acquire() as conn:
         async with conn.cursor() as cur:
             await cur.execute("SELECT token_id AS uuid, token FROM device_tokens WHERE token_id=%s",
                               (uuid,))
+            logger.debug(f"[DB] Executed query for uuid={uuid}")
             if cur.description:
-                return await cur.fetchall()
+                rows = await cur.fetchall()
+                logger.debug(f"[DB] Retrieved {len(rows)} tokens for uuid={uuid}")
+                return rows
+            logger.debug(f"[DB] No token found for uuid={uuid}")
             return []
 
 async def remove_inactive_push_token(uuid: str, db_manager):
+    logger.debug(f"[DB] Expiring token for uuid={uuid}")
    async with db_manager.acquire() as conn:
         async with conn.cursor() as cur:
             await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s",
                               (uuid,))
-            return cur.rowcount > 0
+            success = cur.rowcount > 0
+            logger.debug(f"[DB] Token expiration for uuid={uuid} success={success}")
+            return success
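The lookups lean on the driver's `%s` placeholders for escaping instead of string formatting, which keeps routing-key input from injecting SQL. A standalone sketch of the same query with aiomysql and a `DictCursor`; host, credentials, and database name are illustrative:

```python
import asyncio
import aiomysql

async def lookup_tokens(user_id: int):
    # Illustrative connection settings; the service reads these from secret files.
    conn = await aiomysql.connect(
        host="localhost", user="app", password="secret",
        db="notifications", cursorclass=aiomysql.DictCursor,
    )
    try:
        async with conn.cursor() as cur:
            # The driver escapes %s parameters itself; never interpolate values.
            await cur.execute(
                "SELECT token_id AS uuid, token FROM device_tokens WHERE user_id=%s",
                (user_id,),
            )
            return await cur.fetchall()  # list of dicts, thanks to DictCursor
    finally:
        conn.close()

print(asyncio.run(lookup_tokens(42)))
```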
@@ -15,20 +15,18 @@ retryable_exceptions = (
 )
 
 
-async def send_notification(
-    message: dict,
-    push_tokens,
-    max_retries: int = 5,
-    timeout: int = 5,
-):
-
+async def send_notification(message: dict, push_tokens, max_retries: int = 5, timeout: int = 5):
+    logger.debug(f"[Notification] Sending to {len(push_tokens)} push tokens")
     results = {}
     for uuid, token in push_tokens.items():
+        logger.debug(f"[Notification] Preparing to send to uuid={uuid}")
         results[uuid] = await _send_to_token(token, uuid, message, max_retries, timeout)
     return results
 
 async def _send_to_token(token: str, uuid: str, message: dict, max_retries: int, timeout: int):
     payload = create_payload(token, message)
+    logger.debug(f"[Notification] Payload for uuid={uuid}: {payload}")
 
     resolver = aiohttp.AsyncResolver()
     connector = aiohttp.TCPConnector(resolver=resolver)
@@ -47,18 +45,19 @@ async def _send_to_token(token: str, uuid: str, message: dict, max_retries: int, timeout: int):
                 return {"status": "ok", "data": data}
 
             except retryable_exceptions as e:
-                logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
+                logger.warning(f"[Notification] Attempt {attempt}/{max_retries} failed for uuid={uuid}: {type(e).__name__}: {e}", exc_info=True)
                 await asyncio.sleep(2 ** (attempt - 1))
+                logger.debug(f"[Notification] Retrying uuid={uuid}, attempt {attempt + 1}")
 
             except Exception as e:
-                logger.error(f"Unexpected failure for uuid {uuid}: {e}")
+                logger.error(f"[Notification] Unexpected failure for uuid={uuid}: {e}", exc_info=True)
                 return {"status": "error", "exception": str(e)}
 
     logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts")
     return {"status": "failure"}
 
 def create_payload(push_token: str, message: dict) -> dict:
-    return {
+    payload = {
         "to": push_token,
         "title": message.get("title"),
         "body": message.get("body"),
@@ -70,3 +69,5 @@ def create_payload(push_token: str, message: dict) -> dict:
         "sound": "default",
         "priority": "high"
     }
+    logger.debug(f"[Notification] Created payload: {payload}")
+    return payload
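The retry loop backs off exponentially, sleeping `2 ** (attempt - 1)` seconds between attempts, so with the default `max_retries = 5` the waits are 1, 2, 4, 8, and 16 seconds, roughly 31 seconds in total before the sender gives up with `{"status": "failure"}`:

```python
# Backoff schedule produced by _send_to_token with max_retries = 5
for attempt in range(1, 6):
    print(f"attempt {attempt}: wait {2 ** (attempt - 1)}s")
```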