diff --git a/src/perun/get_episode.py b/src/perun/get_episode.py index 7095958..dfa19a0 100644 --- a/src/perun/get_episode.py +++ b/src/perun/get_episode.py @@ -14,7 +14,7 @@ PODCAST_AUTHORIZATION_TOKEN = os.getenv("PODCAST_AUTHORIZATION_TOKEN") PODCAST_API_URL = os.getenv("PODCAST_API_URL") -def get_audiobookshelf_data()->tuple[int | None, str | None]: +def get_audiobookshelf_data() -> tuple[int | None, str | None]: """ Fetches the latest episode data from the Audiobookshelf API. @@ -43,27 +43,34 @@ def get_audiobookshelf_data()->tuple[int | None, str | None]: audiobookshelf_title = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTitle"] logger.debug(f"[Audiobookshelf] Fetched Audiobookshelf data: track={audiobookshelf_track}, title={audiobookshelf_title}") - return audiobookshelf_track, audiobookshelf_title + return (audiobookshelf_track, audiobookshelf_title) except requests.exceptions.ConnectionError as e: logger.warning(f"[Audiobookshelf] Connection error, will retry: {e}") - return None + return (None, None) except requests.exceptions.Timeout as e: logger.warning(f"[Audiobookshelf] Request timed out, will retry: {e}") - return None + return (None, None) except requests.exceptions.HTTPError as e: status = e.response.status_code if status in {500, 502, 503, 504}: logger.warning(f"[Audiobookshelf] Server error {status}, will retry: {e}") - return None + return (None, None) else: logger.error(f"[Audiobookshelf] HTTP error {status}, not retrying: {e}") raise -def check_until_new_episode_gets_released(): +def check_until_new_episode_gets_released() -> tuple[int | None, dict | None, str | None]: """ Polls YouTube every hour for a new episode and compares it to the available episode on Audiobookshelf. Stops after 72 hours. + + Returns: + tuple[int | None, dict | None, str | None]: + - Track number from Audiobookshelf + - Episode info dictionary from YouTube + - Episode URL + Returns (None, None, None) if no new episode found within timeout """ CHECK_INTERVAL_HOURS = 1 MAX_HOURS = 72 @@ -77,43 +84,66 @@ def check_until_new_episode_gets_released(): continue episode_url = get_url_for_latest_video() + if episode_url is None: + logger.warning("[EpisodeCheck] Unable to fetch latest video URL, retrying in 1 hour.") + time.sleep(CHECK_INTERVAL_HOURS * 3600) + continue + episode_info = get_youtube_data(episode_url) + if not episode_info: + logger.warning("[EpisodeCheck] Unable to fetch video metadata, retrying in 1 hour.") + time.sleep(CHECK_INTERVAL_HOURS * 3600) + continue if audiobookshelf_title != episode_info["title"]: logger.info(f"[EpisodeCheck] Latest YouTube episode: {episode_info['title']}") - return audiobookshelf_track,episode_info,episode_url + return (audiobookshelf_track,episode_info,episode_url) logger.debug("[EpisodeCheck] No new episode found, retrying in 1 hour.") time.sleep(CHECK_INTERVAL_HOURS * 3600) logger.warning("[EpisodeCheck] No new episode found after maximum attempts.") - return None, None, None + return (None, None, None) -def wait_for_sponsorblock_segments_to_be_added(): +def wait_for_sponsorblock_segments_to_be_added(episode_url) -> bool: """ Polls SponsorBlock for segments on the current video until found or until max attempts. + + Args: + episode_url: YouTube video URL to check for SponsorBlock segments + + Returns: + True if segments found, False otherwise """ CHECK_INTERVAL_HOURS = 1 MAX_HOURS = 24 for attempt in range(1, MAX_HOURS + 1): logger.debug(f"[SponsorBlock] Waiting for SponsorBlock to be added, attempt: {attempt}/{MAX_HOURS} ") - segments = check_for_sponsorblock_segments() + segments = check_for_sponsorblock_segments(episode_url) if segments: - logger.debug("[SponsorBlock] Segments found, existing loop.") + logger.debug("[SponsorBlock] Segments found, exiting loop.") return True logger.debug("[SponsorBlock] No SponsorBlock segments found yet, retrying in 1 hour.") time.sleep(CHECK_INTERVAL_HOURS * 3600) logger.warning("[SponsorBlock] Segments not found after maximum attempts.") - return None + return False -def download_episode(): +def download_episode() -> None: + """ + Main workflow: Check for new episode, download it, upload via SFTP, and send notification. + """ logger.info("[App] Starting Perun") try: audiobookshelf_track,episode_info,episode_url = check_until_new_episode_gets_released() + + if audiobookshelf_track is None or episode_info is None or episode_url is None: + logger.error("[App] Failed to find new episode within timeout period") + return + logger.info("[App] New episode found") except Exception as e: logger.error(f"[App] Failed to fetch new episode info: {e}", exc_info=True) @@ -123,7 +153,7 @@ def download_episode(): episode_description = episode_info.get("description", "") if "sponsored" in episode_description.lower(): logger.debug(f"[App] Sponsored segments found in description, waiting for SponsorBlock") - wait_for_sponsorblock_segments_to_be_added() + wait_for_sponsorblock_segments_to_be_added(episode_url) else: logger.debug(f"[App] No sponsored segments found in description") except Exception as e: @@ -131,8 +161,8 @@ def download_episode(): try: track = str(int(audiobookshelf_track) + 1).zfill(4) - except Exception as e: - logger.error(f"[App] Invalid Audiobookshelf track number: {audiobookshelf_track}, error: {e}") + except (ValueError,TypeError) as e: + logger.warning(f"[App] Failed incrementing audiobookshelf track: {e}", exc_info=True) return try: diff --git a/src/perun/ssh_helper.py b/src/perun/ssh_helper.py index 886e089..4e61a5c 100644 --- a/src/perun/ssh_helper.py +++ b/src/perun/ssh_helper.py @@ -4,6 +4,7 @@ from dotenv import load_dotenv from json import dumps from logger_handler import setup_logger import time +import shlex logger = setup_logger(__name__) load_dotenv() @@ -12,9 +13,19 @@ REMOTE_PATH = os.getenv("REMOTE_PATH") BACKEND_API_URL = os.getenv("BACKEND_API_URL") BACKEND_API_KEY= os.getenv("BACKEND_API_KEY") -def load_ssh_config(host_alias): +def load_ssh_config(host_alias:str) -> tuple[str, int, str, str]: """ Load SSH connection details from ~/.ssh/config for the given alias. + + Args: + host_alias: The SSH host alias to look up + + Returns: + Tuple of (hostname, port, username, keyfile) + + Raises: + FileNotFoundError: If SSH config file doesn't exist + ValueError: If SSH configuration is incomplete """ logger.debug(f"[SSH] Loading SSH configuration for host alias '{host_alias}'") ssh_config = paramiko.SSHConfig() @@ -41,9 +52,21 @@ def load_ssh_config(host_alias): return hostname, port, username, keyfile -def create_ssh_client(hostname, port, username, keyfile): +def create_ssh_client(hostname: str, port: int, username: str, keyfile: str)-> paramiko.SSHClient: """ Create and return a connected Paramiko SSHClient instance. + + Args: + hostname: Remote hostname + port: SSH port + username: SSH username + keyfile: Path to SSH private key file + + Returns: + Connected SSHClient instance (caller must close it) + + Raises: + Exception: If SSH connection fails """ logger.debug("[SSH] Creating SSH client") try: @@ -60,9 +83,15 @@ def create_ssh_client(hostname, port, username, keyfile): raise -def upload_via_sftp(filename): +def upload_via_sftp(filename) -> None: """ Upload a file to the remote host via SFTP using SSH credentials. + + Args: + filename: Local file path to upload + + Raises: + Exception: If upload fails """ logger.info(f"[SFTP] Preparing to upload file '{filename}' via SFTP") try: @@ -86,51 +115,66 @@ def upload_via_sftp(filename): raise -def send_notification_via_ssh(notification_title, notification_info): +def send_notification_via_ssh(notification_title, notification_info) -> None: """ Send a JSON-formatted notification payload via SSH to the backend. + + Args: + notification_title: Title of the notification + notification_info: Body/content of the notification + + Raises: + Exception: If notification sending fails """ logger.info(f"[Notification] Sending SSH notification: {notification_title}") + ssh = None try: hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME) - with create_ssh_client(hostname, port, username, keyfile) as ssh: - data = { - "receipent_user_id": 1, - "message": { - "title": notification_title, - "body": notification_info, - "category": "podcasts", - "timestamp": int(time.time()) - } + ssh = create_ssh_client(hostname, port, username, keyfile) + + data = { + "receipent_user_id": 1, + "message": { + "title": notification_title, + "body": notification_info, + "category": "podcasts", + "timestamp": int(time.time()) } - json_payload = dumps(data) - logger.debug(f"[Notification] Notification payload: {json_payload}") + } + json_payload = dumps(data) + logger.debug(f"[Notification] Notification payload: {json_payload}") - notification_cmd = ( - f"API_KEY=$(head -n1) && " - f"curl -s -X POST '{BACKEND_API_URL}' " - f"-H 'Content-Type: application/json' " - f"-H \"X-API-Key-Internal: $API_KEY\" " - f"-d '{json_payload}'" - ) + escaped_payload = shlex.quote(json_payload) + escaped_url = shlex.quote(BACKEND_API_URL) - stdin, stdout, stderr = ssh.exec_command(notification_cmd) - stdin.write(f"{BACKEND_API_KEY}\n") - stdin.flush() - stdin.channel.shutdown_write() + notification_cmd = ( + f"API_KEY=$(head -n1) && " + f"curl -s -X POST {escaped_url} " + f"-H 'Content-Type: application/json' " + f"-H \"X-API-Key-Internal: $API_KEY\" " + f"-d {escaped_payload}" + ) - exit_status = stdout.channel.recv_exit_status() - response_output = stdout.read().decode() + stdin, stdout, stderr = ssh.exec_command(notification_cmd) + stdin.write(f"{BACKEND_API_KEY}\n") + stdin.flush() + stdin.channel.shutdown_write() - exit_status = stdout.channel.recv_exit_status() - if exit_status == 0: - logger.info("[Notification] Notification sent successfully") - logger.debug(f"[Notification] Response: {response_output}") - else: - error_output = stderr.read().decode() - logger.warning(f"[Notification] Notification command exited with {exit_status}") - logger.warning(f"[Notification] Error: {error_output}") - logger.warning(f"[Notification] Response: {response_output}") + exit_status = stdout.channel.recv_exit_status() + response_output = stdout.read().decode() + + if exit_status == 0: + logger.info("[Notification] Notification sent successfully") + logger.debug(f"[Notification] Response: {response_output}") + else: + error_output = stderr.read().decode() + logger.warning(f"[Notification] Notification command exited with {exit_status}") + logger.warning(f"[Notification] Error: {error_output}") + logger.warning(f"[Notification] Response: {response_output}") except Exception as e: logger.error(f"[Notification] Failed to send SSH notification: {e}", exc_info=True) - raise \ No newline at end of file + raise + finally: + if ssh: + ssh.close() + logger.debug("[Notification] SSH connection closed") \ No newline at end of file