diff --git a/db.py b/db.py new file mode 100644 index 0000000..672f529 --- /dev/null +++ b/db.py @@ -0,0 +1,42 @@ +import mysql.connector +from mysql.connector import pooling +import threading + +MYSQL_CONFIG = { + "host": "localhost", + "user": "florian", + "password": "password123++", + "database": "app" +} + +# Lock to ensure thread-safe pool creation +_pool_lock = threading.Lock() +_connection_pool = None + +def get_connection_pool(): + global _connection_pool + with _pool_lock: + if _connection_pool is None: + _connection_pool = mysql.connector.pooling.MySQLConnectionPool( + pool_name="mypool", + pool_size=5, + pool_reset_session=True, + **MYSQL_CONFIG + ) + return _connection_pool + +# Dependency for FastAPI +def get_db(): + pool = get_connection_pool() + conn = pool.get_connection() + try: + yield conn + finally: + conn.close() + +if __name__ == "__main__": + # Manual test + for conn in get_db(): + cursor = conn.cursor(dictionary=True) + cursor.execute("SELECT NOW() AS ts") + print(cursor.fetchone()) diff --git a/hvac_handler.py b/hvac_handler.py new file mode 100644 index 0000000..5784555 --- /dev/null +++ b/hvac_handler.py @@ -0,0 +1,18 @@ +import base64 +import hvac + + +client = hvac.Client( + url='http://127.0.0.1:8200', + token='root' +) + +def decrypt_token(ciphertext: str) -> str: + response = client.secrets.transit.decrypt_data( + name='push-tokens', + ciphertext=ciphertext + ) + plaintext_b64 = response['data']['plaintext'] + return base64.b64decode(plaintext_b64).decode() + + diff --git a/internal_api_database.sql b/internal_api_database.sql new file mode 100644 index 0000000..38293e3 --- /dev/null +++ b/internal_api_database.sql @@ -0,0 +1,11 @@ +CREATE TABLE internal_api_keys ( + id BIGSERIAL PRIMARY KEY, + program_name VARCHAR(255) NOT NULL, + api_key VARCHAR(255) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + status VARCHAR(20) NOT NULL DEFAULT 'active', + description TEXT NULL +); + +CREATE INDEX idx_internal_api_keys_program_name + ON internal_api_keys(program_name); diff --git a/logger_handler.py b/logger_handler.py new file mode 100644 index 0000000..25c121d --- /dev/null +++ b/logger_handler.py @@ -0,0 +1,13 @@ +import logging + +def setup_logger(name: str) -> logging.Logger: + logger = logging.getLogger(name) + if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + return logger diff --git a/main.py b/main.py new file mode 100644 index 0000000..2b5860d --- /dev/null +++ b/main.py @@ -0,0 +1,70 @@ +from fastapi import FastAPI, Query, Depends, HTTPException, Header +from fastapi.responses import JSONResponse +from fastapi.security.api_key import APIKeyHeader +from starlette.exceptions import HTTPException as StarletteHTTPException +from typing import Dict +from pydantic import BaseModel +from validator import verify_api_key +from db import get_db +from logger_handler import setup_logger +from rabbitmq_handler import send_message_to_rmq +import uvicorn +from uvicorn_logging_config import LOGGING_CONFIG + + + +logger = setup_logger(__name__) + +api_key_header_internal = APIKeyHeader(name="X-API-Key-Internal") + +class Notification(BaseModel): + receipent_user_id : int + message : Dict + +api = FastAPI( + title="Internal Notifier API", + description="API to forward messages to RabbitMQ", + version="1.0.0" +) + +def verify_api_key_dependency_internal(db=Depends(get_db), api_key: str = Depends(api_key_header_internal)) -> str: + cursor = db.cursor() + cursor.execute("SELECT program_name, api_key FROM internal_api_keys WHERE status = 'active'") + for program_name, hashed_key in cursor.fetchall(): + if verify_api_key(api_key=api_key, hashed=hashed_key): + return program_name + raise HTTPException(status_code=403, detail="Unauthorized") + +@api.exception_handler(StarletteHTTPException) +async def custom_http_exception_handler(request,exc): + if exc.status_code == 404: + return JSONResponse( + status_code=403, + content={"detail": "Unauthorized"} + ) + return JSONResponse( + status_code=exc.status_code, + content={"detail": exc.detail} + ) + + +@api.post("/internal/receive-notifications") +def receive_notifications( + notification_data: Notification, + db = Depends(get_db), + program_name: str = Depends(verify_api_key_dependency_internal) +): + logger.info(f"Received notifcation data from {program_name} for RMQ") + send_message_to_rmq(notification_data.user_id,notification_data.message) + logger.info("Successfully delivered message to RMQ") + return {"status": "queued"} + + +if __name__ == "__main__": + uvicorn.run( + "main:api", + host="0.0.0.0", + port=8101, + log_config=LOGGING_CONFIG, + log_level="info" + ) diff --git a/rabbitmq.md b/rabbitmq.md new file mode 100644 index 0000000..df91c04 --- /dev/null +++ b/rabbitmq.md @@ -0,0 +1,29 @@ +rabbitmqctl add_vhost app_notifications +rabbitmqctl add_user notifier strongpassword +rabbitmqctl set_user_tags notifier management +rabbitmqctl set_permissions -p app_notifications notifier ".*" ".*" ".*" +rabbitmqadmin --username "admin" --password "admin" declare exchange --vhost "app_notifications" --name "app_notifications" --type "topic" --durable "true" +rabbitmqadmin --username "admin" --password "admin" declare queue --vhost "app_notifications" --name "notifications_retry" + --durable "true" + rabbitmqadmin --username "admin" --password "admin" declare queue --vhost "app_notifications" --name "notifications_dlq" + --durable "true" +rabbitmqadmin --username "admin" --password "admin" declare queue --vhost "app_notifications" --name "notifications" + --durable "true" + rabbitmqadmin --username "admin" --password "admin" declare binding --vhost "app_notifications" --source "app_notifications" --destination "notifications" --destination-type "queue" --routing-key "notify.*" + + # Retry policy: messages stay for 30s before going back to main queue +rabbitmqctl set_policy \ + --vhost app_notifications \ + retry_policy "^notifications_retry$" \ + '{"dead-letter-exchange":"app_notifications", + "dead-letter-routing-key":"notify.retry", + "message-ttl":30000}' \ + --apply-to queues + +# DLQ policy: permanent dead letter storage +rabbitmqctl set_policy \ + --vhost app_notifications \ + dlq_policy "^notifications$" \ + '{"dead-letter-exchange":"app_notifications", + "dead-letter-routing-key":"notify.dlq"}' \ + --apply-to queues \ No newline at end of file diff --git a/rabbitmq_handler.py b/rabbitmq_handler.py new file mode 100644 index 0000000..d617614 --- /dev/null +++ b/rabbitmq_handler.py @@ -0,0 +1,21 @@ +import pika +from typing import Dict +import json + +def send_message_to_rmq(user_id: int, message: Dict): + credentials = pika.credentials.PlainCredentials(username="notifier",password="strongpassword") + conn_params = pika.ConnectionParameters(host="localhost", + credentials=credentials,virtual_host="app_notifications") + connection = pika.BlockingConnection(conn_params) + #connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + channel.confirm_delivery() + + channel.basic_publish(exchange='app_notifications', + routing_key=f"notify.user.{user_id}", + body=json.dumps(message), + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + )) + diff --git a/uvicorn_logging_config.py b/uvicorn_logging_config.py new file mode 100644 index 0000000..a2854db --- /dev/null +++ b/uvicorn_logging_config.py @@ -0,0 +1,39 @@ +LOGGING_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "default": { + "format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s", + "datefmt": "%Y-%m-%d %H:%M:%S", + } + }, + "handlers": { + "default": { + "class": "logging.StreamHandler", + "formatter": "default", + "stream": "ext://sys.stdout" + } + }, + "loggers": { + "": { # root logger + "handlers": ["default"], + "level": "INFO", + "propagate": False + }, + "uvicorn": { + "handlers": ["default"], + "level": "INFO", + "propagate": False + }, + "uvicorn.error": { + "handlers": ["default"], + "level": "INFO", + "propagate": False + }, + "uvicorn.access": { + "handlers": ["default"], + "level": "INFO", + "propagate": False + } + } +} diff --git a/validator.py b/validator.py new file mode 100644 index 0000000..bb9a9cc --- /dev/null +++ b/validator.py @@ -0,0 +1,20 @@ +from argon2 import PasswordHasher + +ph = PasswordHasher() + +def hash_api_key(api_key: str) -> str: + return ph.hash(api_key) + +def verify_api_key(api_key: str, hashed: str) -> bool: + try: + return ph.verify(hashed, api_key) + except Exception: + return False + +if __name__=="__main__": + plain_key = "super-secret-api-key" + #hashed_key = hash_api_key(plain_key) + hashed_key = '$argon2id$v=19$m=65536,t=3,p=4$vqU+MRafVW1b8AtF+zHb0w$p1J4Gyb0jhlVtKgYyjTITxfU97YaayeS3s3qFFP5sVM' + + print("Hashed API Key:", hashed_key) + print("Verification:", verify_api_key(plain_key, hashed_key))