service-podcasts/src/perun/ssh_helper.py
florian ca1bdfd707 Perun: Continuous release check
Instead of waiting a day or two until an episode has come out, the script now checks for a release for 3 days and then waits another day for SponsorBlock segments to be added.
2025-11-09 18:50:56 +01:00

180 lines
5.4 KiB
Python

import paramiko
import os
from dotenv import load_dotenv
from json import dumps
from simple_logger_handler import setup_logger
import time
import shlex
# Module-wide logger, created by the project's simple_logger_handler helper.
logger = setup_logger(__name__)
# Presumably loads variables from a local .env file into the environment
# (python-dotenv) so the os.getenv() calls below can see them.
load_dotenv()
# Runtime settings read from the environment; each one is None when unset.
# SSH host alias that is resolved through load_ssh_config().
REMOTE_HOSTNAME = os.getenv("REMOTE_HOSTNAME")
# Remote directory that uploaded files are placed in.
REMOTE_PATH = os.getenv("REMOTE_PATH")
# URL the notification payload is POSTed to (via curl on the remote host).
BACKEND_API_URL = os.getenv("BACKEND_API_URL")
# Value sent in the X-API-Key-Internal header of the notification request.
BACKEND_API_KEY= os.getenv("BACKEND_API_KEY")
def load_ssh_config(host_alias: str) -> tuple[str, int, str, str]:
    """
    Resolve an SSH host alias into concrete connection parameters.

    The alias is looked up in the current user's ``~/.ssh/config`` file.

    Args:
        host_alias: Alias of the host entry to resolve

    Returns:
        Tuple of (hostname, port, username, keyfile). The port falls back to
        22 when the entry has none, and the keyfile is the first identity
        file listed for the entry.

    Raises:
        FileNotFoundError: If the SSH config file does not exist
        ValueError: If hostname, user or identity file is missing for the alias
    """
    logger.debug(f"[SSH] Loading SSH configuration for host alias '{host_alias}'")

    config_path = os.path.expanduser("~/.ssh/config")
    parser = paramiko.SSHConfig()
    try:
        with open(config_path) as config_file:
            parser.parse(config_file)
    except FileNotFoundError:
        logger.error(f"[SSH] SSH config file not found at {config_path}")
        raise

    entry = parser.lookup(host_alias)
    hostname = entry.get("hostname")
    port = int(entry.get("port", 22))
    username = entry.get("user")
    # "identityfile" is a list; only its first element is used.
    keyfile = entry.get("identityfile", [None])[0]

    # All three values are required to open a key-based connection.
    if not (hostname and username and keyfile):
        logger.error(f"[SSH] Incomplete SSH configuration for alias '{host_alias}'")
        raise ValueError(f"[SSH] Missing SSH configuration for {host_alias}.")

    logger.debug(f"[SSH] SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
    return hostname, port, username, keyfile
def create_ssh_client(hostname: str, port: int, username: str, keyfile: str) -> paramiko.SSHClient:
    """
    Create and return a connected Paramiko SSHClient instance.

    Host keys are loaded from ``~/.ssh/known_hosts``. The private key file is
    handed to ``SSHClient.connect`` via ``key_filename`` so that Paramiko
    determines the key type itself instead of assuming RSA.

    Args:
        hostname: Remote hostname
        port: SSH port
        username: SSH username
        keyfile: Path to SSH private key file

    Returns:
        Connected SSHClient instance (caller must close it)

    Raises:
        Exception: If SSH connection fails (the error is logged and re-raised;
            the partially set-up client is closed first)
    """
    logger.debug("[SSH] Creating SSH client")
    ssh = None
    try:
        ssh = paramiko.SSHClient()
        ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
        # NOTE(review): AutoAddPolicy silently trusts hosts that are not yet in
        # known_hosts. Kept for backward compatibility; consider RejectPolicy
        # once all target hosts are recorded in known_hosts.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # key_filename lets Paramiko try the supported key types for the file,
        # so Ed25519/ECDSA identity files work as well as RSA ones.
        ssh.connect(hostname=hostname, username=username, port=port, key_filename=keyfile)
        logger.debug("[SSH] SSH connection established successfully")
        return ssh
    except Exception as e:
        logger.error(f"[SSH] SSH connection failed: {e}", exc_info=True)
        # Do not leak the socket/transport of a half-open client.
        if ssh is not None:
            ssh.close()
        raise
def upload_via_sftp(filename) -> None:
    """
    Upload a file to the remote host via SFTP using SSH credentials.

    The connection details are resolved from the SSH config entry named by
    REMOTE_HOSTNAME, and the connection is opened through create_ssh_client()
    (the same path the notification code uses). The file is stored in
    REMOTE_PATH under its local base name.

    Args:
        filename: Local file path to upload

    Raises:
        Exception: If upload fails (the error is logged and re-raised)
    """
    logger.info(f"[SFTP] Preparing to upload file '{filename}' via SFTP")
    ssh = None
    sftp = None
    try:
        hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
        logger.debug(f"[SFTP] Connecting to {hostname}:{port} for file upload")
        # Reuse the shared client factory instead of a raw Transport, which
        # performed no host-key handling of its own.
        ssh = create_ssh_client(hostname, port, username, keyfile)
        sftp = ssh.open_sftp()
        remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
        logger.info(f"[SFTP] Uploading to remote path: {remote_file}")
        sftp.put(filename, remote_file)
        logger.info(f"[SFTP] File '{filename}' uploaded successfully")
    except Exception as e:
        logger.error(f"[SFTP] SFTP upload failed for '{filename}': {e}", exc_info=True)
        raise
    finally:
        # Always release the SFTP channel and the connection, including when
        # the upload raised part-way through.
        if sftp is not None:
            sftp.close()
        if ssh is not None:
            ssh.close()
            logger.debug("[SFTP] SSH connection closed")
def send_notification_via_ssh(notification_title, notification_info) -> None:
    """
    Send a JSON-formatted notification payload via SSH to the backend.

    A ``curl`` POST to BACKEND_API_URL is executed on the remote host. The API
    key is written to the remote command's stdin and read there with
    ``head -n1``, so it is not embedded in the command string itself.

    A non-zero exit status of the remote command is logged as a warning and
    does not raise.

    Args:
        notification_title: Title of the notification
        notification_info: Body/content of the notification

    Raises:
        ValueError: If BACKEND_API_URL or BACKEND_API_KEY is not set
        Exception: If the SSH connection or command execution fails
    """
    logger.info(f"[Notification] Sending SSH notification: {notification_title}")
    ssh = None
    try:
        # Fail early instead of sending the literal string "None" as API key
        # or crashing later while quoting a missing URL.
        if not BACKEND_API_URL or not BACKEND_API_KEY:
            raise ValueError("[Notification] BACKEND_API_URL and BACKEND_API_KEY must be set")

        hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
        ssh = create_ssh_client(hostname, port, username, keyfile)

        data = {
            # NOTE(review): key spelling ("receipent") is kept unchanged;
            # presumably it matches the backend's schema - confirm.
            "receipent_user_id": 1,
            "message": {
                "title": notification_title,
                "body": notification_info,
                "category": "podcasts",
                "timestamp": int(time.time()),
            },
        }
        json_payload = dumps(data)
        logger.debug(f"[Notification] Notification payload: {json_payload}")

        # Quote everything that is interpolated into the remote shell command.
        escaped_payload = shlex.quote(json_payload)
        escaped_url = shlex.quote(BACKEND_API_URL)
        # --fail makes curl exit non-zero on HTTP errors (otherwise a 4xx/5xx
        # reply would be reported as success); -S keeps curl's error message
        # on stderr even though -s silences the progress output.
        notification_cmd = (
            "API_KEY=$(head -n1) && "
            f"curl -sS --fail -X POST {escaped_url} "
            "-H 'Content-Type: application/json' "
            "-H \"X-API-Key-Internal: $API_KEY\" "
            f"-d {escaped_payload}"
        )

        stdin, stdout, stderr = ssh.exec_command(notification_cmd)
        stdin.write(f"{BACKEND_API_KEY}\n")
        stdin.flush()
        # Signal EOF so the remote `head -n1` does not wait for more input.
        stdin.channel.shutdown_write()

        # Drain the output streams before asking for the exit status;
        # waiting for the status first can hang when output is still pending.
        response_output = stdout.read().decode(errors="replace")
        error_output = stderr.read().decode(errors="replace")
        exit_status = stdout.channel.recv_exit_status()

        if exit_status == 0:
            logger.info("[Notification] Notification sent successfully")
            logger.debug(f"[Notification] Response: {response_output}")
        else:
            logger.warning(f"[Notification] Notification command exited with {exit_status}")
            logger.warning(f"[Notification] Error: {error_output}")
            logger.warning(f"[Notification] Response: {response_output}")
    except Exception as e:
        logger.error(f"[Notification] Failed to send SSH notification: {e}", exc_info=True)
        raise
    finally:
        if ssh:
            ssh.close()
            logger.debug("[Notification] SSH connection closed")