Added logic to handle the Expo API response
This commit is contained in:
parent
b0e88a1dbc
commit
0ad61950c8
@ -49,7 +49,6 @@ class DBManager:
|
|||||||
async with self.acquire() as conn:
|
async with self.acquire() as conn:
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
await cur.execute("SELECT 1")
|
await cur.execute("SELECT 1")
|
||||||
logger.debug("[DB] Healthcheck OK")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"[DB] Healthcheck failed: {e}")
|
logger.warning(f"[DB] Healthcheck failed: {e}")
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import aio_pika
|
import aio_pika
|
||||||
from aio_pika.exceptions import AMQPException
|
from aio_pika.exceptions import AMQPException
|
||||||
from secret_handler import return_credentials, database_lookup, decrypt_token
|
from secret_handler import return_credentials, database_lookup, decrypt_token, remove_inactive_push_token
|
||||||
import os
|
import os
|
||||||
from logger_handler import setup_logger
|
from logger_handler import setup_logger
|
||||||
import json
|
import json
|
||||||
@ -40,7 +40,13 @@ class RabbitMQConsumer:
|
|||||||
)
|
)
|
||||||
self.queue = await self.channel.declare_queue("notifications", durable=True)
|
self.queue = await self.channel.declare_queue("notifications", durable=True)
|
||||||
await self.queue.bind(self.exchange, routing_key="notify.user.*")
|
await self.queue.bind(self.exchange, routing_key="notify.user.*")
|
||||||
logger.info("[Consumer] Connected, queue bound to notify.user.*")
|
logger.info("[RMQ] Connected, queue bound to notify.user.*")
|
||||||
|
|
||||||
|
async def send_message_to_dlq(self, uuid: str, message: aio_pika.IncomingMessage):
|
||||||
|
return
|
||||||
|
|
||||||
|
async def send_message_to_retry_queue(self):
|
||||||
|
return
|
||||||
|
|
||||||
async def handle_message(self, message: aio_pika.IncomingMessage):
|
async def handle_message(self, message: aio_pika.IncomingMessage):
|
||||||
if self._closing:
|
if self._closing:
|
||||||
@ -48,7 +54,7 @@ class RabbitMQConsumer:
|
|||||||
async with message.process():
|
async with message.process():
|
||||||
try:
|
try:
|
||||||
data = json.loads(message.body.decode())
|
data = json.loads(message.body.decode())
|
||||||
logger.info(f"[Consumer] Received: {data}")
|
logger.info(f"[RMQ] Received: {data}")
|
||||||
logger.info(message.routing_key)
|
logger.info(message.routing_key)
|
||||||
encrypted_tokens = await database_lookup(message.routing_key, db_manager)
|
encrypted_tokens = await database_lookup(message.routing_key, db_manager)
|
||||||
if not encrypted_tokens:
|
if not encrypted_tokens:
|
||||||
@ -59,25 +65,50 @@ class RabbitMQConsumer:
|
|||||||
for uuid, token in token_map.items():
|
for uuid, token in token_map.items():
|
||||||
decrypted_token = decrypt_token(token)
|
decrypted_token = decrypt_token(token)
|
||||||
token_map[uuid] = decrypted_token
|
token_map[uuid] = decrypted_token
|
||||||
await send_notification(message=data,push_tokens=token_map)
|
response = await send_notification(message=data,push_tokens=token_map)
|
||||||
|
await self.validate_delivery(response,message)
|
||||||
|
logger.debug(response)
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error(f"[Consumer] Bad message, discarding: {e}")
|
logger.error(f"[RMQ] Bad message, discarding: {e}")
|
||||||
await message.nack(requeue=False)
|
await message.nack(requeue=False)
|
||||||
except AMQPException as e:
|
except AMQPException as e:
|
||||||
logger.error(f"[Consumer] RabbitMQ error: {e}")
|
logger.error(f"[RMQ] RabbitMQ error: {e}")
|
||||||
await message.nack(requeue=True)
|
await message.nack(requeue=True)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"[Consumer] Fatal error: {e}")
|
logger.critical(f"[RMQ] Fatal error: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
async def validate_delivery(self,response,message: aio_pika.IncomingMessage):
|
||||||
|
for uuid, result in response.items():
|
||||||
|
status = result.get("status")
|
||||||
|
data_list = result.get("data", {}).get("data", [])
|
||||||
|
api_status = data_list[0].get("status") if data_list else None
|
||||||
|
|
||||||
|
if status == "ok" and api_status == "ok":
|
||||||
|
logger.info(f"Notification delivered successfully to {uuid}")
|
||||||
|
|
||||||
|
if status == "ok" and api_status == "error":
|
||||||
|
api_error = data_list[0].get("details", {}).get("error")
|
||||||
|
if api_error == "DeviceNotRegistered":
|
||||||
|
logger.info(f"Device no longer registered for uuid {uuid}, setting as expired")
|
||||||
|
await remove_inactive_push_token(uuid, self.db_manager)
|
||||||
|
else:
|
||||||
|
await self.send_message_to_dlq(uuid, message)
|
||||||
|
|
||||||
|
if status == "error":
|
||||||
|
await self.send_message_to_dlq(uuid, message)
|
||||||
|
if status == "failure":
|
||||||
|
await self.send_message_to_retry_queue(uuid, message)
|
||||||
|
|
||||||
|
|
||||||
async def consume(self):
|
async def consume(self):
|
||||||
if self._closing:
|
if self._closing:
|
||||||
return
|
return
|
||||||
if not self.queue:
|
if not self.queue:
|
||||||
raise RuntimeError("Queue not initialized")
|
raise RuntimeError("Queue not initialized")
|
||||||
await self.queue.consume(self.handle_message, no_ack=False)
|
await self.queue.consume(self.handle_message, no_ack=False)
|
||||||
logger.info("[Consumer] Consuming messages...")
|
logger.info("[RMQ] Consuming messages...")
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
self._closing = True
|
self._closing = True
|
||||||
@ -88,14 +119,16 @@ class RabbitMQConsumer:
|
|||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
logger.info("Starting application")
|
||||||
await db_manager.connect()
|
await db_manager.connect()
|
||||||
consumer = RabbitMQConsumer(db_manager=db_manager)
|
consumer = RabbitMQConsumer(db_manager=db_manager)
|
||||||
await consumer.connect()
|
await consumer.connect()
|
||||||
await consumer.consume()
|
await consumer.consume()
|
||||||
stop_event = asyncio.Event()
|
stop_event = asyncio.Event()
|
||||||
|
|
||||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||||
asyncio.get_running_loop().add_signal_handler(sig, stop_event.set)
|
asyncio.get_running_loop().add_signal_handler(sig, stop_event.set)
|
||||||
|
logger.info("Stopping application")
|
||||||
await stop_event.wait()
|
await stop_event.wait()
|
||||||
await consumer.close()
|
await consumer.close()
|
||||||
await db_manager.close()
|
await db_manager.close()
|
||||||
|
|||||||
@ -48,3 +48,9 @@ async def database_lookup(routing_key: str, db_manager):
|
|||||||
if cur.description:
|
if cur.description:
|
||||||
return await cur.fetchall()
|
return await cur.fetchall()
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
async def remove_inactive_push_token(uuid :str, db_manager):
|
||||||
|
async with db_manager.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s",
|
||||||
|
(uuid,))
|
||||||
|
|||||||
@ -16,7 +16,6 @@ async def send_notification(
|
|||||||
results = {}
|
results = {}
|
||||||
for uuid, token in push_tokens.items():
|
for uuid, token in push_tokens.items():
|
||||||
results[uuid] = await _send_to_token(token, uuid, message, max_retries, timeout)
|
results[uuid] = await _send_to_token(token, uuid, message, max_retries, timeout)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int):
|
async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int, timeout: int):
|
||||||
@ -32,8 +31,9 @@ async def _send_to_token(token: str, uuid:str , message: dict, max_retries: int,
|
|||||||
timeout=timeout
|
timeout=timeout
|
||||||
) as response:
|
) as response:
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
data = await response.json()
|
||||||
logger.info(f"Notification sent successfully to uuid {uuid}")
|
logger.info(f"Notification sent successfully to uuid {uuid}")
|
||||||
return {"status": "ok"}
|
return {"status":"ok","data":data}
|
||||||
|
|
||||||
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
|
||||||
logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
|
logger.warning(f"Attempt {attempt}/{max_retries} failed for uuid {uuid}: {type(e).__name__}")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user