Unified logging behaviour #6
src/db.py (14 changed lines)
@@ -35,6 +35,7 @@ def create_connection_pool():
     global _connection_pool
     with _pool_lock:
         if _connection_pool is not None:
+            logger.debug("[MySQL] Pool already exists, returning existing pool")
             return
     for attempt in range (1,MAX_RETRIES+1):
         try:
@@ -53,6 +54,7 @@ def create_connection_pool():
         except Error as e:
             logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
             if attempt < MAX_RETRIES:
+                logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
                 time.sleep(RETRY_DELAY)

     logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
@@ -62,34 +64,43 @@ def close_connection_pool():
     global _connection_pool
     with _pool_lock:
         if _connection_pool:
+            logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
             _connection_pool = None
             logger.info("[MySQL] Connection pool closed")
     _stop_healthcheck.set()
+    logger.debug("[MySQL] Healthcheck stop flag set")

 def get_connection_pool():
     global _connection_pool
     with _pool_lock:
         if _connection_pool is None:
+            logger.debug("[MySQL] No pool found, creating one")
             create_connection_pool()
+        else:
+            logger.debug(f"[MySQL] Returning existing pool: {_connection_pool}")
         return _connection_pool

 def get_db():
     pool = get_connection_pool()
+    logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
     conn = pool.get_connection()

     try:
         conn.ping(reconnect=True, attempts=3, delay=1)
+        logger.debug("[MySQL] Connection alive")
     except Error as e:
         logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
         create_connection_pool()
         pool = get_connection_pool()
         conn = pool.get_connection()
+        logger.debug("[MySQL] Reconnected successfully")

     try:
         yield conn
     finally:
         if conn and conn.is_connected():
             conn.close()
+            logger.debug("[MySQL] Connection returned to pool")

 def _pool_healthcheck():
     while not _stop_healthcheck.is_set():
@@ -97,11 +108,13 @@ def _pool_healthcheck():
         with _pool_lock:
             pool = _connection_pool
         if not pool:
+            logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
             continue
         try:
             conn = pool.get_connection()
             conn.ping(reconnect=True, attempts=3, delay=1)
             conn.close()
+            logger.debug(f"[MySQL] Pool healthcheck succeeded")
         except Error as e:
             logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
             create_connection_pool()
@@ -110,6 +123,7 @@ def _pool_healthcheck():
 def start_healthcheck_thread():
     global _health_thread
     if _health_thread and _health_thread.is_alive():
+        logger.debug("[MySQL] Healthcheck thread already running")
         return
     _stop_healthcheck.clear()
     _health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
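The hunks above rely on a module-level logger plus MAX_RETRIES, RETRY_DELAY, _pool_lock, _stop_healthcheck and _health_thread that all sit outside the changed lines. A minimal sketch of the surrounding setup presumably near the top of src/db.py; the names come from the diff, the concrete values and imports are assumptions:

# Hypothetical module-level setup assumed by the hunks above (not shown in this diff);
# names are taken from the changed lines, the concrete values are illustrative.
import threading
import time                              # time.sleep(RETRY_DELAY) in the retry loop

from mysql.connector import Error        # matched by the "except Error" clauses
from logger_handler import setup_logger

logger = setup_logger(__name__)          # the logger the new debug/info/warning calls go through
_connection_pool = None
_pool_lock = threading.Lock()            # guards _connection_pool
_stop_healthcheck = threading.Event()    # set by close_connection_pool()
_health_thread = None
MAX_RETRIES = 3                          # illustrative retry policy
RETRY_DELAY = 5                          # seconds between attempts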
src/logger_handler.py
@@ -1,4 +1,9 @@
 import logging
+import os

+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
+    LOG_LEVEL = "INFO"
+
 def setup_logger(name: str) -> logging.Logger:
     logger = logging.getLogger(name)
@@ -9,5 +14,6 @@ def setup_logger(name: str) -> logging.Logger:
     )
     handler.setFormatter(formatter)
     logger.addHandler(handler)
-    logger.setLevel(logging.DEBUG)
+    logger.setLevel(getattr(logging, LOG_LEVEL))
+    logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
     return logger
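A short sketch of how the new LOG_LEVEL handling is expected to behave; the shell line and the logger name are illustrative, and the lookup mirrors the getattr(logging, LOG_LEVEL) call above:

# e.g. run with: LOG_LEVEL=WARNING python src/main.py
import logging
import os

logging.basicConfig()                                     # simple root handler for this demo
level_name = os.getenv("LOG_LEVEL", "INFO").upper()       # same pattern as logger_handler
if level_name not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
    level_name = "INFO"

log = logging.getLogger("demo")
log.setLevel(getattr(logging, level_name))                # "WARNING" -> logging.WARNING (30)
log.debug("suppressed unless LOG_LEVEL=DEBUG")
log.warning("emitted whenever the level is WARNING or lower")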
src/main.py (18 changed lines)
@@ -6,10 +6,9 @@ from typing import Dict
 from pydantic import BaseModel
 from validator import verify_api_key
 from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
-from logger_handler import setup_logger
+from logger_handler import setup_logger, LOG_LEVEL
 from rabbitmq_handler import RabbitMQProducer
 import uvicorn
-from uvicorn_logging_config import LOGGING_CONFIG
 from contextlib import asynccontextmanager
 from metrics_server import REQUEST_COUNTER
 import asyncio
@@ -54,26 +53,34 @@ api = FastAPI(

 @api.middleware("http")
 async def prometheus_middleware(request: Request, call_next):
+    logger.debug(f"Incoming request: {request.method} {request.url.path}")
     status = 500
     try:
         response = await call_next(request)
         status = response.status_code
+        logger.debug(f"Request processed with status: {status}")
     except Exception:
+        logger.exception("Exception during request processing")
         raise
     finally:
         REQUEST_COUNTER.labels(request.method, request.url.path, status).inc()
+        logger.debug(f"Prometheus counter incremented: {request.method} {request.url.path} {status}")
     return response

 def verify_api_key_dependency_internal(db=Depends(get_db), api_key: str = Depends(api_key_header_internal)) -> str:
+    logger.debug(f"Verifying internal API key: {api_key}")
     cursor = db.cursor()
     cursor.execute("SELECT program_name, api_key FROM internal_api_keys WHERE status = 'active'")
     for program_name, hashed_key in cursor.fetchall():
         if verify_api_key(api_key=api_key, hashed=hashed_key):
+            logger.debug(f"API key verified for program: {program_name}")
             return program_name
+    logger.warning("Internal API key verification failed")
     raise HTTPException(status_code=403, detail="Unauthorized")

 @api.exception_handler(StarletteHTTPException)
 async def custom_http_exception_handler(request,exc):
+    logger.debug(f"Handling HTTP exception: {exc}")
     if exc.status_code == 404:
         return JSONResponse(
             status_code=403,
@@ -93,6 +100,7 @@ async def receive_notifications(
 ):

     logger.info(f"Received notifcation data from {program_name} for RMQ")
+    logger.debug(f"Notification data: {notification_data}")
     await request.app.state.rmq_producer.publish(
         notification_data.receipent_user_id,
         notification_data.message
@@ -101,13 +109,15 @@ async def receive_notifications(
     return {"status": "queued"}

 async def start_servers():
-    config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8101, log_level="info")
-    config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level="info")
+    logger.debug("Starting FastAPI and metrics servers")
+    config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8101, log_level=LOG_LEVEL.lower())
+    config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level=LOG_LEVEL.lower())

     server_main = uvicorn.Server(config_main)
     server_metrics = uvicorn.Server(config_metrics)

     await asyncio.gather(server_main.serve(), server_metrics.serve())
+    logger.debug("Servers started")

 if __name__ == "__main__":
     asyncio.run(start_servers())
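The middleware increments REQUEST_COUNTER with method, path and status labels; the counter itself lives in metrics_server.py and is not touched by this diff. A hedged sketch of the shape implied by the .labels(...) call, with metric and label names as assumptions:

# Hypothetical shape of REQUEST_COUNTER in metrics_server.py (not part of this diff);
# label names are inferred from the .labels(method, path, status) call in the middleware.
from prometheus_client import Counter

REQUEST_COUNTER = Counter(
    "http_requests_total",                # metric name is illustrative
    "HTTP requests processed, by method, path and status code",
    ["method", "path", "status"],
)

# The middleware then does, in its finally block:
# REQUEST_COUNTER.labels(request.method, request.url.path, status).inc()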
src/rabbitmq_handler.py
@@ -1,5 +1,6 @@
 import asyncio
 import aio_pika
+from aio_pika.exceptions import AMQPException
 from secret_handler import return_credentials
 import os
 from logger_handler import setup_logger
@@ -27,26 +28,30 @@ class RabbitMQProducer:
         self._flush_task: asyncio.Task | None = None
         self._closing = False
         self._ready = asyncio.Event()
+        logger.debug(f"[RabbitMQ] Initialized producer for exchange '{self.exchange_name}'")

     async def connect(self):
+        logger.info(f"[RabbitMQ] Connecting to {self.url}...")
         self.connection = await aio_pika.connect_robust(self.url)
         self.channel = await self.connection.channel(publisher_confirms=True)
         self.exchange = await self.channel.declare_exchange(
             self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True
         )
-        logger.info(f"[aio-pika] Connected and exchange '{self.exchange_name}' ready.")
+        logger.info(f"[RabbitMQ] Connected and exchange '{self.exchange_name}' ready.")
         self._ready.set()
         self._flush_task = asyncio.create_task(self._flush_queue_loop())
+        logger.debug("[RabbitMQ] Flush queue loop started")

     async def publish(self, routing_key: int, message: dict):
         await self._queue.put((routing_key, message))
         logger.debug(f"[RabbitMQ] Queued message for {routing_key}")

     async def _flush_queue_loop(self):
+        logger.debug("[RabbitMQ] Waiting for ready signal before flushing queue")
         await self._ready.wait()
-        logger.debug(f"here")
         while True:
             routing_key, message = await self._queue.get()
+            logger.debug(f"[RabbitMQ] Publishing message to routing key {routing_key}: {message}")
             try:
                 await self.exchange.publish(
                     aio_pika.Message(
@@ -57,25 +62,27 @@ class RabbitMQProducer:
                     routing_key=f"notify.user.{routing_key}",
                     mandatory=True
                 )
-                logger.debug(f"[aio-pika] Published message to notify.user.{routing_key}")
-            except Exception as e:
-                logger.warning(f"[aio-pika] Publish failed, requeuing: {e}")
+                logger.debug(f"[RabbitMQ] Successfully published to notify.user.{routing_key}")
+            except (AMQPException, ConnectionError, OSError) as e:
+                logger.warning(f"[RabbitMQ] Publish failed, requeuing message: {e}", exc_info=True)
                 await asyncio.sleep(2)
                 await self._queue.put((routing_key, message))
             finally:
                 self._queue.task_done()

     async def close(self):
+        logger.info("[RabbitMQ] Closing producer...")
         self._closing = True
         if self._flush_task and not self._flush_task.done():
             self._flush_task.cancel()
             try:
                 await self._flush_task
+                logger.debug("[RabbitMQ] Flush task cancelled successfully")
             except asyncio.CancelledError:
-                pass
+                logger.debug("[RabbitMQ] Flush task cancellation caught")
         if self.connection:
             await self.connection.close()
-            logger.info("[aio-pika] Connection closed.")
+            logger.info("[RabbitMQ] Connection closed")


 async def main():
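Narrowing except Exception to (AMQPException, ConnectionError, OSError) means only broker and transport failures are requeued; any other error now propagates out of the flush task instead of being swallowed. A minimal usage sketch, assuming a no-argument constructor (the real constructor is not shown in this diff):

# Minimal usage sketch; only connect/publish/close appear in the diff, the constructor
# arguments are an assumption. Messages published with routing key 42 land on notify.user.42.
import asyncio

from rabbitmq_handler import RabbitMQProducer

async def demo():
    producer = RabbitMQProducer()                      # real constructor args not shown here
    await producer.connect()                           # declares the topic exchange, starts the flush loop
    await producer.publish(42, {"message": "hello"})   # queued, then flushed to notify.user.42
    await asyncio.sleep(0.1)                           # give the flush loop a chance to drain
    await producer.close()                             # cancels the flush task, closes the connection

asyncio.run(demo())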
src/secret_handler.py
@@ -1,12 +1,14 @@
-import sys
+from logger_handler import setup_logger

+logger = setup_logger(__name__)
+
 def return_credentials(path: str)->str:
     try:
         with open (path) as file:
             return file.read().strip()
     except FileNotFoundError:
-        print(f"[FATAL] Secret file not found: {path}")
-        sys.exit(1)
+        logger.fatal(f"[FATAL] Secret file not found: {path}")
+        raise
     except Exception as e:
-        print(f"[FATAL] Failed to read secret file {path}: {e}")
-        sys.exit(1)
+        logger.fatal(f"[FATAL] Failed to read secret file {path}: {e}")
+        raise
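With raise replacing sys.exit(1), the decision to abort moves to the caller. A hedged sketch of caller-side handling at startup; the secret path is illustrative:

# Sketch of the caller-side effect of "raise" replacing "sys.exit(1)": startup code that
# wants the old fail-fast behaviour now has to exit (or fall back) explicitly.
import sys

from secret_handler import return_credentials

try:
    rabbitmq_password = return_credentials("/run/secrets/rabbitmq_password")  # path is illustrative
except (FileNotFoundError, OSError):
    # return_credentials already logged the failure at FATAL level.
    sys.exit(1)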
src/uvicorn_logging_config.py (deleted)
@@ -1,39 +0,0 @@
-LOGGING_CONFIG = {
-    "version": 1,
-    "disable_existing_loggers": False,
-    "formatters": {
-        "default": {
-            "format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
-            "datefmt": "%Y-%m-%d %H:%M:%S",
-        }
-    },
-    "handlers": {
-        "default": {
-            "class": "logging.StreamHandler",
-            "formatter": "default",
-            "stream": "ext://sys.stdout"
-        }
-    },
-    "loggers": {
-        "": { # root logger
-            "handlers": ["default"],
-            "level": "INFO",
-            "propagate": False
-        },
-        "uvicorn": {
-            "handlers": ["default"],
-            "level": "INFO",
-            "propagate": False
-        },
-        "uvicorn.error": {
-            "handlers": ["default"],
-            "level": "INFO",
-            "propagate": False
-        },
-        "uvicorn.access": {
-            "handlers": ["default"],
-            "level": "INFO",
-            "propagate": False
-        }
-    }
-}