"""Device-token management API.

Exposes endpoints to register and unregister push-notification device tokens
for an API-key-authenticated user, plus an internal endpoint that forwards
notification payloads to RabbitMQ.
"""

import uuid
from contextlib import closing
from hashlib import sha256
from typing import Optional, List, Dict

import uvicorn
from fastapi import FastAPI, Query, Depends, HTTPException, Header
from fastapi.responses import JSONResponse
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from starlette.exceptions import HTTPException as StarletteHTTPException

from validator import is_valid_platform, is_valid_token, verify_api_key
from hvac_handler import encrypt_token
from db import get_db
from logger_handler import setup_logger
from rabbitmq_handler import send_message_to_rmq
from uvicorn_logging_config import LOGGING_CONFIG

logger = setup_logger(__name__)

api_key_header = APIKeyHeader(name="X-API-Key")
# NOTE(review): declared but not referenced by any dependency in this file.
api_key_header_internal = APIKeyHeader(name="X-API-Key-Internal")

# Generic body for 500 responses; the real exception is logged server-side so
# that driver/SQL details are never echoed back to the client.
_INTERNAL_ERROR_DETAIL = "Internal server error"


def hash_token(token: str) -> str:
    """Return the hex-encoded SHA-256 digest of *token*."""
    return sha256(token.encode()).hexdigest()


class TokenRequest(BaseModel):
    """Request body shared by the register and unregister endpoints."""

    user_id: int
    token: str
    platform: str
    app_ver: str
    locale: Optional[str] = None
    topics: Optional[List[str]] = None


class Notification(BaseModel):
    """Request body for the internal notification endpoint."""

    user_id: int
    message: Dict


api = FastAPI(
    title="Device Token Management",
    description="API for requesting tokens",
    version="1.0.0",
)


def _rollback_quietly(db) -> None:
    """Roll back *db*, logging (not raising) if the rollback itself fails."""
    try:
        db.rollback()
    except Exception:
        # The caller is already handling the original failure; a broken
        # rollback must not mask it.
        logger.exception("Rollback failed")


def verify_api_key_dependency_internal():
    """Dependency guarding the internal endpoint.

    NOTE(review): this unconditionally returns True, so
    /internal/receive-notifications is effectively unauthenticated. The
    intended check (presumably against the X-API-Key-Internal header) is not
    visible in this file -- confirm and implement before exposing the route.
    """
    return True


def verify_api_key_dependency(
    db=Depends(get_db),
    api_key: str = Depends(api_key_header),
) -> int:
    """Resolve the X-API-Key header to the owning active user's id.

    Raises:
        HTTPException: 403 when no active user's stored key matches.
    """
    # NOTE(review): every request fetches and verifies against all active
    # users' hashes; this scales linearly with the user count.
    with closing(db.cursor()) as cursor:
        cursor.execute("SELECT user_id, api_key FROM users WHERE status = 'active'")
        rows = cursor.fetchall()

    for user_id, hashed_key in rows:
        if verify_api_key(api_key=api_key, hashed=hashed_key):
            return user_id
    raise HTTPException(status_code=403, detail="Unauthorized here")


@api.exception_handler(StarletteHTTPException)
async def custom_http_exception_handler(request, exc):
    """Render HTTP errors as JSON, reporting unknown routes (404) as 401."""
    if exc.status_code == 404:
        return JSONResponse(
            status_code=401,
            content={"detail": "Unauthorized"},
        )
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        # Preserve headers attached to the exception (e.g. Allow,
        # WWW-Authenticate), as FastAPI's default handler does.
        headers=getattr(exc, "headers", None),
    )


@api.post("/register_token")
def register_token(
    request_data: TokenRequest,
    db=Depends(get_db),
    user_id: int = Depends(verify_api_key_dependency),
):
    """Create or refresh the device-token row for the authenticated user.

    The row is keyed by (user_id, SHA-256 of the token). An existing row has
    its metadata refreshed and is marked active again; otherwise a new row is
    inserted with the encrypted token.

    Returns:
        ``{"status": "registered"}`` on success.

    Raises:
        HTTPException: 403 if the platform or token fails validation,
            500 if encryption or the database operation fails.
    """
    logger.info(f"Registering token for user_id={user_id}, platform={request_data.platform}")

    if not is_valid_platform(request_data.platform) or not is_valid_token(request_data.token):
        raise HTTPException(status_code=403, detail="Unauthorized")

    hashed_token = hash_token(request_data.token)

    try:
        with closing(db.cursor()) as cursor:
            cursor.execute(
                "SELECT 1 FROM device_tokens WHERE user_id=%s AND hashed_token=%s",
                (user_id, hashed_token),
            )
            if cursor.fetchone():
                # Re-registering must reactivate a token that was previously
                # expired by /unregister-token; otherwise it would stay
                # expired while this endpoint reports "registered".
                cursor.execute(
                    """
                    UPDATE device_tokens
                    SET platform=%s, app_ver=%s, locale=%s, topics=%s,
                        status=%s, last_seen_at=NOW()
                    WHERE user_id=%s AND hashed_token=%s
                    """,
                    (
                        request_data.platform,
                        request_data.app_ver,
                        request_data.locale,
                        request_data.topics,
                        'active',
                        user_id,
                        hashed_token,
                    ),
                )
            else:
                # Encrypt only when a new row is stored; the update path
                # never writes the ciphertext.
                secure_token = encrypt_token(request_data.token)
                token_id = str(uuid.uuid4())
                logger.info(f"Creating new entry user_id={user_id}, token_id={token_id}")
                cursor.execute(
                    """
                    INSERT INTO device_tokens
                        (token_id, user_id, platform, token, hashed_token,
                         status, app_ver, locale, topics, created_at)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW())
                    """,
                    (
                        token_id,
                        user_id,
                        request_data.platform,
                        secure_token,
                        hashed_token,
                        'active',
                        request_data.app_ver,
                        request_data.locale,
                        request_data.topics,
                    ),
                )
        db.commit()
    except Exception as exc:
        logger.exception(f"Failed: Registering token for user_id={user_id}")
        _rollback_quietly(db)
        raise HTTPException(status_code=500, detail=_INTERNAL_ERROR_DETAIL) from exc

    logger.info(f"Success: Registering token for user_id={user_id}, platform={request_data.platform}")
    return {"status": "registered"}


@api.post("/unregister-token")
def unregister_token(
    request_data: TokenRequest,
    db=Depends(get_db),
    user_id: int = Depends(verify_api_key_dependency),
):
    """Mark the authenticated user's device token as expired.

    Returns:
        ``{"status": "unregistered"}`` on success (also when no row matched).

    Raises:
        HTTPException: 500 if the database operation fails.
    """
    logger.info(f"Unregistering token for user_id={user_id}, platform={request_data.platform}")

    hashed_token = hash_token(request_data.token)

    try:
        with closing(db.cursor()) as cursor:
            # Scoped to the authenticated user so one API user cannot expire
            # a token that belongs to another.
            cursor.execute(
                """
                UPDATE device_tokens
                SET status=%s, last_seen_at=NOW()
                WHERE user_id=%s AND hashed_token=%s
                """,
                ('expired', user_id, hashed_token),
            )
        # Without an explicit commit the UPDATE is never persisted.
        db.commit()
    except Exception as exc:
        logger.exception(f"Failed: Unregistering token for user_id={user_id}")
        _rollback_quietly(db)
        raise HTTPException(status_code=500, detail=_INTERNAL_ERROR_DETAIL) from exc

    logger.info(f"Success: Unregistering token for user_id={user_id}, platform={request_data.platform}")
    return {"status": "unregistered"}


@api.post("/internal/receive-notifications")
def receive_notifications(
    notification_data: Notification,
    db=Depends(get_db),
    is_allowed: bool = Depends(verify_api_key_dependency_internal),
):
    """Forward a notification payload for a user to RabbitMQ.

    Returns:
        ``{"status": "queued"}`` once the message has been handed off.
    """
    # NOTE(review): `db` and `is_allowed` are injected but not used here.
    send_message_to_rmq(notification_data.user_id, notification_data.message)
    return {"status": "queued"}


if __name__ == "__main__":
    uvicorn.run(
        "main:api",
        host="0.0.0.0",
        port=8100,
        log_config=LOGGING_CONFIG,
        log_level="info",
    )