First commit
This commit is contained in:
parent
82275843b4
commit
d193bc05f1
11
Dockerfile
Normal file
11
Dockerfile
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
FROM python:3.12-slim
|
||||||
|
|
||||||
|
COPY requirements.txt .
|
||||||
|
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY src/ /app/
|
||||||
|
|
||||||
|
ENTRYPOINT ["python","main.py"]
|
||||||
21
requirements.txt
Normal file
21
requirements.txt
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
aio-pika==9.5.7
|
||||||
|
aiodns==3.5.0
|
||||||
|
aiohappyeyeballs==2.6.1
|
||||||
|
aiohttp==3.13.0
|
||||||
|
aiomysql==0.2.0
|
||||||
|
aiormq==6.9.0
|
||||||
|
aiosignal==1.4.0
|
||||||
|
attrs==25.4.0
|
||||||
|
Brotli==1.1.0
|
||||||
|
cffi==2.0.0
|
||||||
|
frozenlist==1.8.0
|
||||||
|
idna==3.11
|
||||||
|
multidict==6.7.0
|
||||||
|
pamqp==3.3.0
|
||||||
|
propcache==0.4.1
|
||||||
|
pycares==4.11.0
|
||||||
|
pycparser==2.23
|
||||||
|
PyMySQL==1.1.2
|
||||||
|
typing_extensions==4.15.0
|
||||||
|
yarl==1.22.0
|
||||||
|
zstandard==0.25.0
|
||||||
93
src/db.py
Normal file
93
src/db.py
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
import aiomysql
|
||||||
|
import asyncio
|
||||||
|
from secret_handler import return_credentials
|
||||||
|
import os
|
||||||
|
from logger_handler import setup_logger
|
||||||
|
|
||||||
|
|
||||||
|
db_username = return_credentials("/etc/secrets/db_username")
|
||||||
|
db_password = return_credentials("/etc/secrets/db_password")
|
||||||
|
db_host = os.getenv("BACKEND_PN_DB_HOST","localhost")
|
||||||
|
db_database = os.getenv("BACKEND_PN_DB_DATABASE","app")
|
||||||
|
|
||||||
|
logger = setup_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DBManager:
|
||||||
|
def __init__(self, host, user, password, db, port=3306, pool_size=5, health_interval=60):
|
||||||
|
self.host = host
|
||||||
|
self.user = user
|
||||||
|
self.password = password
|
||||||
|
self.db = db
|
||||||
|
self.port = port
|
||||||
|
self.pool_size = pool_size
|
||||||
|
self._pool: aiomysql.Pool | None = None
|
||||||
|
self._health_interval = health_interval
|
||||||
|
self._health_task: asyncio.Task | None = None
|
||||||
|
self._closing = False
|
||||||
|
|
||||||
|
async def connect(self):
|
||||||
|
self._pool = await aiomysql.create_pool(
|
||||||
|
host=self.host,
|
||||||
|
user=self.user,
|
||||||
|
password=self.password,
|
||||||
|
db=self.db,
|
||||||
|
port=self.port,
|
||||||
|
minsize=1,
|
||||||
|
maxsize=self.pool_size,
|
||||||
|
autocommit=True,
|
||||||
|
cursorclass=aiomysql.DictCursor
|
||||||
|
)
|
||||||
|
logger.info("[DB] Connection pool created")
|
||||||
|
self._health_task = asyncio.create_task(self._healthcheck_loop())
|
||||||
|
|
||||||
|
async def _healthcheck_loop(self):
|
||||||
|
while not self._closing:
|
||||||
|
await asyncio.sleep(self._health_interval)
|
||||||
|
try:
|
||||||
|
async with self.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute("SELECT 1")
|
||||||
|
logger.debug("[DB] Healthcheck OK")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[DB] Healthcheck failed: {e}")
|
||||||
|
|
||||||
|
async def acquire(self):
|
||||||
|
if not self._pool:
|
||||||
|
raise RuntimeError("DB pool not initialized")
|
||||||
|
return await self._pool.acquire()
|
||||||
|
|
||||||
|
async def release(self, conn):
|
||||||
|
if self._pool:
|
||||||
|
self._pool.release(conn)
|
||||||
|
|
||||||
|
async def execute(self, query, *args, retries=3):
|
||||||
|
for attempt in range(1, retries + 1):
|
||||||
|
conn = await self.acquire()
|
||||||
|
try:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(query, args)
|
||||||
|
if cur.description:
|
||||||
|
return await cur.fetchall()
|
||||||
|
return None
|
||||||
|
except aiomysql.OperationalError as e:
|
||||||
|
logger.warning(f"[DB] Query failed (attempt {attempt}/{retries}): {e}")
|
||||||
|
await asyncio.sleep(2 ** (attempt - 1))
|
||||||
|
finally:
|
||||||
|
await self.release(conn)
|
||||||
|
raise RuntimeError("DB query failed after retries")
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
self._closing = True
|
||||||
|
if self._health_task and not self._health_task.done():
|
||||||
|
self._health_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._health_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
if self._pool:
|
||||||
|
self._pool.close()
|
||||||
|
await self._pool.wait_closed()
|
||||||
|
logger.info("[DB] Connection pool closed")
|
||||||
|
|
||||||
|
db_manager = DBManager(host=db_host, port=3306, user=db_username, password=db_password, db=db_database)
|
||||||
13
src/logger_handler.py
Normal file
13
src/logger_handler.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
def setup_logger(name: str) -> logging.Logger:
|
||||||
|
logger = logging.getLogger(name)
|
||||||
|
if not logger.handlers:
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
formatter = logging.Formatter(
|
||||||
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
logger.addHandler(handler)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
return logger
|
||||||
87
src/rabbitmq_handler.py
Normal file
87
src/rabbitmq_handler.py
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
import asyncio
|
||||||
|
import aio_pika
|
||||||
|
from aio_pika.exceptions import AMQPException
|
||||||
|
from secret_handler import return_credentials
|
||||||
|
import os
|
||||||
|
from logger_handler import setup_logger
|
||||||
|
import json
|
||||||
|
from db import db_manager
|
||||||
|
from send_notification import send_notification
|
||||||
|
|
||||||
|
logger = setup_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
rmq_username = return_credentials("/etc/secrets/rmq_username")
|
||||||
|
rmq_password = return_credentials("/etc/secrets/rmq_password")
|
||||||
|
rmq_host = os.getenv("BACKEND_API_INTERNAL_RMQ_HOST", "localhost")
|
||||||
|
rmq_vhost = os.getenv("BACKEND_API_INTERNAL_RMQ_VHOST", "app_notifications")
|
||||||
|
rmq_exchange = os.getenv("BACKEND_API_INTERNAL_RMQ_EXCHANGE", "app_notifications")
|
||||||
|
|
||||||
|
RABBITMQ_URL = f"amqp://{rmq_username}:{rmq_password}@{rmq_host}/{rmq_vhost}"
|
||||||
|
|
||||||
|
|
||||||
|
class RabbitMQConsumer:
|
||||||
|
def __init__(self, url=RABBITMQ_URL, db_manager=db_manager, exchange_name=rmq_exchange):
|
||||||
|
self.url = url
|
||||||
|
self.exchange_name = exchange_name
|
||||||
|
self.connection: aio_pika.RobustConnection | None = None
|
||||||
|
self.channel: aio_pika.RobustChannel | None = None
|
||||||
|
self.exchange: aio_pika.Exchange | None = None
|
||||||
|
self.queue: aio_pika.Queue | None = None
|
||||||
|
self._closing = False
|
||||||
|
self.db_manager = db_manager
|
||||||
|
|
||||||
|
async def connect(self):
|
||||||
|
self.connection = await aio_pika.connect_robust(self.url)
|
||||||
|
self.channel = await self.connection.channel()
|
||||||
|
self.exchange = await self.channel.declare_exchange(
|
||||||
|
self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
|
||||||
|
)
|
||||||
|
self.queue = await self.channel.declare_queue("backend_push_notifications", durable=True)
|
||||||
|
await self.queue.bind(self.exchange, routing_key="notify.user.*")
|
||||||
|
logger.info("[Consumer] Connected, queue bound to notify.user.*")
|
||||||
|
|
||||||
|
async def handle_message(self, message: aio_pika.IncomingMessage):
|
||||||
|
if self._closing:
|
||||||
|
return
|
||||||
|
async with message.process():
|
||||||
|
try:
|
||||||
|
data = json.loads(message.body.decode())
|
||||||
|
logger.info(f"[Consumer] Received: {data}")
|
||||||
|
await send_notification(routing_key=message.routing_key,message=data,db_manager=self.db_manager)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logger.error(f"[Consumer] Bad message, discarding: {e}")
|
||||||
|
await message.nack(requeue=False)
|
||||||
|
except AMQPException as e:
|
||||||
|
logger.error(f"[Consumer] RabbitMQ error: {e}")
|
||||||
|
await message.nack(requeue=True)
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f"[Consumer] Fatal error: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
async def consume(self):
|
||||||
|
if self._closing:
|
||||||
|
return
|
||||||
|
if not self.queue:
|
||||||
|
raise RuntimeError("Queue not initialized")
|
||||||
|
await self.queue.consume(self.handle_message, no_ack=False)
|
||||||
|
logger.info("[Consumer] Consuming messages...")
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
self._closing = True
|
||||||
|
if self.connection:
|
||||||
|
await self.connection.close()
|
||||||
|
logger.info("[aio-pika] Connection closed.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
consumer = RabbitMQConsumer(db_manager=db_manager)
|
||||||
|
await consumer.connect()
|
||||||
|
logger.info("Creating MySQL connection pool...")
|
||||||
|
await db_manager.connect()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
|
|
||||||
12
src/secret_handler.py
Normal file
12
src/secret_handler.py
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
import sys
|
||||||
|
|
||||||
|
def return_credentials(path: str)->str:
|
||||||
|
try:
|
||||||
|
with open (path) as file:
|
||||||
|
return file.read().strip()
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f"[FATAL] Secret file not found: {path}")
|
||||||
|
sys.exit(1)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[FATAL] Failed to read secret file {path}: {e}")
|
||||||
|
sys.exit(1)
|
||||||
83
src/send_notification.py
Normal file
83
src/send_notification.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
import aiohttp
|
||||||
|
import asyncio
|
||||||
|
from logger_handler import setup_logger
|
||||||
|
|
||||||
|
API_ENDPOINT="https://exp.host/fakeUSer/api/v2/push/send"
|
||||||
|
logger = setup_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def send_notification(
|
||||||
|
routing_key: str,
|
||||||
|
message: dict,
|
||||||
|
db_manager,
|
||||||
|
max_retries: int = 5,
|
||||||
|
timeout: int = 5,
|
||||||
|
):
|
||||||
|
push_tokens = await database_lookup(routing_key, db_manager)
|
||||||
|
if not push_tokens:
|
||||||
|
logger.warning(f"No push tokens found for user {routing_key}")
|
||||||
|
return
|
||||||
|
|
||||||
|
results = {}
|
||||||
|
for token, uuid in push_tokens:
|
||||||
|
results[token] = await _send_to_token(token, uuid, message, max_retries, timeout)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int):
|
||||||
|
payload = create_payload(token, message)
|
||||||
|
|
||||||
|
for attempt in range(1, max_retries + 1):
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
async with session.post(
|
||||||
|
url=API_ENDPOINT,
|
||||||
|
json=payload,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
timeout=timeout
|
||||||
|
) as response:
|
||||||
|
await response.raise_for_status()
|
||||||
|
logger.info(f"Notification sent successfully to uuid {uuid}")
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
||||||
|
logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
|
||||||
|
await asyncio.sleep(2 ** (attempt - 1))
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Unexpected failure for uuid {uuid}: {e}")
|
||||||
|
return {"status": "error", "exception": str(e)}
|
||||||
|
|
||||||
|
logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts")
|
||||||
|
return {"status": "failed"}
|
||||||
|
|
||||||
|
def create_payload(push_token: str, message: dict) -> dict:
|
||||||
|
return {
|
||||||
|
"to": push_token,
|
||||||
|
"title": message.get("title"),
|
||||||
|
"body": message.get("body"),
|
||||||
|
"data": {
|
||||||
|
"link": message.get("link"),
|
||||||
|
"category": message.get("category")
|
||||||
|
},
|
||||||
|
"sound": "default",
|
||||||
|
"priority": "high"
|
||||||
|
}
|
||||||
|
|
||||||
|
async def database_lookup(routing_key: str, db_manager):
|
||||||
|
try:
|
||||||
|
user_id = int(routing_key.split('.')[-1])
|
||||||
|
except ValueError:
|
||||||
|
logger.error(f"[DB] Invalid user id supplied:{routing_key}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
async with db_manager.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute("SELECT tokend_id AS uuid,token FROM device_tokens WHERE user_id=%s",
|
||||||
|
(user_id,))
|
||||||
|
if cur.description:
|
||||||
|
return await cur.fetchall()
|
||||||
|
return []
|
||||||
|
|
||||||
Loading…
x
Reference in New Issue
Block a user