Initial draft

This commit is contained in:
Florian 2025-10-03 19:39:17 +02:00
parent ea27160c0d
commit d356ec8428
9 changed files with 263 additions and 0 deletions

42
db.py Normal file
View File

@ -0,0 +1,42 @@
import mysql.connector
from mysql.connector import pooling
import threading
MYSQL_CONFIG = {
"host": "localhost",
"user": "florian",
"password": "password123++",
"database": "app"
}
# Lock to ensure thread-safe pool creation
_pool_lock = threading.Lock()
_connection_pool = None
def get_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool is None:
_connection_pool = mysql.connector.pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
pool_reset_session=True,
**MYSQL_CONFIG
)
return _connection_pool
# Dependency for FastAPI
def get_db():
pool = get_connection_pool()
conn = pool.get_connection()
try:
yield conn
finally:
conn.close()
if __name__ == "__main__":
# Manual test
for conn in get_db():
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT NOW() AS ts")
print(cursor.fetchone())

18
hvac_handler.py Normal file
View File

@ -0,0 +1,18 @@
import base64
import hvac
client = hvac.Client(
url='http://127.0.0.1:8200',
token='root'
)
def decrypt_token(ciphertext: str) -> str:
response = client.secrets.transit.decrypt_data(
name='push-tokens',
ciphertext=ciphertext
)
plaintext_b64 = response['data']['plaintext']
return base64.b64decode(plaintext_b64).decode()

11
internal_api_database.sql Normal file
View File

@ -0,0 +1,11 @@
CREATE TABLE internal_api_keys (
id BIGSERIAL PRIMARY KEY,
program_name VARCHAR(255) NOT NULL,
api_key VARCHAR(255) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
status VARCHAR(20) NOT NULL DEFAULT 'active',
description TEXT NULL
);
CREATE INDEX idx_internal_api_keys_program_name
ON internal_api_keys(program_name);

13
logger_handler.py Normal file
View File

@ -0,0 +1,13 @@
import logging
def setup_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return logger

70
main.py Normal file
View File

@ -0,0 +1,70 @@
from fastapi import FastAPI, Query, Depends, HTTPException, Header
from fastapi.responses import JSONResponse
from fastapi.security.api_key import APIKeyHeader
from starlette.exceptions import HTTPException as StarletteHTTPException
from typing import Dict
from pydantic import BaseModel
from validator import verify_api_key
from db import get_db
from logger_handler import setup_logger
from rabbitmq_handler import send_message_to_rmq
import uvicorn
from uvicorn_logging_config import LOGGING_CONFIG
logger = setup_logger(__name__)
api_key_header_internal = APIKeyHeader(name="X-API-Key-Internal")
class Notification(BaseModel):
receipent_user_id : int
message : Dict
api = FastAPI(
title="Internal Notifier API",
description="API to forward messages to RabbitMQ",
version="1.0.0"
)
def verify_api_key_dependency_internal(db=Depends(get_db), api_key: str = Depends(api_key_header_internal)) -> str:
cursor = db.cursor()
cursor.execute("SELECT program_name, api_key FROM internal_api_keys WHERE status = 'active'")
for program_name, hashed_key in cursor.fetchall():
if verify_api_key(api_key=api_key, hashed=hashed_key):
return program_name
raise HTTPException(status_code=403, detail="Unauthorized")
@api.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler(request,exc):
if exc.status_code == 404:
return JSONResponse(
status_code=403,
content={"detail": "Unauthorized"}
)
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail}
)
@api.post("/internal/receive-notifications")
def receive_notifications(
notification_data: Notification,
db = Depends(get_db),
program_name: str = Depends(verify_api_key_dependency_internal)
):
logger.info(f"Received notifcation data from {program_name} for RMQ")
send_message_to_rmq(notification_data.user_id,notification_data.message)
logger.info("Successfully delivered message to RMQ")
return {"status": "queued"}
if __name__ == "__main__":
uvicorn.run(
"main:api",
host="0.0.0.0",
port=8101,
log_config=LOGGING_CONFIG,
log_level="info"
)

29
rabbitmq.md Normal file
View File

@ -0,0 +1,29 @@
rabbitmqctl add_vhost app_notifications
rabbitmqctl add_user notifier strongpassword
rabbitmqctl set_user_tags notifier management
rabbitmqctl set_permissions -p app_notifications notifier ".*" ".*" ".*"
rabbitmqadmin --username "admin" --password "admin" declare exchange --vhost "app_notifications" --name "app_notifications" --type "topic" --durable "true"
rabbitmqadmin --username "admin" --password "admin" declare queue --vhost "app_notifications" --name "notifications_retry"
--durable "true"
rabbitmqadmin --username "admin" --password "admin" declare queue --vhost "app_notifications" --name "notifications_dlq"
--durable "true"
rabbitmqadmin --username "admin" --password "admin" declare queue --vhost "app_notifications" --name "notifications"
--durable "true"
rabbitmqadmin --username "admin" --password "admin" declare binding --vhost "app_notifications" --source "app_notifications" --destination "notifications" --destination-type "queue" --routing-key "notify.*"
# Retry policy: messages stay for 30s before going back to main queue
rabbitmqctl set_policy \
--vhost app_notifications \
retry_policy "^notifications_retry$" \
'{"dead-letter-exchange":"app_notifications",
"dead-letter-routing-key":"notify.retry",
"message-ttl":30000}' \
--apply-to queues
# DLQ policy: permanent dead letter storage
rabbitmqctl set_policy \
--vhost app_notifications \
dlq_policy "^notifications$" \
'{"dead-letter-exchange":"app_notifications",
"dead-letter-routing-key":"notify.dlq"}' \
--apply-to queues

21
rabbitmq_handler.py Normal file
View File

@ -0,0 +1,21 @@
import pika
from typing import Dict
import json
def send_message_to_rmq(user_id: int, message: Dict):
credentials = pika.credentials.PlainCredentials(username="notifier",password="strongpassword")
conn_params = pika.ConnectionParameters(host="localhost",
credentials=credentials,virtual_host="app_notifications")
connection = pika.BlockingConnection(conn_params)
#connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery()
channel.basic_publish(exchange='app_notifications',
routing_key=f"notify.user.{user_id}",
body=json.dumps(message),
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
))

39
uvicorn_logging_config.py Normal file
View File

@ -0,0 +1,39 @@
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
}
},
"handlers": {
"default": {
"class": "logging.StreamHandler",
"formatter": "default",
"stream": "ext://sys.stdout"
}
},
"loggers": {
"": { # root logger
"handlers": ["default"],
"level": "INFO",
"propagate": False
},
"uvicorn": {
"handlers": ["default"],
"level": "INFO",
"propagate": False
},
"uvicorn.error": {
"handlers": ["default"],
"level": "INFO",
"propagate": False
},
"uvicorn.access": {
"handlers": ["default"],
"level": "INFO",
"propagate": False
}
}
}

20
validator.py Normal file
View File

@ -0,0 +1,20 @@
from argon2 import PasswordHasher
ph = PasswordHasher()
def hash_api_key(api_key: str) -> str:
return ph.hash(api_key)
def verify_api_key(api_key: str, hashed: str) -> bool:
try:
return ph.verify(hashed, api_key)
except Exception:
return False
if __name__=="__main__":
plain_key = "super-secret-api-key"
#hashed_key = hash_api_key(plain_key)
hashed_key = '$argon2id$v=19$m=65536,t=3,p=4$vqU+MRafVW1b8AtF+zHb0w$p1J4Gyb0jhlVtKgYyjTITxfU97YaayeS3s3qFFP5sVM'
print("Hashed API Key:", hashed_key)
print("Verification:", verify_api_key(plain_key, hashed_key))