Merge pull request 'Unified logging behaviour' (#6) from feature/extended-logging into main
All checks were successful
Build & Publish to GHCR / build (push) Successful in 35s
All checks were successful
Build & Publish to GHCR / build (push) Successful in 35s
Reviewed-on: #6
This commit is contained in:
commit
991a08571c
14
src/db.py
14
src/db.py
@ -35,6 +35,7 @@ def create_connection_pool():
|
||||
global _connection_pool
|
||||
with _pool_lock:
|
||||
if _connection_pool is not None:
|
||||
logger.debug("[MySQL] Pool already exists, returning existing pool")
|
||||
return
|
||||
for attempt in range (1,MAX_RETRIES+1):
|
||||
try:
|
||||
@ -53,6 +54,7 @@ def create_connection_pool():
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
|
||||
if attempt < MAX_RETRIES:
|
||||
logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
|
||||
time.sleep(RETRY_DELAY)
|
||||
|
||||
logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
|
||||
@ -62,34 +64,43 @@ def close_connection_pool():
|
||||
global _connection_pool
|
||||
with _pool_lock:
|
||||
if _connection_pool:
|
||||
logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
|
||||
_connection_pool = None
|
||||
logger.info("[MySQL] Connection pool closed")
|
||||
_stop_healthcheck.set()
|
||||
logger.debug("[MySQL] Healthcheck stop flag set")
|
||||
|
||||
def get_connection_pool():
|
||||
global _connection_pool
|
||||
with _pool_lock:
|
||||
if _connection_pool is None:
|
||||
logger.debug("[MySQL] No pool found, creating one")
|
||||
create_connection_pool()
|
||||
else:
|
||||
logger.debug(f"[MySQL] Returning existing pool: {_connection_pool}")
|
||||
return _connection_pool
|
||||
|
||||
def get_db():
|
||||
pool = get_connection_pool()
|
||||
logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
|
||||
conn = pool.get_connection()
|
||||
|
||||
try:
|
||||
conn.ping(reconnect=True, attempts=3, delay=1)
|
||||
logger.debug("[MySQL] Connection alive")
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
|
||||
create_connection_pool()
|
||||
pool = get_connection_pool()
|
||||
conn = pool.get_connection()
|
||||
logger.debug("[MySQL] Reconnected successfully")
|
||||
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
if conn and conn.is_connected():
|
||||
conn.close()
|
||||
logger.debug("[MySQL] Connection returned to pool")
|
||||
|
||||
def _pool_healthcheck():
|
||||
while not _stop_healthcheck.is_set():
|
||||
@ -97,11 +108,13 @@ def _pool_healthcheck():
|
||||
with _pool_lock:
|
||||
pool = _connection_pool
|
||||
if not pool:
|
||||
logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
|
||||
continue
|
||||
try:
|
||||
conn = pool.get_connection()
|
||||
conn.ping(reconnect=True, attempts=3, delay=1)
|
||||
conn.close()
|
||||
logger.debug(f"[MySQL] Pool healthcheck succeeded")
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
|
||||
create_connection_pool()
|
||||
@ -110,6 +123,7 @@ def _pool_healthcheck():
|
||||
def start_healthcheck_thread():
|
||||
global _health_thread
|
||||
if _health_thread and _health_thread.is_alive():
|
||||
logger.debug("[MySQL] Healthcheck thread already running")
|
||||
return
|
||||
_stop_healthcheck.clear()
|
||||
_health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
|
||||
|
||||
@ -1,4 +1,9 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
|
||||
LOG_LEVEL = "INFO"
|
||||
|
||||
def setup_logger(name: str) -> logging.Logger:
|
||||
logger = logging.getLogger(name)
|
||||
@ -9,5 +14,6 @@ def setup_logger(name: str) -> logging.Logger:
|
||||
)
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.setLevel(getattr(logging, LOG_LEVEL))
|
||||
logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
|
||||
return logger
|
||||
|
||||
18
src/main.py
18
src/main.py
@ -6,10 +6,9 @@ from typing import Dict
|
||||
from pydantic import BaseModel
|
||||
from validator import verify_api_key
|
||||
from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
|
||||
from logger_handler import setup_logger
|
||||
from logger_handler import setup_logger, LOG_LEVEL
|
||||
from rabbitmq_handler import RabbitMQProducer
|
||||
import uvicorn
|
||||
from uvicorn_logging_config import LOGGING_CONFIG
|
||||
from contextlib import asynccontextmanager
|
||||
from metrics_server import REQUEST_COUNTER
|
||||
import asyncio
|
||||
@ -54,26 +53,34 @@ api = FastAPI(
|
||||
|
||||
@api.middleware("http")
|
||||
async def prometheus_middleware(request: Request, call_next):
|
||||
logger.debug(f"Incoming request: {request.method} {request.url.path}")
|
||||
status = 500
|
||||
try:
|
||||
response = await call_next(request)
|
||||
status = response.status_code
|
||||
logger.debug(f"Request processed with status: {status}")
|
||||
except Exception:
|
||||
logger.exception(f"Exception during request processing: {e}")
|
||||
raise
|
||||
finally:
|
||||
REQUEST_COUNTER.labels(request.method, request.url.path, status).inc()
|
||||
logger.debug(f"Prometheus counter incremented: {request.method} {request.url.path} {status}")
|
||||
return response
|
||||
|
||||
def verify_api_key_dependency_internal(db=Depends(get_db), api_key: str = Depends(api_key_header_internal)) -> str:
|
||||
logger.debug(f"Verifying internal API key: {api_key}")
|
||||
cursor = db.cursor()
|
||||
cursor.execute("SELECT program_name, api_key FROM internal_api_keys WHERE status = 'active'")
|
||||
for program_name, hashed_key in cursor.fetchall():
|
||||
if verify_api_key(api_key=api_key, hashed=hashed_key):
|
||||
logger.debug(f"API key verified for program: {program_name}")
|
||||
return program_name
|
||||
logger.warning("Internal API key verification failed")
|
||||
raise HTTPException(status_code=403, detail="Unauthorized")
|
||||
|
||||
@api.exception_handler(StarletteHTTPException)
|
||||
async def custom_http_exception_handler(request,exc):
|
||||
logger.debug(f"Handling HTTP exception: {exc}")
|
||||
if exc.status_code == 404:
|
||||
return JSONResponse(
|
||||
status_code=403,
|
||||
@ -93,6 +100,7 @@ async def receive_notifications(
|
||||
):
|
||||
|
||||
logger.info(f"Received notifcation data from {program_name} for RMQ")
|
||||
logger.debug(f"Notification data: {notification_data}")
|
||||
await request.app.state.rmq_producer.publish(
|
||||
notification_data.receipent_user_id,
|
||||
notification_data.message
|
||||
@ -101,13 +109,15 @@ async def receive_notifications(
|
||||
return {"status": "queued"}
|
||||
|
||||
async def start_servers():
|
||||
config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8101, log_level="info")
|
||||
config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level="info")
|
||||
logger.debug("Starting FastAPI and metrics servers")
|
||||
config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8101, log_level=LOG_LEVEL.lower())
|
||||
config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level=LOG_LEVEL.lower())
|
||||
|
||||
server_main = uvicorn.Server(config_main)
|
||||
server_metrics = uvicorn.Server(config_metrics)
|
||||
|
||||
await asyncio.gather(server_main.serve(), server_metrics.serve())
|
||||
logger.debug("Servers started")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(start_servers())
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import aio_pika
|
||||
from aio_pika.exceptions import AMQPException
|
||||
from secret_handler import return_credentials
|
||||
import os
|
||||
from logger_handler import setup_logger
|
||||
@ -27,26 +28,30 @@ class RabbitMQProducer:
|
||||
self._flush_task: asyncio.Task | None = None
|
||||
self._closing = False
|
||||
self._ready = asyncio.Event()
|
||||
logger.debug(f"[RabbitMQ] Initialized producer for exchange '{self.exchange_name}'")
|
||||
|
||||
async def connect(self):
|
||||
logger.info(f"[RabbitMQ] Connecting to {self.url}...")
|
||||
self.connection = await aio_pika.connect_robust(self.url)
|
||||
self.channel = await self.connection.channel(publisher_confirms=True)
|
||||
self.exchange = await self.channel.declare_exchange(
|
||||
self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
|
||||
)
|
||||
logger.info(f"[aio-pika] Connected and exchange '{self.exchange_name}' ready.")
|
||||
logger.info(f"[RabbitMQ] Connected and exchange '{self.exchange_name}' ready.")
|
||||
self._ready.set()
|
||||
self._flush_task = asyncio.create_task(self._flush_queue_loop())
|
||||
logger.debug("[RabbitMQ] Flush queue loop started")
|
||||
|
||||
async def publish(self, routing_key: int, message: dict):
|
||||
await self._queue.put((routing_key, message))
|
||||
logger.debug(f"[RabbitMQ] Queued message for {routing_key}")
|
||||
|
||||
async def _flush_queue_loop(self):
|
||||
logger.debug("[RabbitMQ] Waiting for ready signal before flushing queue")
|
||||
await self._ready.wait()
|
||||
logger.debug(f"here")
|
||||
while True:
|
||||
routing_key, message = await self._queue.get()
|
||||
logger.debug(f"[RabbitMQ] Publishing message to routing key {routing_key}: {message}")
|
||||
try:
|
||||
await self.exchange.publish(
|
||||
aio_pika.Message(
|
||||
@ -57,25 +62,27 @@ class RabbitMQProducer:
|
||||
routing_key=f"notify.user.{routing_key}",
|
||||
mandatory=True
|
||||
)
|
||||
logger.debug(f"[aio-pika] Published message to notify.user.{routing_key}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[aio-pika] Publish failed, requeuing: {e}")
|
||||
logger.debug(f"[RabbitMQ] Successfully published to notify.user.{routing_key}")
|
||||
except (AMQPException, ConnectionError, OSError) as e:
|
||||
logger.warning(f"[RabbitMQ] Publish failed, requeuing message: {e}", exc_info=True)
|
||||
await asyncio.sleep(2)
|
||||
await self._queue.put((routing_key, message))
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
|
||||
async def close(self):
|
||||
logger.info("[RabbitMQ] Closing producer...")
|
||||
self._closing = True
|
||||
if self._flush_task and not self._flush_task.done():
|
||||
self._flush_task.cancel()
|
||||
try:
|
||||
await self._flush_task
|
||||
logger.debug("[RabbitMQ] Flush task cancelled successfully")
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.debug("[RabbitMQ] Flush task cancellation caught")
|
||||
if self.connection:
|
||||
await self.connection.close()
|
||||
logger.info("[aio-pika] Connection closed.")
|
||||
logger.info("[RabbitMQ] Connection closed")
|
||||
|
||||
|
||||
async def main():
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
import sys
|
||||
from logger_handler import setup_logger
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
def return_credentials(path: str)->str:
|
||||
try:
|
||||
with open (path) as file:
|
||||
return file.read().strip()
|
||||
except FileNotFoundError:
|
||||
print(f"[FATAL] Secret file not found: {path}")
|
||||
sys.exit(1)
|
||||
logger.fatal(f"[FATAL] Secret file not found: {path}")
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"[FATAL] Failed to read secret file {path}: {e}")
|
||||
sys.exit(1)
|
||||
logger.fatal(f"[FATAL] Failed to read secret file {path}: {e}")
|
||||
raise
|
||||
|
||||
@ -1,39 +0,0 @@
|
||||
LOGGING_CONFIG = {
|
||||
"version": 1,
|
||||
"disable_existing_loggers": False,
|
||||
"formatters": {
|
||||
"default": {
|
||||
"format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||
}
|
||||
},
|
||||
"handlers": {
|
||||
"default": {
|
||||
"class": "logging.StreamHandler",
|
||||
"formatter": "default",
|
||||
"stream": "ext://sys.stdout"
|
||||
}
|
||||
},
|
||||
"loggers": {
|
||||
"": { # root logger
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
},
|
||||
"uvicorn": {
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
},
|
||||
"uvicorn.error": {
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
},
|
||||
"uvicorn.access": {
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user