RMQ: Added retrying messages sent to a retry queue and eventual parking in the DLQ
This commit is contained in:
parent
0ad61950c8
commit
f8b66d742f
@ -1,7 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import aio_pika
|
import aio_pika
|
||||||
from aio_pika.exceptions import AMQPException
|
from aio_pika.exceptions import AMQPException
|
||||||
from secret_handler import return_credentials, database_lookup, decrypt_token, remove_inactive_push_token
|
from secret_handler import return_credentials, database_lookup_by_user_id, decrypt_token, remove_inactive_push_token, database_lookup_by_uuid
|
||||||
import os
|
import os
|
||||||
from logger_handler import setup_logger
|
from logger_handler import setup_logger
|
||||||
import json
|
import json
|
||||||
@ -31,6 +31,10 @@ class RabbitMQConsumer:
|
|||||||
self.queue: aio_pika.Queue | None = None
|
self.queue: aio_pika.Queue | None = None
|
||||||
self._closing = False
|
self._closing = False
|
||||||
self.db_manager = db_manager
|
self.db_manager = db_manager
|
||||||
|
self.retry_queue_name = "notifications_retry"
|
||||||
|
self.dlq_queue_name = "notifications_dlq"
|
||||||
|
self.max_retries = 5
|
||||||
|
self.retry_ttl = 5 * 60 * 10000
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
self.connection = await aio_pika.connect_robust(self.url)
|
self.connection = await aio_pika.connect_robust(self.url)
|
||||||
@ -40,13 +44,49 @@ class RabbitMQConsumer:
|
|||||||
)
|
)
|
||||||
self.queue = await self.channel.declare_queue("notifications", durable=True)
|
self.queue = await self.channel.declare_queue("notifications", durable=True)
|
||||||
await self.queue.bind(self.exchange, routing_key="notify.user.*")
|
await self.queue.bind(self.exchange, routing_key="notify.user.*")
|
||||||
logger.info("[RMQ] Connected, queue bound to notify.user.*")
|
retry_queue_args = {
|
||||||
|
"x-message-ttl": self.retry_ttl,
|
||||||
|
"x-dead-letter-exchange": self.exchange_name,
|
||||||
|
"x-dead-letter-routing-key": "notify.user.retry",
|
||||||
|
}
|
||||||
|
self.retry_queue = await self.channel.declare_queue(
|
||||||
|
self.retry_queue_name, durable=True, arguments=retry_queue_args
|
||||||
|
)
|
||||||
|
await self.channel.declare_queue(self.dlq_queue_name, durable=True)
|
||||||
|
|
||||||
|
logger.info("[RMQ] Connected, exchange, retry, and DLQ queues ready.")
|
||||||
|
|
||||||
async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
|
async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
|
||||||
return
|
data = json.loads(message.body.decode())
|
||||||
|
data["uuid"] = uuid
|
||||||
|
await self.channel.default_exchange.publish(
|
||||||
|
aio_pika.Message(
|
||||||
|
body=json.dumps(data).encode(),
|
||||||
|
content_type="application/json",
|
||||||
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
|
||||||
|
),
|
||||||
|
routing_key=self.dlq_queue_name
|
||||||
|
)
|
||||||
|
logger.warning(f"Message sent to DLQ: {data['uuid']}")
|
||||||
|
|
||||||
async def send_message_to_retry_queue(self):
|
async def send_message_to_retry_queue(self, uuid: str, message: aio_pika.IncomingMessage):
|
||||||
return
|
data = json.loads(message.body.decode())
|
||||||
|
data["uuid"] = uuid
|
||||||
|
retry_count = data.get("retry_count", 0) + 1
|
||||||
|
data["retry_count"] = retry_count
|
||||||
|
|
||||||
|
if retry_count > self.max_retries:
|
||||||
|
await self.send_message_to_dlq(uuid, message)
|
||||||
|
return
|
||||||
|
await self.channel.default_exchange.publish(
|
||||||
|
aio_pika.Message(
|
||||||
|
body=json.dumps(data).encode(),
|
||||||
|
content_type="application/json",
|
||||||
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
|
||||||
|
),
|
||||||
|
routing_key=self.retry_queue_name
|
||||||
|
)
|
||||||
|
logger.info(f"Message requeued for retry #{retry_count}")
|
||||||
|
|
||||||
async def handle_message(self, message: aio_pika.IncomingMessage):
|
async def handle_message(self, message: aio_pika.IncomingMessage):
|
||||||
if self._closing:
|
if self._closing:
|
||||||
@ -55,8 +95,12 @@ class RabbitMQConsumer:
|
|||||||
try:
|
try:
|
||||||
data = json.loads(message.body.decode())
|
data = json.loads(message.body.decode())
|
||||||
logger.info(f"[RMQ] Received: {data}")
|
logger.info(f"[RMQ] Received: {data}")
|
||||||
logger.info(message.routing_key)
|
uuid = data.get("uuid")
|
||||||
encrypted_tokens = await database_lookup(message.routing_key, db_manager)
|
if uuid:
|
||||||
|
encrypted_tokens = await database_lookup_by_uuid(uuid, db_manager)
|
||||||
|
else:
|
||||||
|
encrypted_tokens = await database_lookup_by_user_id(message.routing_key, db_manager)
|
||||||
|
|
||||||
if not encrypted_tokens:
|
if not encrypted_tokens:
|
||||||
logger.warning(f"No push tokens found for user {message.routing_key}")
|
logger.warning(f"No push tokens found for user {message.routing_key}")
|
||||||
return
|
return
|
||||||
@ -91,13 +135,17 @@ class RabbitMQConsumer:
|
|||||||
if status == "ok" and api_status == "error":
|
if status == "ok" and api_status == "error":
|
||||||
api_error = data_list[0].get("details", {}).get("error")
|
api_error = data_list[0].get("details", {}).get("error")
|
||||||
if api_error == "DeviceNotRegistered":
|
if api_error == "DeviceNotRegistered":
|
||||||
logger.info(f"Device no longer registered for uuid {uuid}, setting as expired")
|
expired = await remove_inactive_push_token(uuid, self.db_manager)
|
||||||
await remove_inactive_push_token(uuid, self.db_manager)
|
if expired:
|
||||||
|
logger.info(f"Device no longer registered for uuid {uuid}, setting as expired")
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed expiring token for uuid: {uuid}")
|
||||||
else:
|
else:
|
||||||
await self.send_message_to_dlq(uuid, message)
|
await self.send_message_to_dlq(uuid, message)
|
||||||
|
|
||||||
if status == "error":
|
if status == "error":
|
||||||
await self.send_message_to_dlq(uuid, message)
|
await self.send_message_to_dlq(uuid, message)
|
||||||
|
|
||||||
if status == "failure":
|
if status == "failure":
|
||||||
await self.send_message_to_retry_queue(uuid, message)
|
await self.send_message_to_retry_queue(uuid, message)
|
||||||
|
|
||||||
@ -128,8 +176,9 @@ async def main():
|
|||||||
|
|
||||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||||
asyncio.get_running_loop().add_signal_handler(sig, stop_event.set)
|
asyncio.get_running_loop().add_signal_handler(sig, stop_event.set)
|
||||||
logger.info("Stopping application")
|
|
||||||
await stop_event.wait()
|
await stop_event.wait()
|
||||||
|
logger.info("Stopping application")
|
||||||
await consumer.close()
|
await consumer.close()
|
||||||
await db_manager.close()
|
await db_manager.close()
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,4 @@
|
|||||||
from cryptography.fernet import Fernet
|
from cryptography.fernet import Fernet
|
||||||
import sys
|
|
||||||
import asyncio
|
|
||||||
from logger_handler import setup_logger
|
from logger_handler import setup_logger
|
||||||
|
|
||||||
logger = setup_logger(__name__)
|
logger = setup_logger(__name__)
|
||||||
@ -10,10 +8,10 @@ try:
|
|||||||
encryption_key = file.read()
|
encryption_key = file.read()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.fatal("[Secret Handler] Encryption key not found")
|
logger.fatal("[Secret Handler] Encryption key not found")
|
||||||
sys.exit(1)
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.fatal(f"[Secret Handler] Failed to read encryption key: {e}")
|
logger.fatal(f"[Secret Handler] Failed to read encryption key: {e}")
|
||||||
sys.exit(1)
|
raise
|
||||||
|
|
||||||
fernet = Fernet(encryption_key)
|
fernet = Fernet(encryption_key)
|
||||||
|
|
||||||
@ -29,12 +27,12 @@ def return_credentials(path: str)->str:
|
|||||||
return file.read().strip()
|
return file.read().strip()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.fatal(f"[Secret Handler] Secret file not found: {path}")
|
logger.fatal(f"[Secret Handler] Secret file not found: {path}")
|
||||||
sys.exit(1)
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.fatal(f"[Secret Handler] Failed to read secret file {path}: {e}")
|
logger.fatal(f"[Secret Handler] Failed to read secret file {path}: {e}")
|
||||||
sys.exit(1)
|
raise
|
||||||
|
|
||||||
async def database_lookup(routing_key: str, db_manager):
|
async def database_lookup_by_user_id(routing_key: str, db_manager):
|
||||||
try:
|
try:
|
||||||
user_id = int(routing_key.split('.')[-1])
|
user_id = int(routing_key.split('.')[-1])
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@ -49,8 +47,18 @@ async def database_lookup(routing_key: str, db_manager):
|
|||||||
return await cur.fetchall()
|
return await cur.fetchall()
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
async def database_lookup_by_uuid(uuid: str, db_manager):
|
||||||
|
async with db_manager.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE token_id=%s",
|
||||||
|
(uuid,))
|
||||||
|
if cur.description:
|
||||||
|
return await cur.fetchall()
|
||||||
|
return []
|
||||||
|
|
||||||
async def remove_inactive_push_token(uuid :str, db_manager):
|
async def remove_inactive_push_token(uuid :str, db_manager):
|
||||||
async with db_manager.acquire() as conn:
|
async with db_manager.acquire() as conn:
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s",
|
await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s",
|
||||||
(uuid,))
|
(uuid,))
|
||||||
|
return cur.rowcount > 0
|
||||||
|
|||||||
@ -44,7 +44,7 @@ async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int,
|
|||||||
return {"status": "error", "exception": str(e)}
|
return {"status": "error", "exception": str(e)}
|
||||||
|
|
||||||
logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts")
|
logger.error(f"Failed to send notification to uuid {uuid} after {max_retries} attempts")
|
||||||
return {"status": "failed"}
|
return {"status": "failure"}
|
||||||
|
|
||||||
def create_payload(push_token: str, message: dict) -> dict:
|
def create_payload(push_token: str, message: dict) -> dict:
|
||||||
return {
|
return {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user