Rewrote the Flask server into a FastAPI one that has Prometheus metrics and connects to a database

This commit is contained in:
2025-10-09 20:21:04 +02:00
parent 0d04a0f790
commit db70de7659
13 changed files with 294 additions and 124 deletions

79
src/db.py Normal file
View File

@@ -0,0 +1,79 @@
import mysql.connector
import threading
from secret_handler import return_credentials
import os
import time
import sys
db_username = return_credentials("/etc/secrets/db_username")
db_password = return_credentials("/etc/secrets/db_password")
db_host = os.getenv("SERVICE_DRQ_DB_HOST","localhost")
db_database = os.getenv("SERVICE_DRQ_DB_HOST_DATABASE","app")
MAX_RETRIES = 5
RETRY_DELAY = 5
MYSQL_CONFIG = {
"host": db_host,
"user": db_username,
"password": db_password,
"database": db_database
}
_pool_lock = threading.Lock()
_connection_pool = None
def create_connection_pool():
global _connection_pool
for attempt in range(1, MAX_RETRIES+1):
try:
print(f"[MySQL] Attempt {attempt} to connect...")
pool = mysql.connector.pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
pool_reset_session=True,
**MYSQL_CONFIG
)
with _pool_lock:
_connection_pool = pool
print("[MySQL] Connection pool created successfully.")
return
except mysql.connector.Error as e:
print(f"[MySQL] Attempt {attempt} failed: {e}")
if attempt < MAX_RETRIES:
time.sleep(RETRY_DELAY)
print(f"[MySQL] Failed to connect after {MAX_RETRIES} attempts — exiting.")
sys.exit(1)
def close_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool:
_connection_pool = None
print("[MySQL] Connection pool closed.")
def get_connection_pool():
global _connection_pool
with _pool_lock:
if _connection_pool is None:
create_connection_pool()
return _connection_pool
def get_db():
pool = get_connection_pool()
try:
conn = pool.get_connection()
if not conn.is_connected():
conn.reconnect(attempts=MAX_RETRIES, delay=RETRY_DELAY)
except Exception:
create_connection_pool()
pool = get_connection_pool()
conn = pool.get_connection()
try:
yield conn
finally:
conn.close()

29
src/dockerhub_api.py Normal file
View File

@@ -0,0 +1,29 @@
import requests
from secret_handler import return_credentials
dockerhub_token = return_credentials("/etc/secrets/dockerhub_token")
dockerhub_username = return_credentials("/etc/secrets/dockerhub_username")
def login_and_get_token()->str:
"""
You have to first login with credentials to get a token for subsequent api requests.
"""
login_url = "https://hub.docker.com/v2/users/login/"
response = requests.post(login_url,
json={"username": dockerhub_username, "password": dockerhub_token})
if response.status_code == 200:
token = response.json()["token"]
return token
else:
print(f"Login failed: {response.status_code} - {response.text}")
def find_package_version_with_tag(repo, tag):
token = login_and_get_token()
headers = {"Authorization": f"JWT {token}"}
tags_url = f"https://hub.docker.com/v2/repositories/{repo}/tags/{tag}?page_size=1"
tags_response = requests.get(tags_url, headers=headers)
id = tags_response.json()["id"]
return id
if __name__ == "__main__":
print(find_package_version_with_tag("pihole/pihole", "latest"))

42
src/github_api.py Normal file
View File

@@ -0,0 +1,42 @@
import requests
from secret_handler import return_credentials
github_token = return_credentials("/etc/secrets/github_token")
def find_package_version_with_tag(org:str,package:str,target_tag:str)->str:
"""
Iterates through the available pages looking for the supplied target_tag
Either returns None or a string when successful
"""
headers = {
"Authorization": f"Bearer {github_token}",
"Accept": "application/vnd.github+json"
}
page = 1
per_page = 100
while True:
url = f"https://api.github.com/orgs/{org}/packages/container/{package}/versions"
params = {"per_page": per_page, "page": page}
response = requests.get(url, headers=headers, params=params)
if response.status_code != 200:
print(f"Error {response.status_code}: {response.text}")
return None
versions = response.json()
if not versions:
print(f"Reached end of pages — tag '{target_tag}' not found.")
return None
for version in versions:
tags = version.get("metadata", {}).get("container", {}).get("tags", [])
if target_tag in tags:
print(f"Found tag '{target_tag}' on page {page}:\n")
return version["id"]
page += 1
if __name__ == "__main__":
find_package_version_with_tag("Suwayomi", "tachidesk", "stable")

113
src/main.py Normal file
View File

@@ -0,0 +1,113 @@
from fastapi import FastAPI, Depends, HTTPException, Response, Request
import uvicorn
from github_api import find_package_version_with_tag as find_package_version_with_tag_github
from dockerhub_api import find_package_version_with_tag as find_package_version_with_tag_dockerhub
import uvicorn
from contextlib import asynccontextmanager
from db import get_db, create_connection_pool, close_connection_pool
import logging
from send_notification import send_notification
from metrics_server import REQUEST_COUNTER
import asyncio
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Starting application...")
logger.info("Creating MySQL connection pool...")
create_connection_pool()
yield
logger.info("Closing MySQL connection pool...")
close_connection_pool()
api = FastAPI(
title="Docker Repository Query",
description="Queries Dockerhub and GHCR for new docker images",
version="1.0.0",
lifespan=lifespan
)
@api.middleware("http")
async def prometheus_middleware(request: Request, call_next):
try:
response = await call_next(request)
status = response.status_code
except Exception:
status = 500
raise
finally:
REQUEST_COUNTER.labels(request.method, request.url.path, status).inc()
return response
@api.get("/health")
def return_health():
return Response(status_code=200)
@api.get("/suwayomi")
def handle_suwayomi(
request: Request,
db = Depends(get_db)
):
try:
logger.info("Suwayomi handler invoked")
latest_online_version = find_package_version_with_tag_github("Suwayomi", "tachidesk", "stable")
cursor = db.cursor()
cursor.execute("SELECT latest_version FROM docker_repositories WHERE app='suwayomi'")
local_state = cursor.fetchone()
if local_state and latest_online_version != local_state[0]:
cursor.execute ("UPDATE docker_repositories SET latest_version=%s WHERE app='suwayomi'",
(latest_online_version,))
db.commit()
logger.info("New Suwayomi version has been found")
send_notification("New Suwayomi version has been found")
return Response(status_code=200)
return Response(status_code=204)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@api.get("/pihole")
def handle_pihole(
request: Request,
db = Depends(get_db)
):
try:
logger.info("Pi-hole handler invoked")
latest_online_version = find_package_version_with_tag_dockerhub("pihole/pihole", "latest")
cursor = db.cursor()
cursor.execute("SELECT latest_version FROM docker_repositories WHERE app='pihole'")
local_state = cursor.fetchone()
if local_state and latest_online_version != local_state[0]:
cursor.execute ("UPDATE docker_repositories SET latest_version=%s WHERE app='pihole'",
(latest_online_version,))
db.commit()
logger.info("New Pi-hole version has been found")
send_notification("New Pi-hole version has been found")
return Response(status_code=200)
return Response(status_code=204)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def start_servers():
config_main = uvicorn.Config("main:api", host="0.0.0.0", port=5000, log_level="info")
config_metrics = uvicorn.Config("metrics_server:metrics_api", host="0.0.0.0", port=9000, log_level="info")
server_main = uvicorn.Server(config_main)
server_metrics = uvicorn.Server(config_metrics)
await asyncio.gather(server_main.serve(), server_metrics.serve())
if __name__ == "__main__":
asyncio.run(start_servers())

10
src/metrics_server.py Normal file
View File

@@ -0,0 +1,10 @@
from fastapi import FastAPI, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Counter
metrics_api = FastAPI(title="Metrics Server", description="Prometheus metrics endpoint")
REQUEST_COUNTER = Counter("http_requests_total", "Total HTTP Requests", ["method", "endpoint", "status"])
@metrics_api.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

12
src/secret_handler.py Normal file
View File

@@ -0,0 +1,12 @@
import sys
def return_credentials(path: str)->str:
try:
with open (path) as file:
return file.read().strip()
except FileNotFoundError:
print(f"[FATAL] Secret file not found: {path}")
sys.exit(1)
except Exception as e:
print(f"[FATAL] Failed to read secret file {path}: {e}")
sys.exit(1)

35
src/send_notification.py Normal file
View File

@@ -0,0 +1,35 @@
import requests
from requests.exceptions import RequestException, Timeout, ConnectionError, HTTPError
from secret_handler import return_credentials
import os
backend_api_url=os.getenv("BACKEND_API_URL","localhost:8101/internal/receive-notifications")
api_key= return_credentials("/etc/secrets/api_key")
def send_notification(notification:str)->str:
headers = {
"X-API-Key-Internal": backend_api_url,
"Content-Type": "application/json"
}
data = {
"receipent_user_id": 1,
"message": {
"title": notification,
"info": "A new version is available.",
"link": None
}}
try:
response = requests.post(backend_api_url, headers=headers, json=data)
response.raise_for_status()
print("Success: Notification sent")
except Timeout:
print("Error: request timed out")
except ConnectionError:
print("Error: connection failed")
except HTTPError as e:
print(f"HTTP error: {e.response.status_code} - {e.response.text}")
except RequestException as e:
print("Request failed:", str(e))