40 lines
1.1 KiB
Python
40 lines
1.1 KiB
Python
from prometheus_client import Counter, Gauge, start_http_server
|
|
from logger_handler import setup_logger
|
|
import asyncio
|
|
|
|
# Module-level logger for this file, created by the project's setup_logger
# helper and keyed by this module's name.
logger = setup_logger(__name__)
|
|
|
|
# Unlabelled counter for successful publishes; it is only declared here and is
# expected to be incremented by the publishing code elsewhere.
MSG_PUBLISHED = Counter(
    name="rmq_messages_published_total",
    documentation="Total number of messages successfully published to RabbitMQ",
)
|
|
|
|
# Unlabelled counter for messages routed to the dead-letter queue; it is only
# declared here and is expected to be incremented by the caller that
# dead-letters a message.
MSG_FAILED = Counter(
    name="rmq_messages_failed_total",
    documentation="Total number of messages sent to the dead-letter queue",
)
|
|
|
|
# Retry counter partitioned by queue name, message UUID and retry count.
# NOTE(review): every distinct label combination becomes its own time series.
# If `uuid` is a per-message identifier this label set is effectively
# unbounded -- confirm the cardinality is intended. The labels are kept as-is
# because callers pass them via `.labels(...)`.
MSG_RETRY = Counter(
    name="rmq_messages_retry_total",
    documentation="Total number of messages retried via retry queue",
    labelnames=["queue_name", "uuid", "retry_count"],
)
|
|
|
|
# Gauge holding the pending-message count of a queue, one series per
# `queue_name` label value.
QUEUE_MESSAGES = Gauge(
    name="rmq_queue_messages",
    documentation="Current number of messages pending in the queue",
    labelnames=["queue_name"],
)
|
|
|
|
def start_metrics_server(port: int = 9000):
    """Start the Prometheus metrics HTTP server and log where it is exposed.

    Args:
        port: Port number handed to ``start_http_server``; defaults to 9000.
    """
    start_http_server(port)
    announcement = f"Prometheus metrics exposed at http://0.0.0.0:{port}/metrics"
    logger.info(announcement)
|
|
|
|
async def update_queue_gauge(channel, queue_name: str, interval: float = 10):
    """Periodically update queue depth.

    Looks the queue up once with ``channel.declare_queue(queue_name,
    passive=True)`` and then loops forever: it re-declares the queue, writes
    the reported ``message_count`` into the ``QUEUE_MESSAGES`` gauge under the
    ``queue_name`` label, and sleeps for ``interval`` seconds.

    The coroutine never returns on its own; it ends only when it is cancelled
    or when one of the awaited calls raises, in which case the exception
    propagates to whoever awaits the coroutine or its task.

    Args:
        channel: Object exposing an awaitable ``declare_queue`` method --
            presumably an aio-pika channel; confirm against the caller.
        queue_name: Name of the queue to poll; also used as the gauge label.
        interval: Seconds to wait between polls. Defaults to 10, the value
            that used to be hard-coded.

    Raises:
        ValueError: If ``interval`` is not strictly positive.
    """
    # A zero or negative delay would turn the loop into a busy poll.
    if interval <= 0:
        raise ValueError(f"interval must be positive, got {interval!r}")

    queue = await channel.declare_queue(queue_name, passive=True)
    while True:
        # Re-declaring returns a fresh result carrying the current count.
        info = await queue.declare()
        QUEUE_MESSAGES.labels(queue_name=queue_name).set(info.message_count)
        await asyncio.sleep(interval)
|