From 3758c758080521220e38c6b8545f1edccd4b0dc4 Mon Sep 17 00:00:00 2001 From: Florian Date: Fri, 17 Oct 2025 09:55:51 +0200 Subject: [PATCH] Unified logging behaviour - Logger doesn't start with log level DEBUG by default, instead reads a environment variable - Secret handler raises exceptions instead of using the module os to exit - Added extensive debug logging - Fixed RabbitMQ exception catch being too broad in scope --- src/db.py | 14 +++++++++++++ src/logger_handler.py | 8 ++++++- src/main.py | 18 ++++++++++++---- src/rabbitmq_handler.py | 21 ++++++++++++------- src/secret_handler.py | 12 ++++++----- src/uvicorn_logging_config.py | 39 ----------------------------------- 6 files changed, 56 insertions(+), 56 deletions(-) delete mode 100644 src/uvicorn_logging_config.py diff --git a/src/db.py b/src/db.py index 8f70f85..7d4dd8c 100644 --- a/src/db.py +++ b/src/db.py @@ -35,6 +35,7 @@ def create_connection_pool(): global _connection_pool with _pool_lock: if _connection_pool is not None: + logger.debug("[MySQL] Pool already exists, returning existing pool") return for attempt in range (1,MAX_RETRIES+1): try: @@ -53,6 +54,7 @@ def create_connection_pool(): except Error as e: logger.warning(f"[MySQL] Attempt {attempt} failed: {e}") if attempt < MAX_RETRIES: + logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...") time.sleep(RETRY_DELAY) logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.") @@ -62,34 +64,43 @@ def close_connection_pool(): global _connection_pool with _pool_lock: if _connection_pool: + logger.debug(f"[MySQL] Closing pool: {_connection_pool}") _connection_pool = None logger.info("[MySQL] Connection pool closed") _stop_healthcheck.set() + logger.debug("[MySQL] Healthcheck stop flag set") def get_connection_pool(): global _connection_pool with _pool_lock: if _connection_pool is None: + logger.debug("[MySQL] No pool found, creating one") create_connection_pool() + else: + logger.debug(f"[MySQL] Returning existing pool: {_connection_pool}") return _connection_pool def get_db(): pool = get_connection_pool() + logger.debug(f"[MySQL] Acquiring connection from pool: {pool}") conn = pool.get_connection() try: conn.ping(reconnect=True, attempts=3, delay=1) + logger.debug("[MySQL] Connection alive") except Error as e: logger.warning(f"[MySQL] Connection dead, recreating pool: {e}") create_connection_pool() pool = get_connection_pool() conn = pool.get_connection() + logger.debug("[MySQL] Reconnected successfully") try: yield conn finally: if conn and conn.is_connected(): conn.close() + logger.debug("[MySQL] Connection returned to pool") def _pool_healthcheck(): while not _stop_healthcheck.is_set(): @@ -97,11 +108,13 @@ def _pool_healthcheck(): with _pool_lock: pool = _connection_pool if not pool: + logger.debug("[MySQL] Healthcheck skipped, pool not initialized") continue try: conn = pool.get_connection() conn.ping(reconnect=True, attempts=3, delay=1) conn.close() + logger.debug(f"[MySQL] Pool healthcheck succeeded") except Error as e: logger.warning(f"[MySQL] Pool healthcheck failed: {e}") create_connection_pool() @@ -110,6 +123,7 @@ def _pool_healthcheck(): def start_healthcheck_thread(): global _health_thread if _health_thread and _health_thread.is_alive(): + logger.debug("[MySQL] Healthcheck thread already running") return _stop_healthcheck.clear() _health_thread = threading.Thread(target=_pool_healthcheck, daemon=True) diff --git a/src/logger_handler.py b/src/logger_handler.py index 25c121d..3911736 100644 --- a/src/logger_handler.py +++ b/src/logger_handler.py @@ -1,4 +1,9 @@ import logging +import os + +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}: + LOG_LEVEL = "INFO" def setup_logger(name: str) -> logging.Logger: logger = logging.getLogger(name) @@ -9,5 +14,6 @@ def setup_logger(name: str) -> logging.Logger: ) handler.setFormatter(formatter) logger.addHandler(handler) - logger.setLevel(logging.DEBUG) + logger.setLevel(getattr(logging, LOG_LEVEL)) + logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}") return logger diff --git a/src/main.py b/src/main.py index 635a060..b847442 100644 --- a/src/main.py +++ b/src/main.py @@ -6,10 +6,9 @@ from typing import Dict from pydantic import BaseModel from validator import verify_api_key from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread -from logger_handler import setup_logger +from logger_handler import setup_logger, LOG_LEVEL from rabbitmq_handler import RabbitMQProducer import uvicorn -from uvicorn_logging_config import LOGGING_CONFIG from contextlib import asynccontextmanager from metrics_server import REQUEST_COUNTER import asyncio @@ -54,26 +53,34 @@ api = FastAPI( @api.middleware("http") async def prometheus_middleware(request: Request, call_next): + logger.debug(f"Incoming request: {request.method} {request.url.path}") status = 500 try: response = await call_next(request) status = response.status_code + logger.debug(f"Request processed with status: {status}") except Exception: + logger.exception(f"Exception during request processing: {e}") raise finally: REQUEST_COUNTER.labels(request.method, request.url.path, status).inc() + logger.debug(f"Prometheus counter incremented: {request.method} {request.url.path} {status}") return response def verify_api_key_dependency_internal(db=Depends(get_db), api_key: str = Depends(api_key_header_internal)) -> str: + logger.debug(f"Verifying internal API key: {api_key}") cursor = db.cursor() cursor.execute("SELECT program_name, api_key FROM internal_api_keys WHERE status = 'active'") for program_name, hashed_key in cursor.fetchall(): if verify_api_key(api_key=api_key, hashed=hashed_key): + logger.debug(f"API key verified for program: {program_name}") return program_name + logger.warning("Internal API key verification failed") raise HTTPException(status_code=403, detail="Unauthorized") @api.exception_handler(StarletteHTTPException) async def custom_http_exception_handler(request,exc): + logger.debug(f"Handling HTTP exception: {exc}") if exc.status_code == 404: return JSONResponse( status_code=403, @@ -93,6 +100,7 @@ async def receive_notifications( ): logger.info(f"Received notifcation data from {program_name} for RMQ") + logger.debug(f"Notification data: {notification_data}") await request.app.state.rmq_producer.publish( notification_data.receipent_user_id, notification_data.message @@ -101,13 +109,15 @@ async def receive_notifications( return {"status": "queued"} async def start_servers(): - config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8101, log_level="info") - config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level="info") + logger.debug("Starting FastAPI and metrics servers") + config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8101, log_level=LOG_LEVEL.lower()) + config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level=LOG_LEVEL.lower()) server_main = uvicorn.Server(config_main) server_metrics = uvicorn.Server(config_metrics) await asyncio.gather(server_main.serve(), server_metrics.serve()) + logger.debug("Servers started") if __name__ == "__main__": asyncio.run(start_servers()) diff --git a/src/rabbitmq_handler.py b/src/rabbitmq_handler.py index 1aa4495..1d32649 100644 --- a/src/rabbitmq_handler.py +++ b/src/rabbitmq_handler.py @@ -1,5 +1,6 @@ import asyncio import aio_pika +from aio_pika.exceptions import AMQPException from secret_handler import return_credentials import os from logger_handler import setup_logger @@ -27,26 +28,30 @@ class RabbitMQProducer: self._flush_task: asyncio.Task | None = None self._closing = False self._ready = asyncio.Event() + logger.debug(f"[RabbitMQ] Initialized producer for exchange '{self.exchange_name}'") async def connect(self): + logger.info(f"[RabbitMQ] Connecting to {self.url}...") self.connection = await aio_pika.connect_robust(self.url) self.channel = await self.connection.channel(publisher_confirms=True) self.exchange = await self.channel.declare_exchange( self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True ) - logger.info(f"[aio-pika] Connected and exchange '{self.exchange_name}' ready.") + logger.info(f"[RabbitMQ] Connected and exchange '{self.exchange_name}' ready.") self._ready.set() self._flush_task = asyncio.create_task(self._flush_queue_loop()) + logger.debug("[RabbitMQ] Flush queue loop started") async def publish(self, routing_key: int, message: dict): await self._queue.put((routing_key, message)) logger.debug(f"[RabbitMQ] Queued message for {routing_key}") async def _flush_queue_loop(self): + logger.debug("[RabbitMQ] Waiting for ready signal before flushing queue") await self._ready.wait() - logger.debug(f"here") while True: routing_key, message = await self._queue.get() + logger.debug(f"[RabbitMQ] Publishing message to routing key {routing_key}: {message}") try: await self.exchange.publish( aio_pika.Message( @@ -57,25 +62,27 @@ class RabbitMQProducer: routing_key=f"notify.user.{routing_key}", mandatory=True ) - logger.debug(f"[aio-pika] Published message to notify.user.{routing_key}") - except Exception as e: - logger.warning(f"[aio-pika] Publish failed, requeuing: {e}") + logger.debug(f"[RabbitMQ] Successfully published to notify.user.{routing_key}") + except (AMQPException, ConnectionError, OSError) as e: + logger.warning(f"[RabbitMQ] Publish failed, requeuing message: {e}", exc_info=True) await asyncio.sleep(2) await self._queue.put((routing_key, message)) finally: self._queue.task_done() async def close(self): + logger.info("[RabbitMQ] Closing producer...") self._closing = True if self._flush_task and not self._flush_task.done(): self._flush_task.cancel() try: await self._flush_task + logger.debug("[RabbitMQ] Flush task cancelled successfully") except asyncio.CancelledError: - pass + logger.debug("[RabbitMQ] Flush task cancellation caught") if self.connection: await self.connection.close() - logger.info("[aio-pika] Connection closed.") + logger.info("[RabbitMQ] Connection closed") async def main(): diff --git a/src/secret_handler.py b/src/secret_handler.py index 33d66a5..2f0d133 100644 --- a/src/secret_handler.py +++ b/src/secret_handler.py @@ -1,12 +1,14 @@ -import sys +from logger_handler import setup_logger + +logger = setup_logger(__name__) def return_credentials(path: str)->str: try: with open (path) as file: return file.read().strip() except FileNotFoundError: - print(f"[FATAL] Secret file not found: {path}") - sys.exit(1) + logger.fatal(f"[FATAL] Secret file not found: {path}") + raise except Exception as e: - print(f"[FATAL] Failed to read secret file {path}: {e}") - sys.exit(1) + logger.fatal(f"[FATAL] Failed to read secret file {path}: {e}") + raise diff --git a/src/uvicorn_logging_config.py b/src/uvicorn_logging_config.py deleted file mode 100644 index a2854db..0000000 --- a/src/uvicorn_logging_config.py +++ /dev/null @@ -1,39 +0,0 @@ -LOGGING_CONFIG = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "default": { - "format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s", - "datefmt": "%Y-%m-%d %H:%M:%S", - } - }, - "handlers": { - "default": { - "class": "logging.StreamHandler", - "formatter": "default", - "stream": "ext://sys.stdout" - } - }, - "loggers": { - "": { # root logger - "handlers": ["default"], - "level": "INFO", - "propagate": False - }, - "uvicorn": { - "handlers": ["default"], - "level": "INFO", - "propagate": False - }, - "uvicorn.error": { - "handlers": ["default"], - "level": "INFO", - "propagate": False - }, - "uvicorn.access": { - "handlers": ["default"], - "level": "INFO", - "propagate": False - } - } -} -- 2.43.0