Unified logging behaviour
- Logger doesn't start with log level DEBUG by default, instead reads a environment variable - Secret handler raises exceptions instead of using the module os to exit - Added extensive debug logging
This commit is contained in:
parent
4590386b3e
commit
b4ad998bd0
15
src/db.py
15
src/db.py
@ -1,4 +1,3 @@
|
||||
import mysql.connector
|
||||
from mysql.connector import pooling, Error
|
||||
import threading
|
||||
from secret_handler import return_credentials
|
||||
@ -35,6 +34,7 @@ def create_connection_pool():
|
||||
global _connection_pool
|
||||
with _pool_lock:
|
||||
if _connection_pool is not None:
|
||||
logger.debug("[MySQL] Pool already exists, returning existing pool")
|
||||
return
|
||||
for attempt in range (1,MAX_RETRIES+1):
|
||||
try:
|
||||
@ -53,6 +53,7 @@ def create_connection_pool():
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Attempt {attempt} failed: {e}")
|
||||
if attempt < MAX_RETRIES:
|
||||
logger.debug(f"[MySQL] Retrying in {RETRY_DELAY} seconds...")
|
||||
time.sleep(RETRY_DELAY)
|
||||
|
||||
logger.critical(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts.")
|
||||
@ -62,34 +63,43 @@ def close_connection_pool():
|
||||
global _connection_pool
|
||||
with _pool_lock:
|
||||
if _connection_pool:
|
||||
logger.debug(f"[MySQL] Closing pool: {_connection_pool}")
|
||||
_connection_pool = None
|
||||
logger.info("[MySQL] Connection pool closed")
|
||||
_stop_healthcheck.set()
|
||||
logger.debug("[MySQL] Healthcheck stop flag set")
|
||||
|
||||
def get_connection_pool():
|
||||
global _connection_pool
|
||||
with _pool_lock:
|
||||
if _connection_pool is None:
|
||||
logger.debug("[MySQL] No pool found, creating one")
|
||||
create_connection_pool()
|
||||
else:
|
||||
logger.debug(f"[MySQL] Returning existing pool: {_connection_pool}")
|
||||
return _connection_pool
|
||||
|
||||
def get_db():
|
||||
pool = get_connection_pool()
|
||||
logger.debug(f"[MySQL] Acquiring connection from pool: {pool}")
|
||||
conn = pool.get_connection()
|
||||
|
||||
try:
|
||||
conn.ping(reconnect=True, attempts=3, delay=1)
|
||||
logger.debug("[MySQL] Connection alive")
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Connection dead, recreating pool: {e}")
|
||||
create_connection_pool()
|
||||
pool = get_connection_pool()
|
||||
conn = pool.get_connection()
|
||||
logger.debug("[MySQL] Reconnected successfully")
|
||||
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
if conn and conn.is_connected():
|
||||
conn.close()
|
||||
logger.debug("[MySQL] Connection returned to pool")
|
||||
|
||||
def _pool_healthcheck():
|
||||
while not _stop_healthcheck.is_set():
|
||||
@ -97,11 +107,13 @@ def _pool_healthcheck():
|
||||
with _pool_lock:
|
||||
pool = _connection_pool
|
||||
if not pool:
|
||||
logger.debug("[MySQL] Healthcheck skipped, pool not initialized")
|
||||
continue
|
||||
try:
|
||||
conn = pool.get_connection()
|
||||
conn.ping(reconnect=True, attempts=3, delay=1)
|
||||
conn.close()
|
||||
logger.debug(f"[MySQL] Pool healthcheck succeeded")
|
||||
except Error as e:
|
||||
logger.warning(f"[MySQL] Pool healthcheck failed: {e}")
|
||||
create_connection_pool()
|
||||
@ -110,6 +122,7 @@ def _pool_healthcheck():
|
||||
def start_healthcheck_thread():
|
||||
global _health_thread
|
||||
if _health_thread and _health_thread.is_alive():
|
||||
logger.debug("[MySQL] Healthcheck thread already running")
|
||||
return
|
||||
_stop_healthcheck.clear()
|
||||
_health_thread = threading.Thread(target=_pool_healthcheck, daemon=True)
|
||||
|
||||
@ -1,4 +1,9 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
|
||||
LOG_LEVEL = "INFO"
|
||||
|
||||
def setup_logger(name: str) -> logging.Logger:
|
||||
logger = logging.getLogger(name)
|
||||
@ -9,5 +14,5 @@ def setup_logger(name: str) -> logging.Logger:
|
||||
)
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.setLevel(getattr(logging, LOG_LEVEL))
|
||||
return logger
|
||||
|
||||
35
src/main.py
35
src/main.py
@ -7,11 +7,10 @@ from pydantic import BaseModel
|
||||
from validator import is_valid_platform,is_valid_token,verify_api_key
|
||||
from secret_handler import encrypt_token
|
||||
from db import get_db, create_connection_pool, close_connection_pool, start_healthcheck_thread
|
||||
from logger_handler import setup_logger
|
||||
from logger_handler import setup_logger, LOG_LEVEL
|
||||
import uuid
|
||||
from hashlib import sha256
|
||||
import uvicorn
|
||||
from uvicorn_logging_config import LOGGING_CONFIG
|
||||
from contextlib import asynccontextmanager
|
||||
from metrics_server import REQUEST_COUNTER
|
||||
import asyncio
|
||||
@ -23,7 +22,10 @@ api_key_header = APIKeyHeader(name="X-API-Key")
|
||||
|
||||
|
||||
def hash_token(token: str) -> str:
|
||||
return sha256(token.encode()).hexdigest()
|
||||
logger.debug(f"Hashing token: {token}")
|
||||
result = sha256(token.encode()).hexdigest()
|
||||
logger.debug(f"Hashed token result: {result}")
|
||||
return result
|
||||
|
||||
class TokenRequest(BaseModel):
|
||||
token : str
|
||||
@ -58,22 +60,28 @@ api = FastAPI(
|
||||
@api.middleware("http")
|
||||
async def prometheus_middleware(request: Request, call_next):
|
||||
status = 500
|
||||
logger.debug(f"Incoming request: {request.method} {request.url.path}")
|
||||
try:
|
||||
response = await call_next(request)
|
||||
status = response.status_code
|
||||
logger.debug(f"Request processed with status {status}")
|
||||
except Exception:
|
||||
logger.exception(f"Exception during request processing: {e}")
|
||||
raise
|
||||
finally:
|
||||
REQUEST_COUNTER.labels(request.method, request.url.path, status).inc()
|
||||
logger.debug(f"Prometheus counter incremented for {request.method} {request.url.path} {status}")
|
||||
return response
|
||||
|
||||
@api.get("/health", tags=["Health"])
|
||||
def return_health(db=Depends(get_db)):
|
||||
logger.debug("Health check initiated")
|
||||
try:
|
||||
cursor = db.cursor()
|
||||
cursor.execute("SELECT 1")
|
||||
cursor.fetchone()
|
||||
db_status = "ok"
|
||||
logger.debug("Database health check passed")
|
||||
except Exception as e:
|
||||
logger.error(f"Health check DB failed: {e}")
|
||||
db_status = "error"
|
||||
@ -85,15 +93,19 @@ def return_health(db=Depends(get_db)):
|
||||
|
||||
|
||||
def verify_api_key_dependency(db=Depends(get_db), api_key: str = Depends(api_key_header)) -> int:
|
||||
logger.debug(f"Verifying API key: {api_key}")
|
||||
cursor = db.cursor()
|
||||
cursor.execute("SELECT user_id, api_key FROM users WHERE status = 'active'")
|
||||
for user_id, hashed_key in cursor.fetchall():
|
||||
if verify_api_key(api_key=api_key, hashed=hashed_key):
|
||||
logger.debug(f"API key verified for user_id={user_id}")
|
||||
return user_id
|
||||
logger.warning("API key verification failed")
|
||||
raise HTTPException(status_code=403, detail="Unauthorized here")
|
||||
|
||||
@api.exception_handler(StarletteHTTPException)
|
||||
async def custom_http_exception_handler(request,exc):
|
||||
logger.debug(f"Handling HTTP exception: {exc}")
|
||||
if exc.status_code == 404:
|
||||
return JSONResponse(
|
||||
status_code=401,
|
||||
@ -111,11 +123,14 @@ def register_token(
|
||||
user_id: int = Depends(verify_api_key_dependency)
|
||||
):
|
||||
logger.info(f"Registering token for user_id={user_id}, platform={request_data.platform}")
|
||||
logger.debug(f"Request data: {request_data}")
|
||||
if not is_valid_platform(request_data.platform) or not is_valid_token(request_data.token):
|
||||
logger.warning(f"Invalid platform or token for user_id={user_id}")
|
||||
raise HTTPException(status_code=403,detail="Unathorized")
|
||||
|
||||
secure_token = encrypt_token(request_data.token)
|
||||
hashed_token = hash_token(request_data.token)
|
||||
logger.debug(f"Secure token: {secure_token}, Hashed token: {hashed_token}")
|
||||
|
||||
try:
|
||||
cursor = db.cursor()
|
||||
@ -123,8 +138,10 @@ def register_token(
|
||||
"SELECT * FROM device_tokens WHERE user_id=%s AND hashed_token=%s",
|
||||
(user_id,hashed_token))
|
||||
existing = cursor.fetchone()
|
||||
logger.debug(f"Existing token fetched: {existing}")
|
||||
|
||||
if existing:
|
||||
logger.debug(f"Updating existing token for user_id={user_id}")
|
||||
cursor.execute("""
|
||||
UPDATE device_tokens
|
||||
SET platform=%s, app_ver=%s, locale=%s, topics=%s, last_seen_at=NOW()
|
||||
@ -138,7 +155,7 @@ def register_token(
|
||||
))
|
||||
else:
|
||||
token_id = str(uuid.uuid4())
|
||||
logger.info(f"Creating new entry user_id={user_id}, token_id={token_id}")
|
||||
logger.debug(f"Creating new entry user_id={user_id}, token_id={token_id}")
|
||||
cursor.execute("""
|
||||
INSERT INTO device_tokens
|
||||
(token_id, user_id, platform, token, hashed_token, status, app_ver, locale, topics, created_at)
|
||||
@ -157,6 +174,7 @@ def register_token(
|
||||
db.commit()
|
||||
logger.info(f"Success: Registering token for user_id={user_id}, platform={request_data.platform}")
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to register token for user_id={user_id}: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
return {"status":"registered"}
|
||||
@ -170,6 +188,7 @@ def unregister_token(
|
||||
):
|
||||
logger.info(f"Unregistering token for user_id={user_id}, platform={request_data.platform}")
|
||||
hashed_token = hash_token(request_data.token)
|
||||
logger.debug(f"Hashed token for removal: {hashed_token}")
|
||||
try:
|
||||
cursor = db.cursor()
|
||||
cursor.execute("""
|
||||
@ -177,20 +196,22 @@ def unregister_token(
|
||||
SET status=%s, last_seen_at=NOW()
|
||||
WHERE hashed_token=%s
|
||||
""", ('expired', hashed_token))
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to unregister token for user_id={user_id}: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
logger.info(f"Success: Unregistering token for user_id={user_id}, platform={request_data.platform}")
|
||||
return {"status":"unregistered"}
|
||||
|
||||
async def start_servers():
|
||||
config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8100, log_config=LOGGING_CONFIG, log_level="info")
|
||||
config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level="info")
|
||||
logger.debug("Starting servers...")
|
||||
config_main = uvicorn.Config("main:api", host="0.0.0.0", port=8100, log_level=LOG_LEVEL)
|
||||
config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level=LOG_LEVEL)
|
||||
|
||||
server_main = uvicorn.Server(config_main)
|
||||
server_metrics = uvicorn.Server(config_metrics)
|
||||
|
||||
await asyncio.gather(server_main.serve(), server_metrics.serve())
|
||||
logger.debug("Servers started")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(start_servers())
|
||||
|
||||
@ -1,15 +1,17 @@
|
||||
from cryptography.fernet import Fernet
|
||||
import sys
|
||||
from logger_handler import setup_logger
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
try:
|
||||
with open("/etc/secrets/encryption_key","rb") as file:
|
||||
encryption_key = file.read()
|
||||
except FileNotFoundError:
|
||||
print("[FATAL] Encryption key not found")
|
||||
sys.exit(1)
|
||||
logger.fatal("[FATAL] Encryption key not found")
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"[FATAL]Failed to read encryption key: {e}")
|
||||
sys.exit(1)
|
||||
logger.fatal(f"[FATAL]Failed to read encryption key: {e}")
|
||||
raise
|
||||
|
||||
fernet = Fernet(encryption_key)
|
||||
|
||||
@ -24,10 +26,10 @@ def return_credentials(path: str)->str:
|
||||
with open (path) as file:
|
||||
return file.read().strip()
|
||||
except FileNotFoundError:
|
||||
print(f"[FATAL] Secret file not found: {path}")
|
||||
sys.exit(1)
|
||||
logger.fatal(f"[FATAL] Secret file not found: {path}")
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"[FATAL] Failed to read secret file {path}: {e}")
|
||||
sys.exit(1)
|
||||
logger.fatal(f"[FATAL] Failed to read secret file {path}: {e}")
|
||||
raise
|
||||
|
||||
|
||||
|
||||
@ -1,39 +0,0 @@
|
||||
LOGGING_CONFIG = {
|
||||
"version": 1,
|
||||
"disable_existing_loggers": False,
|
||||
"formatters": {
|
||||
"default": {
|
||||
"format": "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
|
||||
"datefmt": "%Y-%m-%d %H:%M:%S",
|
||||
}
|
||||
},
|
||||
"handlers": {
|
||||
"default": {
|
||||
"class": "logging.StreamHandler",
|
||||
"formatter": "default",
|
||||
"stream": "ext://sys.stdout"
|
||||
}
|
||||
},
|
||||
"loggers": {
|
||||
"": { # root logger
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
},
|
||||
"uvicorn": {
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
},
|
||||
"uvicorn.error": {
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
},
|
||||
"uvicorn.access": {
|
||||
"handlers": ["default"],
|
||||
"level": "INFO",
|
||||
"propagate": False
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,9 +1,14 @@
|
||||
from argon2 import PasswordHasher
|
||||
import re
|
||||
from logger_handler import setup_logger
|
||||
|
||||
logger = setup_logger(__name__)
|
||||
|
||||
def is_valid_platform(platform) -> bool:
|
||||
if platform not in ["android"]:
|
||||
logger.debug(f"Invalid platform: {platform}")
|
||||
return False
|
||||
logger.debug(f"Valid platform: {platform}")
|
||||
return True
|
||||
|
||||
def is_valid_token(token: str) -> bool:
|
||||
@ -13,8 +18,9 @@ def is_valid_token(token: str) -> bool:
|
||||
|
||||
pattern = r"^ExponentPushToken\[([A-Za-z0-9]{22})\]$"
|
||||
if not re.match(pattern, token):
|
||||
logger.debug(f"Invalid token format: {token}")
|
||||
return False
|
||||
|
||||
logger.debug(f"Valid token format: {token}")
|
||||
return True
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user