All checks were successful
Build & Publish to GHCR / build (push) Successful in 1m11s
78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
from cryptography.fernet import Fernet
|
|
from simple_logger_handler import setup_logger
|
|
|
|
logger = setup_logger(__name__)
|
|
|
|
try:
|
|
with open("/etc/secrets/encryption_key","rb") as file:
|
|
encryption_key = file.read()
|
|
except FileNotFoundError:
|
|
logger.fatal("[Secret Handler] Encryption key not found")
|
|
raise
|
|
except Exception as e:
|
|
logger.fatal(f"[Secret Handler] Failed to read encryption key: {e}")
|
|
raise
|
|
|
|
fernet = Fernet(encryption_key)
|
|
|
|
def encrypt_token(token:str)->str:
|
|
return fernet.encrypt(token.encode()).decode()
|
|
|
|
def decrypt_token(token:str)->str:
|
|
return fernet.decrypt(token.encode()).decode()
|
|
|
|
def return_credentials(path: str)->str:
|
|
try:
|
|
with open (path) as file:
|
|
return file.read().strip()
|
|
except FileNotFoundError:
|
|
logger.fatal(f"[Secret Handler] Secret file not found: {path}")
|
|
raise
|
|
except Exception as e:
|
|
logger.fatal(f"[Secret Handler] Failed to read secret file {path}: {e}")
|
|
raise
|
|
|
|
async def database_lookup_by_user_id(routing_key: str, db_manager):
|
|
try:
|
|
user_id = int(routing_key.split('.')[-1])
|
|
logger.debug(f"[DB] Looking up tokens for user_id={user_id}")
|
|
except ValueError:
|
|
logger.error(f"[DB] Invalid user id supplied:{routing_key}")
|
|
return []
|
|
|
|
async with db_manager.acquire() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE user_id=%s",
|
|
(user_id,))
|
|
logger.debug(f"[DB] Executed query for user_id={user_id}")
|
|
if cur.description:
|
|
rows = await cur.fetchall()
|
|
logger.debug(f"[DB] Retrieved {len(rows)} tokens for user_id={user_id}")
|
|
return rows
|
|
logger.debug(f"[DB] No tokens found for user_id={user_id}")
|
|
return []
|
|
|
|
async def database_lookup_by_uuid(uuid: str, db_manager):
|
|
logger.debug(f"[DB] Looking up token for uuid={uuid}")
|
|
async with db_manager.acquire() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute("SELECT token_id AS uuid,token FROM device_tokens WHERE token_id=%s",
|
|
(uuid,))
|
|
logger.debug(f"[DB] Executed query for uuid={uuid}")
|
|
if cur.description:
|
|
rows = await cur.fetchall()
|
|
logger.debug(f"[DB] Retrieved {len(rows)} tokens for uuid={uuid}")
|
|
return rows
|
|
logger.debug(f"[DB] No token found for uuid={uuid}")
|
|
return []
|
|
|
|
async def remove_inactive_push_token(uuid :str, db_manager):
|
|
logger.debug(f"[DB] Expiring token for uuid={uuid}")
|
|
async with db_manager.acquire() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute("UPDATE device_tokens SET status='expired' WHERE token_id=%s",
|
|
(uuid,))
|
|
success = cur.rowcount > 0
|
|
logger.debug(f"[DB] Token expiration for uuid={uuid} success={success}")
|
|
return success
|