import paramiko import os from dotenv import load_dotenv from json import dumps load_dotenv() REMOTE_HOSTNAME = os.getenv("REMOTE_HOSTNAME") REMOTE_PATH = os.getenv("REMOTE_PATH") BACKEND_API_URL = os.getenv("BACKEND_API_URL") BACKEND_API_KEY= os.getenv("BACKEND_API_KEY") def load_ssh_config(host_alias): ssh_config = paramiko.SSHConfig() config_path = os.path.expanduser("~/.ssh/config") with open(config_path) as f: ssh_config.parse(f) host_config = ssh_config.lookup(host_alias) hostname = host_config.get("hostname") port = int(host_config.get("port", 22)) username = host_config.get("user") keyfile = host_config.get("identityfile", [None])[0] if not all([hostname, username, keyfile]): raise ValueError(f"Missing SSH configuration for {host_alias}.") return hostname, port, username, keyfile def create_ssh_client(hostname, port, username, keyfile): ssh = paramiko.SSHClient() ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts")) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) pkey = paramiko.RSAKey.from_private_key_file(keyfile) ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey) return ssh def upload_via_sftp(filename): hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME) transport = paramiko.Transport((hostname, port)) pkey = paramiko.RSAKey.from_private_key_file(keyfile) transport.connect(username=username, pkey=pkey) sftp = paramiko.SFTPClient.from_transport(transport) remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename)) sftp.put(filename, remote_file) sftp.close() transport.close() def send_notification_via_ssh(notification_title, notification_info): hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME) with create_ssh_client(hostname, port, username, keyfile) as ssh: data = { "receipent_user_id": 1, "message": { "title": notification_title, "info": notification_info, "category": "mixtapes" } } json_payload = dumps(data) # Command reads API key and JSON from stdin notification_cmd = ( f"curl -s -X POST '{BACKEND_API_URL}' " f"-H 'Content-Type: application/json' " f"-H 'X-API-Key-Internal: $(head -n1)' " f"-d @-" ) stdin, stdout, stderr = ssh.exec_command(notification_cmd) stdin.write(f"{BACKEND_API_KEY}\n{json_payload}") stdin.flush() stdin.channel.shutdown_write()