RMQ: Added Prometheus metrics and cleaned up code for deployment
This commit is contained in:
parent
f8b66d742f
commit
37476c814e
@ -6,16 +6,16 @@ aiomysql==0.2.0
|
|||||||
aiormq==6.9.0
|
aiormq==6.9.0
|
||||||
aiosignal==1.4.0
|
aiosignal==1.4.0
|
||||||
attrs==25.4.0
|
attrs==25.4.0
|
||||||
Brotli==1.1.0
|
|
||||||
cffi==2.0.0
|
cffi==2.0.0
|
||||||
|
cryptography==46.0.2
|
||||||
frozenlist==1.8.0
|
frozenlist==1.8.0
|
||||||
idna==3.11
|
idna==3.11
|
||||||
multidict==6.7.0
|
multidict==6.7.0
|
||||||
pamqp==3.3.0
|
pamqp==3.3.0
|
||||||
|
prometheus_client==0.23.1
|
||||||
propcache==0.4.1
|
propcache==0.4.1
|
||||||
pycares==4.11.0
|
pycares==4.11.0
|
||||||
pycparser==2.23
|
pycparser==2.23
|
||||||
PyMySQL==1.1.2
|
PyMySQL==1.1.2
|
||||||
typing_extensions==4.15.0
|
typing_extensions==4.15.0
|
||||||
yarl==1.22.0
|
yarl==1.22.0
|
||||||
zstandard==0.25.0
|
|
||||||
|
|||||||
@ -77,6 +77,6 @@ class DBManager:
|
|||||||
await self._pool.wait_closed()
|
await self._pool.wait_closed()
|
||||||
logger.info("[DB] Connection pool closed")
|
logger.info("[DB] Connection pool closed")
|
||||||
|
|
||||||
#db_manager = DBManager(host=db_host, port=3306, user=db_username, password=db_password, db=db_database)
|
db_manager = DBManager(host=db_host, port=3306, user=db_username, password=db_password, db=db_database)
|
||||||
db_manager = DBManager(host=db_host, port=30006, user=db_username, password=db_password, db=db_database)
|
|
||||||
|
|
||||||
|
|||||||
39
src/metrics.py
Normal file
39
src/metrics.py
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
from prometheus_client import Counter, Gauge, start_http_server
|
||||||
|
from logger_handler import setup_logger
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
logger = setup_logger(__name__)
|
||||||
|
|
||||||
|
MSG_PUBLISHED = Counter(
|
||||||
|
"rmq_messages_published_total",
|
||||||
|
"Total number of messages successfully published to RabbitMQ"
|
||||||
|
)
|
||||||
|
|
||||||
|
MSG_FAILED = Counter(
|
||||||
|
"rmq_messages_failed_total",
|
||||||
|
"Total number of messages sent to the dead-letter queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
MSG_RETRY = Counter(
|
||||||
|
"rmq_messages_retry_total",
|
||||||
|
"Total number of messages retried via retry queue",
|
||||||
|
["queue_name", "uuid", "retry_count"]
|
||||||
|
)
|
||||||
|
|
||||||
|
QUEUE_MESSAGES = Gauge(
|
||||||
|
"rmq_queue_messages",
|
||||||
|
"Current number of messages pending in the queue",
|
||||||
|
["queue_name"]
|
||||||
|
)
|
||||||
|
|
||||||
|
def start_metrics_server(port: int = 9000):
|
||||||
|
start_http_server(port)
|
||||||
|
logger.info(f"Prometheus metrics exposed at http://0.0.0.0:{port}/metrics")
|
||||||
|
|
||||||
|
async def update_queue_gauge(channel, queue_name: str):
|
||||||
|
"""Periodically update queue depth"""
|
||||||
|
queue = await channel.declare_queue(queue_name, passive=True)
|
||||||
|
while True:
|
||||||
|
info = await queue.declare()
|
||||||
|
QUEUE_MESSAGES.labels(queue_name=queue_name).set(info.message_count)
|
||||||
|
await asyncio.sleep(10)
|
||||||
@ -8,6 +8,7 @@ import json
|
|||||||
from db import db_manager
|
from db import db_manager
|
||||||
from send_notification import send_notification
|
from send_notification import send_notification
|
||||||
import signal
|
import signal
|
||||||
|
import metrics
|
||||||
|
|
||||||
logger = setup_logger(__name__)
|
logger = setup_logger(__name__)
|
||||||
|
|
||||||
@ -31,6 +32,7 @@ class RabbitMQConsumer:
|
|||||||
self.queue: aio_pika.Queue | None = None
|
self.queue: aio_pika.Queue | None = None
|
||||||
self._closing = False
|
self._closing = False
|
||||||
self.db_manager = db_manager
|
self.db_manager = db_manager
|
||||||
|
self.queue_name = "notifications"
|
||||||
self.retry_queue_name = "notifications_retry"
|
self.retry_queue_name = "notifications_retry"
|
||||||
self.dlq_queue_name = "notifications_dlq"
|
self.dlq_queue_name = "notifications_dlq"
|
||||||
self.max_retries = 5
|
self.max_retries = 5
|
||||||
@ -42,7 +44,7 @@ class RabbitMQConsumer:
|
|||||||
self.exchange = await self.channel.declare_exchange(
|
self.exchange = await self.channel.declare_exchange(
|
||||||
self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
|
self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
|
||||||
)
|
)
|
||||||
self.queue = await self.channel.declare_queue("notifications", durable=True)
|
self.queue = await self.channel.declare_queue(self.queue_name, durable=True)
|
||||||
await self.queue.bind(self.exchange, routing_key="notify.user.*")
|
await self.queue.bind(self.exchange, routing_key="notify.user.*")
|
||||||
retry_queue_args = {
|
retry_queue_args = {
|
||||||
"x-message-ttl": self.retry_ttl,
|
"x-message-ttl": self.retry_ttl,
|
||||||
@ -54,6 +56,11 @@ class RabbitMQConsumer:
|
|||||||
)
|
)
|
||||||
await self.channel.declare_queue(self.dlq_queue_name, durable=True)
|
await self.channel.declare_queue(self.dlq_queue_name, durable=True)
|
||||||
|
|
||||||
|
metrics.start_metrics_server(port=9000)
|
||||||
|
asyncio.create_task(metrics.update_queue_gauge(self.channel, self.queue_name))
|
||||||
|
asyncio.create_task(metrics.update_queue_gauge(self.channel, self.retry_queue_name))
|
||||||
|
asyncio.create_task(metrics.update_queue_gauge(self.channel, self.dlq_queue_name))
|
||||||
|
|
||||||
logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.")
|
logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.")
|
||||||
|
|
||||||
async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
|
async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
|
||||||
@ -68,6 +75,7 @@ class RabbitMQConsumer:
|
|||||||
routing_key=self.dlq_queue_name
|
routing_key=self.dlq_queue_name
|
||||||
)
|
)
|
||||||
logger.warning(f"Message sent to DLQ: {data['uuid']}")
|
logger.warning(f"Message sent to DLQ: {data['uuid']}")
|
||||||
|
metrics.MSG_FAILED.inc()
|
||||||
|
|
||||||
async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
|
async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
|
||||||
data = json.loads(message.body.decode())
|
data = json.loads(message.body.decode())
|
||||||
@ -75,6 +83,12 @@ class RabbitMQConsumer:
|
|||||||
retry_count = data.get("retry_count", 0) + 1
|
retry_count = data.get("retry_count", 0) + 1
|
||||||
data["retry_count"] = retry_count
|
data["retry_count"] = retry_count
|
||||||
|
|
||||||
|
metrics.MSG_RETRY.labels(
|
||||||
|
queue_name=self.retry_queue_name,
|
||||||
|
uuid=uuid,
|
||||||
|
retry_count=retry_count
|
||||||
|
).inc()
|
||||||
|
|
||||||
if retry_count > self.max_retries:
|
if retry_count > self.max_retries:
|
||||||
await self.send_message_to_dlq(uuid, message)
|
await self.send_message_to_dlq(uuid, message)
|
||||||
return
|
return
|
||||||
@ -111,8 +125,7 @@ class RabbitMQConsumer:
|
|||||||
token_map[uuid] = decrypted_token
|
token_map[uuid] = decrypted_token
|
||||||
response = await send_notification(message=data,push_tokens=token_map)
|
response = await send_notification(message=data,push_tokens=token_map)
|
||||||
await self.validate_delivery(response,message)
|
await self.validate_delivery(response,message)
|
||||||
logger.debug(response)
|
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error(f"[RMQ] Bad message, discarding: {e}")
|
logger.error(f"[RMQ] Bad message, discarding: {e}")
|
||||||
await message.nack(requeue=False)
|
await message.nack(requeue=False)
|
||||||
@ -131,6 +144,7 @@ class RabbitMQConsumer:
|
|||||||
|
|
||||||
if status == "ok" and api_status == "ok":
|
if status == "ok" and api_status == "ok":
|
||||||
logger.info(f"Notification delivered successfully to {uuid}")
|
logger.info(f"Notification delivered successfully to {uuid}")
|
||||||
|
metrics.MSG_PUBLISHED.inc()
|
||||||
|
|
||||||
if status == "ok" and api_status == "error":
|
if status == "ok" and api_status == "error":
|
||||||
api_error = data_list[0].get("details", {}).get("error")
|
api_error = data_list[0].get("details", {}).get("error")
|
||||||
|
|||||||
@ -1,11 +1,20 @@
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
|
import aiodns
|
||||||
import asyncio
|
import asyncio
|
||||||
from logger_handler import setup_logger
|
from logger_handler import setup_logger
|
||||||
|
|
||||||
#API_ENDPOINT="https://exp.host/fakeUSer/api/v2/push/send"
|
API_ENDPOINT="https://exp.host/--/api/v2/push/send"
|
||||||
API_ENDPOINT="http://127.0.0.1:8000/honk"
|
|
||||||
logger = setup_logger(__name__)
|
logger = setup_logger(__name__)
|
||||||
|
|
||||||
|
retryable_exceptions = (
|
||||||
|
aiohttp.ClientConnectionError,
|
||||||
|
aiohttp.ServerDisconnectedError,
|
||||||
|
aiohttp.ClientOSError,
|
||||||
|
aiohttp.ServerTimeoutError,
|
||||||
|
asyncio.TimeoutError,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def send_notification(
|
async def send_notification(
|
||||||
message: dict,
|
message: dict,
|
||||||
push_tokens,
|
push_tokens,
|
||||||
@ -20,10 +29,12 @@ async def send_notification(
|
|||||||
|
|
||||||
async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int):
|
async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int):
|
||||||
payload = create_payload(token, message)
|
payload = create_payload(token, message)
|
||||||
|
resolver = aiohttp.AsyncResolver()
|
||||||
|
connector = aiohttp.TCPConnector(resolver=resolver)
|
||||||
|
|
||||||
for attempt in range(1, max_retries + 1):
|
for attempt in range(1, max_retries + 1):
|
||||||
try:
|
try:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession(connector=connector) as session:
|
||||||
async with session.post(
|
async with session.post(
|
||||||
url=API_ENDPOINT,
|
url=API_ENDPOINT,
|
||||||
json=payload,
|
json=payload,
|
||||||
@ -35,7 +46,7 @@ async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int,
|
|||||||
logger.info(f"Notification sent successfully to uuid {uuid}")
|
logger.info(f"Notification sent successfully to uuid {uuid}")
|
||||||
return {"status":"ok","data":data}
|
return {"status":"ok","data":data}
|
||||||
|
|
||||||
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
except retryable_exceptions as e:
|
||||||
logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
|
logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
|
||||||
await asyncio.sleep(2 ** (attempt - 1))
|
await asyncio.sleep(2 ** (attempt - 1))
|
||||||
|
|
||||||
@ -53,7 +64,8 @@ def create_payload(push_token: str, message: dict) -> dict:
|
|||||||
"body": message.get("body"),
|
"body": message.get("body"),
|
||||||
"data": {
|
"data": {
|
||||||
"link": message.get("link"),
|
"link": message.get("link"),
|
||||||
"category": message.get("category")
|
"category": message.get("category"),
|
||||||
|
"timestamp": message.get("timestamp")
|
||||||
},
|
},
|
||||||
"sound": "default",
|
"sound": "default",
|
||||||
"priority": "high"
|
"priority": "high"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user