From d193bc05f18453a899b2e646c6f4431d89f71cc5 Mon Sep 17 00:00:00 2001 From: Florian Date: Mon, 13 Oct 2025 11:35:07 +0200 Subject: [PATCH] First commit --- Dockerfile | 11 +++++ requirements.txt | 21 +++++++++ src/db.py | 93 ++++++++++++++++++++++++++++++++++++++++ src/logger_handler.py | 13 ++++++ src/rabbitmq_handler.py | 87 +++++++++++++++++++++++++++++++++++++ src/secret_handler.py | 12 ++++++ src/send_notification.py | 83 +++++++++++++++++++++++++++++++++++ 7 files changed, 320 insertions(+) create mode 100644 Dockerfile create mode 100644 requirements.txt create mode 100644 src/db.py create mode 100644 src/logger_handler.py create mode 100644 src/rabbitmq_handler.py create mode 100644 src/secret_handler.py create mode 100644 src/send_notification.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..a54e052 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.12-slim + +COPY requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +WORKDIR /app + +COPY src/ /app/ + +ENTRYPOINT ["python","main.py"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f5290a2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,21 @@ +aio-pika==9.5.7 +aiodns==3.5.0 +aiohappyeyeballs==2.6.1 +aiohttp==3.13.0 +aiomysql==0.2.0 +aiormq==6.9.0 +aiosignal==1.4.0 +attrs==25.4.0 +Brotli==1.1.0 +cffi==2.0.0 +frozenlist==1.8.0 +idna==3.11 +multidict==6.7.0 +pamqp==3.3.0 +propcache==0.4.1 +pycares==4.11.0 +pycparser==2.23 +PyMySQL==1.1.2 +typing_extensions==4.15.0 +yarl==1.22.0 +zstandard==0.25.0 diff --git a/src/db.py b/src/db.py new file mode 100644 index 0000000..8a22a68 --- /dev/null +++ b/src/db.py @@ -0,0 +1,93 @@ +import aiomysql +import asyncio +from secret_handler import return_credentials +import os +from logger_handler import setup_logger + + +db_username = return_credentials("/etc/secrets/db_username") +db_password = return_credentials("/etc/secrets/db_password") +db_host = os.getenv("BACKEND_PN_DB_HOST","localhost") +db_database = os.getenv("BACKEND_PN_DB_DATABASE","app") + +logger = setup_logger(__name__) + + +class DBManager: + def __init__(self, host, user, password, db, port=3306, pool_size=5, health_interval=60): + self.host = host + self.user = user + self.password = password + self.db = db + self.port = port + self.pool_size = pool_size + self._pool: aiomysql.Pool | None = None + self._health_interval = health_interval + self._health_task: asyncio.Task | None = None + self._closing = False + + async def connect(self): + self._pool = await aiomysql.create_pool( + host=self.host, + user=self.user, + password=self.password, + db=self.db, + port=self.port, + minsize=1, + maxsize=self.pool_size, + autocommit=True, + cursorclass=aiomysql.DictCursor + ) + logger.info("[DB] Connection pool created") + self._health_task = asyncio.create_task(self._healthcheck_loop()) + + async def _healthcheck_loop(self): + while not self._closing: + await asyncio.sleep(self._health_interval) + try: + async with self.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + logger.debug("[DB] Healthcheck OK") + except Exception as e: + logger.warning(f"[DB] Healthcheck failed: {e}") + + async def acquire(self): + if not self._pool: + raise RuntimeError("DB pool not initialized") + return await self._pool.acquire() + + async def release(self, conn): + if self._pool: + self._pool.release(conn) + + async def execute(self, query, *args, retries=3): + for attempt in range(1, retries + 1): + conn = await self.acquire() + try: + async with conn.cursor() as cur: + await cur.execute(query, args) + if cur.description: + return await cur.fetchall() + return None + except aiomysql.OperationalError as e: + logger.warning(f"[DB] Query failed (attempt {attempt}/{retries}): {e}") + await asyncio.sleep(2 ** (attempt - 1)) + finally: + await self.release(conn) + raise RuntimeError("DB query failed after retries") + + async def close(self): + self._closing = True + if self._health_task and not self._health_task.done(): + self._health_task.cancel() + try: + await self._health_task + except asyncio.CancelledError: + pass + if self._pool: + self._pool.close() + await self._pool.wait_closed() + logger.info("[DB] Connection pool closed") + +db_manager = DBManager(host=db_host, port=3306, user=db_username, password=db_password, db=db_database) \ No newline at end of file diff --git a/src/logger_handler.py b/src/logger_handler.py new file mode 100644 index 0000000..25c121d --- /dev/null +++ b/src/logger_handler.py @@ -0,0 +1,13 @@ +import logging + +def setup_logger(name: str) -> logging.Logger: + logger = logging.getLogger(name) + if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + return logger diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py new file mode 100644 index 0000000..981d8cf --- /dev/null +++ b/src/rabbitmq_handler.py @@ -0,0 +1,87 @@ +import asyncio +import aio_pika +from aio_pika.exceptions import AMQPException +from secret_handler import return_credentials +import os +from logger_handler import setup_logger +import json +from db import db_manager +from send_notification import send_notification + +logger = setup_logger(__name__) + + +rmq_username = return_credentials("/etc/secrets/rmq_username") +rmq_password = return_credentials("/etc/secrets/rmq_password") +rmq_host = os.getenv("BACKEND_API_INTERNAL_RMQ_HOST", "localhost") +rmq_vhost = os.getenv("BACKEND_API_INTERNAL_RMQ_VHOST", "app_notifications") +rmq_exchange = os.getenv("BACKEND_API_INTERNAL_RMQ_EXCHANGE", "app_notifications") + +RABBITMQ_URL = f"amqp://{rmq_username}:{rmq_password}@{rmq_host}/{rmq_vhost}" + + +class RabbitMQConsumer: + def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange): + self.url = url + self.exchange_name = exchange_name + self.connection: aio_pika.RobustConnection | None = None + self.channel: aio_pika.RobustChannel | None = None + self.exchange: aio_pika.Exchange | None = None + self.queue: aio_pika.Queue | None = None + self._closing = False + self.db_manager = db_manager + + async def connect(self): + self.connection = await aio_pika.connect_robust(self.url) + self.channel = await self.connection.channel() + self.exchange = await self.channel.declare_exchange( + self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True + ) + self.queue = await self.channel.declare_queue("backend_push_notifications", durable=True) + await self.queue.bind(self.exchange, routing_key="notify.user.*") + logger.info("[Consumer] Connected, queue bound to notify.user.*") + + async def handle_message(self, message: aio_pika.IncomingMessage): + if self._closing: + return + async with message.process(): + try: + data = json.loads(message.body.decode()) + logger.info(f"[Consumer] Received: {data}") + await send_notification(routing_key=message.routing_key,message=data,db_manager=self.db_manager) + except json.JSONDecodeError as e: + logger.error(f"[Consumer] Bad message, discarding: {e}") + await message.nack(requeue=False) + except AMQPException as e: + logger.error(f"[Consumer] RabbitMQ error: {e}") + await message.nack(requeue=True) + except Exception as e: + logger.critical(f"[Consumer] Fatal error: {e}") + raise + + async def consume(self): + if self._closing: + return + if not self.queue: + raise RuntimeError("Queue not initialized") + await self.queue.consume(self.handle_message, no_ack=False) + logger.info("[Consumer] Consuming messages...") + + async def close(self): + self._closing = True + if self.connection: + await self.connection.close() + logger.info("[aio-pika] Connection closed.") + + + +async def main(): + consumer = RabbitMQConsumer(db_manager=db_manager) + await consumer.connect() + logger.info("Creating MySQL connection pool...") + await db_manager.connect() + + +if __name__ == "__main__": + asyncio.run(main()) + diff --git a/src/secret_handler.py b/src/secret_handler.py new file mode 100644 index 0000000..33d66a5 --- /dev/null +++ b/src/secret_handler.py @@ -0,0 +1,12 @@ +import sys + +def return_credentials(path: str)->str: + try: + with open (path) as file: + return file.read().strip() + except FileNotFoundError: + print(f"[FATAL] Secret file not found: {path}") + sys.exit(1) + except Exception as e: + print(f"[FATAL] Failed to read secret file {path}: {e}") + sys.exit(1) diff --git a/src/send_notification.py b/src/send_notification.py new file mode 100644 index 0000000..90b63cc --- /dev/null +++ b/src/send_notification.py @@ -0,0 +1,83 @@ +import aiohttp +import asyncio +from logger_handler import setup_logger + +API_ENDPOINT="https://exp.host/fakeUSer/api/v2/push/send" +logger = setup_logger(__name__) + + + + +async def send_notification( + routing_key: str, + message: dict, + db_manager, + max_retries: int = 5, + timeout: int = 5, +): + push_tokens = await database_lookup(routing_key, db_manager) + if not push_tokens: + logger.warning(f"No push tokens found for user {routing_key}") + return + + results = {} + for token, uuid in push_tokens: + results[token] = await _send_to_token(token, uuid, message, max_retries, timeout) + + return results + +async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int): + payload = create_payload(token, message) + + for attempt in range(1, max_retries + 1): + try: + async with aiohttp.ClientSession() as session: + async with session.post( + url=API_ENDPOINT, + json=payload, + headers={"Content-Type": "application/json"}, + timeout=timeout + ) as response: + await response.raise_for_status() + logger.info(f"Notification sent successfully to uuid {uuid}") + return {"status": "ok"} + + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}") + await asyncio.sleep(2 ** (attempt - 1)) + + except Exception as e: + logger.error(f"Unexpected failure for uuid {uuid}: {e}") + return {"status": "error", "exception": str(e)} + + logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts") + return {"status": "failed"} + +def create_payload(push_token: str, message: dict) -> dict: + return { + "to": push_token, + "title": message.get("title"), + "body": message.get("body"), + "data": { + "link": message.get("link"), + "category": message.get("category") + }, + "sound": "default", + "priority": "high" + } + +async def database_lookup(routing_key: str, db_manager): + try: + user_id = int(routing_key.split('.')[-1]) + except ValueError: + logger.error(f"[DB] Invalid user id supplied:{routing_key}") + return [] + + async with db_manager.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT tokend_id AS uuid,token FROM device_tokens WHERE user_id=%s", + (user_id,)) + if cur.description: + return await cur.fetchall() + return [] +