fix/dlq #4
@ -26,7 +26,40 @@ jobs:
|
|||||||
- name: Log in to GHCR
|
- name: Log in to GHCR
|
||||||
run: echo "${{ secrets.GHCR_TOKEN }}" | docker login ghcr.io -u ${{ secrets.GHCR_USERNAME }} --password-stdin
|
run: echo "${{ secrets.GHCR_TOKEN }}" | docker login ghcr.io -u ${{ secrets.GHCR_USERNAME }} --password-stdin
|
||||||
|
|
||||||
|
- name: Get Latest Tag
|
||||||
|
id: get_tag
|
||||||
|
run: |
|
||||||
|
BASE64_TOKEN=$(echo "${{ secrets.GHCR_TOKEN }}" | base64)
|
||||||
|
LATEST_TAG=$(curl -s -H "Authorization: Bearer $BASE64_TOKEN" \
|
||||||
|
https://ghcr.io/v2/${{ env.GHCR_ORG }}/${{ env.IMAGE_NAME }}/tags/list \
|
||||||
|
| jq -r '.tags | map(select(test("^[0-9]+$"))) | map(tonumber) | max // 0')
|
||||||
|
NEXT_TAG=$((LATEST_TAG + 1))
|
||||||
|
echo "latest=$LATEST_TAG" >> $GITHUB_OUTPUT
|
||||||
|
echo "next=$NEXT_TAG" >> $GITHUB_OUTPUT
|
||||||
|
|
||||||
- name: Build and Push Docker Image
|
- name: Build and Push Docker Image
|
||||||
run: |
|
run: |
|
||||||
docker build -t ghcr.io/gansejunge/${{ env.IMAGE_NAME }}:1 .
|
docker build -t ghcr.io/gansejunge/${{ env.IMAGE_NAME }}:${{ steps.get_tag.outputs.next }} .
|
||||||
docker push ghcr.io/gansejunge/${{ env.IMAGE_NAME }}:1
|
docker push ghcr.io/gansejunge/${{ env.IMAGE_NAME }}:${{ steps.get_tag.outputs.next }}
|
||||||
|
|
||||||
|
- name: Clone ops-deployment repo
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
repository: notifier/ops-deployment
|
||||||
|
path: ./ops-deployment
|
||||||
|
token: ${{ secrets.BOT_TOKEN }}
|
||||||
|
|
||||||
|
- name: Update backend-push-notifications deployment
|
||||||
|
working-directory: ./ops-deployment
|
||||||
|
run: |
|
||||||
|
NEW_TAG=${{ steps.get_tag.outputs.next }}
|
||||||
|
NEW_IMAGE="${{ env.IMAGE_PATH }}:$NEW_TAG"
|
||||||
|
git config user.name "automation-bot"
|
||||||
|
git config user.email "automation-bot@gansejunge.com"
|
||||||
|
|
||||||
|
sed -i "s|ghcr.io/$GHCR_ORG/$IMAGE_NAME:[0-9]\+|$NEW_IMAGE|g" "$OPS_PATH"
|
||||||
|
|
||||||
|
COMMIT_URL="https://git.gansejunge.com/${GITHUB_REPOSITORY}/commit/${GITHUB_SHA}"
|
||||||
|
git add $OPS_PATH
|
||||||
|
git commit -m "Update backend-push-notifications image to version $NEW_TAG" -m "Linked build commit: $COMMIT_URL"
|
||||||
|
git push origin $OPS_BRANCH_BASE
|
||||||
@ -144,10 +144,16 @@ class RabbitMQConsumer:
|
|||||||
if self._closing:
|
if self._closing:
|
||||||
logger.debug("[RabbitMQ] Skipping message because consumer is closing")
|
logger.debug("[RabbitMQ] Skipping message because consumer is closing")
|
||||||
return
|
return
|
||||||
|
|
||||||
async with message.process():
|
async with message.process():
|
||||||
try:
|
try:
|
||||||
data = json.loads(message.body.decode())
|
data = json.loads(message.body.decode())
|
||||||
logger.debug(f"[RabbitMQ] Received message: {data}")
|
logger.debug(f"[RabbitMQ] Received message: {data}")
|
||||||
|
|
||||||
|
if message.routing_key in [self.dlq_queue_name, "notify.dlq"]:
|
||||||
|
logger.info(f"[RabbitMQ] Message in DLQ, skipping processing: {data}")
|
||||||
|
return
|
||||||
|
|
||||||
uuid = data.get("uuid")
|
uuid = data.get("uuid")
|
||||||
|
|
||||||
if uuid:
|
if uuid:
|
||||||
@ -164,7 +170,15 @@ class RabbitMQConsumer:
|
|||||||
decrypted_token = decrypt_token(token)
|
decrypted_token = decrypt_token(token)
|
||||||
token_map[uuid] = decrypted_token
|
token_map[uuid] = decrypted_token
|
||||||
response = await send_notification(message=data,push_tokens=token_map)
|
response = await send_notification(message=data,push_tokens=token_map)
|
||||||
await self.validate_delivery(response,message)
|
|
||||||
|
# Validate delivery - any errors here should NOT prevent acknowledgment
|
||||||
|
# because notifications were already sent. Instead, we handle retries/DLQ
|
||||||
|
# by publishing new messages.
|
||||||
|
try:
|
||||||
|
await self.validate_delivery(response,message)
|
||||||
|
except Exception as validation_error:
|
||||||
|
logger.error(f"[RabbitMQ] Error during delivery validation: {validation_error}", exc_info=True)
|
||||||
|
metrics.MSG_FAILED.inc()
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True)
|
logger.error(f"[RabbitMQ] Bad message, discarding: {e}", exc_info=True)
|
||||||
@ -174,7 +188,7 @@ class RabbitMQConsumer:
|
|||||||
await message.nack(requeue=True)
|
await message.nack(requeue=True)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True)
|
logger.critical(f"[RabbitMQ] Fatal error: {e}", exc_info=True)
|
||||||
raise
|
metrics.MSG_FAILED.inc()
|
||||||
|
|
||||||
async def validate_delivery(self,response,message: aio_pika.IncomingMessage):
|
async def validate_delivery(self,response,message: aio_pika.IncomingMessage):
|
||||||
"""
|
"""
|
||||||
@ -208,37 +222,52 @@ class RabbitMQConsumer:
|
|||||||
See: https://docs.expo.dev/push-notifications/sending-notifications/
|
See: https://docs.expo.dev/push-notifications/sending-notifications/
|
||||||
"""
|
"""
|
||||||
for uuid, result in response.items():
|
for uuid, result in response.items():
|
||||||
status = result.get("status")
|
try:
|
||||||
data_list = result.get("data", {}).get("data", [])
|
status = result.get("status")
|
||||||
api_status = data_list[0].get("status") if data_list else None
|
data_list = result.get("data", {}).get("data", [])
|
||||||
|
|
||||||
if status == "ok" and api_status == "ok":
|
api_status = None
|
||||||
logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}")
|
api_error = None
|
||||||
metrics.MSG_PUBLISHED.inc()
|
if data_list and isinstance(data_list, list) and len(data_list) > 0:
|
||||||
logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}")
|
first_item = data_list[0]
|
||||||
|
if isinstance(first_item, dict):
|
||||||
|
api_status = first_item.get("status")
|
||||||
|
api_error = first_item.get("details", {}).get("error")
|
||||||
|
|
||||||
|
logger.debug(f"[RabbitMQ] Processing delivery for uuid={uuid}, status={status}, api_status={api_status}")
|
||||||
|
|
||||||
|
if status == "ok" and api_status == "ok":
|
||||||
|
logger.info(f"[RabbitMQ] Notification delivered successfully to {uuid}")
|
||||||
|
metrics.MSG_PUBLISHED.inc()
|
||||||
|
logger.debug(f"[RabbitMQ] Metrics updated for published message: uuid={uuid}")
|
||||||
|
|
||||||
|
|
||||||
if status == "ok" and api_status == "error":
|
elif status == "ok" and api_status == "error":
|
||||||
api_error = data_list[0].get("details", {}).get("error")
|
if api_error == "DeviceNotRegistered":
|
||||||
if api_error == "DeviceNotRegistered":
|
expired = await remove_inactive_push_token(uuid, self.db_manager)
|
||||||
expired = await remove_inactive_push_token(uuid, self.db_manager)
|
if expired:
|
||||||
if expired:
|
logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired")
|
||||||
logger.info(f"[RabbitMQ] Device no longer registered for uuid {uuid}, marked expired")
|
else:
|
||||||
|
logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}")
|
||||||
else:
|
else:
|
||||||
logger.error(f"[RabbitMQ] Failed expiring token for uuid: {uuid}")
|
await self.send_message_to_dlq(uuid, message)
|
||||||
else:
|
logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}, error={api_error}")
|
||||||
|
|
||||||
|
|
||||||
|
elif status == "error":
|
||||||
await self.send_message_to_dlq(uuid, message)
|
await self.send_message_to_dlq(uuid, message)
|
||||||
logger.debug(f"[RabbitMQ] Message sent to DLQ due to API error: uuid={uuid}")
|
logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}")
|
||||||
|
|
||||||
|
elif status == "failure":
|
||||||
|
await self.send_message_to_retry_queue(uuid, message)
|
||||||
|
logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}")
|
||||||
|
|
||||||
if status == "error":
|
except Exception as e:
|
||||||
await self.send_message_to_dlq(uuid, message)
|
logger.error(f"[RabbitMQ] Error processing delivery validation for uuid={uuid}: {e}", exc_info=True)
|
||||||
logger.debug(f"[RabbitMQ] Message sent to DLQ due to status=error: uuid={uuid}")
|
try:
|
||||||
|
await self.send_message_to_dlq(uuid,message)
|
||||||
if status == "failure":
|
except Exception as dlQ_error:
|
||||||
await self.send_message_to_retry_queue(uuid, message)
|
logger.error(f"[RabbitMQ] Failed to send to DLQ for uuid={uuid}: {dlq_error}")
|
||||||
logger.debug(f"[RabbitMQ] Message requeued for retry due to status=failure: uuid={uuid}")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def consume(self):
|
async def consume(self):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user