diff --git a/src/perun/get_episode.py b/src/perun/get_episode.py
index ab5fa73..b1d7098 100644
--- a/src/perun/get_episode.py
+++ b/src/perun/get_episode.py
@@ -1,10 +1,11 @@
 import requests
 import yt_dlp
 import os
+import time
 from dotenv import load_dotenv
 from ssh_helper import upload_via_sftp, send_notification_via_ssh
-from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options
-from logger_handler import setup_logger
+from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options, check_for_sponsorblock_segments
+from simple_logger_handler import setup_logger
 
 logger = setup_logger(__name__)
 
@@ -13,9 +14,26 @@
 PODCAST_AUTHORIZATION_TOKEN = os.getenv("PODCAST_AUTHORIZATION_TOKEN")
 PODCAST_API_URL = os.getenv("PODCAST_API_URL")
 
-def get_audiobookshelf_data()->tuple[int | None, str | None]:
-    headers = {"Authorization": f"Bearer {PODCAST_AUTHORIZATION_TOKEN}"}
+def get_audiobookshelf_data() -> tuple[int | None, str | None]:
+    """
+    Fetches the latest episode data from the Audiobookshelf API.
+
+    Returns:
+        tuple[int | None, str | None]:
+            - The track number (or None if the data could not be fetched due to a retryable error).
+            - The episode title (or None if the data could not be fetched due to a retryable error).
+
+    Raises:
+        requests.exceptions.HTTPError:
+            If a non-retryable HTTP error occurs (e.g. 401 Unauthorized, 403 Forbidden, 404 Not Found).
+
+    Notes:
+        - Connection errors, timeouts, and server-side HTTP errors (500, 502, 503, 504) are caught and logged.
+          In these cases the function returns (None, None) so the caller can retry later.
+    """
+
+    headers = {"Authorization": f"Bearer {PODCAST_AUTHORIZATION_TOKEN}"}
+    logger.debug("[Audiobookshelf] Fetching Audiobookshelf data")
     try:
         response = requests.get(PODCAST_API_URL, headers=headers)
         response.raise_for_status()
@@ -24,49 +42,151 @@ def get_audiobookshelf_data()->tuple[int | None, str | None]:
         audiobookshelf_track = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTrack"]
         audiobookshelf_title = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTitle"]
 
-        logger.debug(f"Fetched Audiobookshelf data: track={audiobookshelf_track}, title={audiobookshelf_title}")
-        return audiobookshelf_track, audiobookshelf_title
+        logger.debug(f"[Audiobookshelf] Fetched Audiobookshelf data: track={audiobookshelf_track}, title={audiobookshelf_title}")
+        return (audiobookshelf_track, audiobookshelf_title)
 
-    except requests.RequestException as e:
-        logger.warning(f"Failed to fetch Audiobookshelf data: {e}")
+    except requests.exceptions.ConnectionError as e:
+        logger.warning(f"[Audiobookshelf] Connection error, will retry: {e}")
         return (None, None)
+    except requests.exceptions.Timeout as e:
+        logger.warning(f"[Audiobookshelf] Request timed out, will retry: {e}")
+        return (None, None)
+    except requests.exceptions.HTTPError as e:
+        status = e.response.status_code
+        if status in {500, 502, 503, 504}:
+            logger.warning(f"[Audiobookshelf] Server error {status}, will retry: {e}")
+            return (None, None)
+        else:
+            logger.error(f"[Audiobookshelf] HTTP error {status}, not retrying: {e}")
+            raise
 
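+# Design note: the two polling helpers below sleep in whole-hour steps
+# (time.sleep(CHECK_INTERVAL_HOURS * 3600)), so with the defaults the worst
+# case is 72 attempts x 1 h waiting for a new episode plus 24 attempts x 1 h
+# waiting for SponsorBlock, i.e. up to 96 hours of wall time per run.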
+def check_until_new_episode_gets_released() -> tuple[int | None, dict | None, str | None]:
+    """
+    Polls YouTube every hour for a new episode and compares it to the latest episode on Audiobookshelf.
+    Stops after 72 hours.
 
-def download_episode():
-    logger.info("Starting Perun")
+
+    Returns:
+        tuple[int | None, dict | None, str | None]:
+            - Track number from Audiobookshelf
+            - Episode info dictionary from YouTube
+            - Episode URL
+        Returns (None, None, None) if no new episode is found within the timeout.
+    """
+    CHECK_INTERVAL_HOURS = 1
+    MAX_HOURS = 72
+    for attempt in range(1, MAX_HOURS + 1):
+        logger.debug(f"[EpisodeCheck] Waiting for a new episode to be released, attempt: {attempt}/{MAX_HOURS}")
+        audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
 
-    audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
-    if audiobookshelf_track is None or audiobookshelf_title is None:
-        logger.warning("Unable to fetch Audiobookshelf data. Exiting.")
+        if audiobookshelf_track is None or audiobookshelf_title is None:
+            logger.warning("[EpisodeCheck] Unable to fetch Audiobookshelf data, retrying in 1 hour.")
+            time.sleep(CHECK_INTERVAL_HOURS * 3600)
+            continue
+
+        episode_url = get_url_for_latest_video()
+        if episode_url is None:
+            logger.warning("[EpisodeCheck] Unable to fetch latest video URL, retrying in 1 hour.")
+            time.sleep(CHECK_INTERVAL_HOURS * 3600)
+            continue
+
+        episode_info = get_youtube_data(episode_url)
+        if not episode_info:
+            logger.warning("[EpisodeCheck] Unable to fetch video metadata, retrying in 1 hour.")
+            time.sleep(CHECK_INTERVAL_HOURS * 3600)
+            continue
+
+        if audiobookshelf_title != episode_info["title"]:
+            logger.info(f"[EpisodeCheck] Latest YouTube episode: {episode_info['title']}")
+            return (audiobookshelf_track, episode_info, episode_url)
+
+        logger.debug("[EpisodeCheck] No new episode found, retrying in 1 hour.")
+        time.sleep(CHECK_INTERVAL_HOURS * 3600)
+
+    logger.warning("[EpisodeCheck] No new episode found after maximum attempts.")
+    return (None, None, None)
+
+def wait_for_sponsorblock_segments_to_be_added(episode_url: str) -> bool:
+    """
+    Polls SponsorBlock for segments on the given video until they are found or the maximum number of attempts is reached.
+
+    Args:
+        episode_url: YouTube video URL to check for SponsorBlock segments
+
+    Returns:
+        True if segments were found, False otherwise
+    """
+    CHECK_INTERVAL_HOURS = 1
+    MAX_HOURS = 24
+    for attempt in range(1, MAX_HOURS + 1):
+        logger.debug(f"[SponsorBlock] Waiting for SponsorBlock segments to be added, attempt: {attempt}/{MAX_HOURS}")
+        segments = check_for_sponsorblock_segments(episode_url)
+
+        if segments:
+            logger.debug("[SponsorBlock] Segments found, exiting loop.")
+            return True
+
+        logger.debug("[SponsorBlock] No SponsorBlock segments found yet, retrying in 1 hour.")
+        time.sleep(CHECK_INTERVAL_HOURS * 3600)
+
+    logger.warning("[SponsorBlock] Segments not found after maximum attempts.")
+    return False
+
+def download_episode() -> None:
+    """
+    Main workflow: check for a new episode, download it, upload it via SFTP, and send a release notification.
+    """
+    logger.info("[App] Starting Perun")
+
+    try:
+        audiobookshelf_track, episode_info, episode_url = check_until_new_episode_gets_released()
+
+        if audiobookshelf_track is None or episode_info is None or episode_url is None:
+            logger.error("[App] Failed to find a new episode within the timeout period")
+            return
+
+        logger.info("[App] New episode found")
+    except Exception as e:
+        logger.error(f"[App] Failed to fetch new episode info: {e}", exc_info=True)
+        return
+
+    try:
+        episode_description = episode_info.get("description", "")
+        if "sponsored" in episode_description.lower():
+            logger.debug("[App] Sponsored segments found in description, waiting for SponsorBlock")
+            wait_for_sponsorblock_segments_to_be_added(episode_url)
+        else:
+            logger.debug("[App] No sponsored segments found in description")
+    except Exception as e:
+        logger.warning(f"[App] Failed during SponsorBlock wait: {e}", exc_info=True)
+
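+    # Audiobookshelf stores tagTrack as a string tag, so it is cast to int
+    # before incrementing; zfill(4) then restores a zero-padded four-digit
+    # track tag (e.g. track 41 becomes "0042").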
+ """ + logger.info("[App] Starting Perun") + + try: + audiobookshelf_track,episode_info,episode_url = check_until_new_episode_gets_released() + + if audiobookshelf_track is None or episode_info is None or episode_url is None: + logger.error("[App] Failed to find new episode within timeout period") + return + + logger.info("[App] New episode found") + except Exception as e: + logger.error(f"[App] Failed to fetch new episode info: {e}", exc_info=True) + return + + try: + episode_description = episode_info.get("description", "") + if "sponsored" in episode_description.lower(): + logger.debug("[App] Sponsored segments found in description, waiting for SponsorBlock") + wait_for_sponsorblock_segments_to_be_added(episode_url) + else: + logger.debug("[App] No sponsored segments found in description") + except Exception as e: + logger.warning(f"[App] Failed during SponsorBlock wait: {e}", exc_info=True) + + try: + track = str(int(audiobookshelf_track) + 1).zfill(4) + except (ValueError,TypeError) as e: + logger.warning(f"[App] Failed incrementing audiobookshelf track: {e}", exc_info=True) + return + + try: + options = return_download_options(episode_info,track) + except Exception as e: + logger.error(f"[App] Failed to generate download options: {e}", exc_info=True) return - episode_url = get_url_for_latest_video() - episode_info = get_youtube_data(episode_url) - logger.info(f"Latest YouTube episode: {episode_info['title']}") + logger.info("[App] Downloading new episode") + try: + with yt_dlp.YoutubeDL(options) as episode: + episode.download(episode_url) + logger.debug("[App] Download completed successfully") + except Exception as e: + logger.error(f"[App] Failed to download episode: {e}", exc_info=True) + return + + logger.info("[App] Uploading episode via SFTP") + upload_via_sftp(f"perun-{episode_info['date']}.mp3") - if audiobookshelf_title != episode_info["title"]: - logger.info("New episode found") - - track = str(int(audiobookshelf_track) + 1).zfill(4) - options = return_download_options(episode_info,track) - - logger.info("Downloading new episode") - try: - with yt_dlp.YoutubeDL(options) as episode: - episode.download(episode_url) - logger.debug("Download completed successfully") - except Exception as e: - logger.error(f"Failed to download episode: {e}", exc_info=True) - return - - logger.info("Uploading episode via SFTP") - upload_via_sftp(f"perun-{episode_info['date']}.mp3") + logger.info("[App] Sending release notification") + send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"]) + logger.info("[App] Workflow complete") - logger.info("Sending release notification") - send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"]) - logger.info("Workflow complete") - else: - logger.info("No new episode found, exiting.") if __name__ == "__main__": download_episode() diff --git a/src/perun/logger_handler.py b/src/perun/logger_handler.py deleted file mode 100644 index 3911736..0000000 --- a/src/perun/logger_handler.py +++ /dev/null @@ -1,19 +0,0 @@ -import logging -import os - -LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() -if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}: - LOG_LEVEL = "INFO" - -def setup_logger(name: str) -> logging.Logger: - logger = logging.getLogger(name) - if not logger.handlers: - handler = logging.StreamHandler() - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - ) - handler.setFormatter(formatter) - logger.addHandler(handler) - 
+    try:
+        with yt_dlp.YoutubeDL(options) as episode:
+            episode.download(episode_url)
+        logger.debug("[App] Download completed successfully")
+    except Exception as e:
+        logger.error(f"[App] Failed to download episode: {e}", exc_info=True)
+        return
+
+    logger.info("[App] Uploading episode via SFTP")
+    upload_via_sftp(f"perun-{episode_info['date']}.mp3")
 
-    if audiobookshelf_title != episode_info["title"]:
-        logger.info("New episode found")
-
-        track = str(int(audiobookshelf_track) + 1).zfill(4)
-        options = return_download_options(episode_info,track)
-
-        logger.info("Downloading new episode")
-        try:
-            with yt_dlp.YoutubeDL(options) as episode:
-                episode.download(episode_url)
-            logger.debug("Download completed successfully")
-        except Exception as e:
-            logger.error(f"Failed to download episode: {e}", exc_info=True)
-            return
-
-        logger.info("Uploading episode via SFTP")
-        upload_via_sftp(f"perun-{episode_info['date']}.mp3")
+    logger.info("[App] Sending release notification")
+    send_notification_via_ssh(f"Perun episode {track} has been released", episode_info["title"])
+    logger.info("[App] Workflow complete")
 
-        logger.info("Sending release notification")
-        send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"])
-        logger.info("Workflow complete")
-    else:
-        logger.info("No new episode found, exiting.")
 
 if __name__ == "__main__":
     download_episode()
diff --git a/src/perun/logger_handler.py b/src/perun/logger_handler.py
deleted file mode 100644
index 3911736..0000000
--- a/src/perun/logger_handler.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import logging
-import os
-
-LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
-if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
-    LOG_LEVEL = "INFO"
-
-def setup_logger(name: str) -> logging.Logger:
-    logger = logging.getLogger(name)
-    if not logger.handlers:
-        handler = logging.StreamHandler()
-        formatter = logging.Formatter(
-            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
-        )
-        handler.setFormatter(formatter)
-        logger.addHandler(handler)
-    logger.setLevel(getattr(logging, LOG_LEVEL))
-    logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
-    return logger
diff --git a/src/perun/requirements.txt b/src/perun/requirements.txt
index cc0de2e..0af6a6b 100644
--- a/src/perun/requirements.txt
+++ b/src/perun/requirements.txt
@@ -1,3 +1,4 @@
+--extra-index-url https://git.gansejunge.com/api/packages/notifier/pypi/simple/
 bcrypt==5.0.0
 Brotli==1.1.0
 certifi==2025.10.5
@@ -14,5 +15,7 @@ pycryptodomex==3.23.0
 PyNaCl==1.6.0
 python-dotenv==1.1.1
 requests==2.32.5
+sponsorblock.py==0.2.3
 urllib3==2.5.0
 websockets==15.0.1
+simple-logger-handler==0.1.0
\ No newline at end of file
diff --git a/src/perun/ssh_helper.py b/src/perun/ssh_helper.py
index e872202..5f3b5fa 100644
--- a/src/perun/ssh_helper.py
+++ b/src/perun/ssh_helper.py
@@ -2,8 +2,9 @@ import paramiko
 import os
 from dotenv import load_dotenv
 from json import dumps
-from logger_handler import setup_logger
+from simple_logger_handler import setup_logger
 import time
+import shlex
 
 logger = setup_logger(__name__)
 load_dotenv()
@@ -12,11 +13,21 @@ REMOTE_PATH = os.getenv("REMOTE_PATH")
 BACKEND_API_URL = os.getenv("BACKEND_API_URL")
 BACKEND_API_KEY= os.getenv("BACKEND_API_KEY")
 
-def load_ssh_config(host_alias):
+def load_ssh_config(host_alias: str) -> tuple[str, int, str, str]:
     """
     Load SSH connection details from ~/.ssh/config for the given alias.
+
+    Args:
+        host_alias: The SSH host alias to look up
+
+    Returns:
+        Tuple of (hostname, port, username, keyfile)
+
+    Raises:
+        FileNotFoundError: If the SSH config file doesn't exist
+        ValueError: If the SSH configuration is incomplete
     """
-    logger.debug(f"Loading SSH configuration for host alias '{host_alias}'")
+    logger.debug(f"[SSH] Loading SSH configuration for host alias '{host_alias}'")
     ssh_config = paramiko.SSHConfig()
     config_path = os.path.expanduser("~/.ssh/config")
 
@@ -24,7 +35,7 @@ def load_ssh_config(host_alias):
         with open(config_path) as f:
             ssh_config.parse(f)
     except FileNotFoundError:
-        logger.error(f"SSH config file not found at {config_path}")
+        logger.error(f"[SSH] SSH config file not found at {config_path}")
         raise
 
     host_config = ssh_config.lookup(host_alias)
@@ -34,17 +45,30 @@
     keyfile = host_config.get("identityfile", [None])[0]
 
     if not all([hostname, username, keyfile]):
-        logger.error(f"Incomplete SSH configuration for alias '{host_alias}'")
-        raise ValueError(f"Missing SSH configuration for {host_alias}.")
+        logger.error(f"[SSH] Incomplete SSH configuration for alias '{host_alias}'")
+        raise ValueError(f"[SSH] Missing SSH configuration for {host_alias}.")
 
-    logger.debug(f"SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
+    logger.debug(f"[SSH] SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
     return hostname, port, username, keyfile
 
-def create_ssh_client(hostname, port, username, keyfile):
+def create_ssh_client(hostname: str, port: int, username: str, keyfile: str) -> paramiko.SSHClient:
     """
     Create and return a connected Paramiko SSHClient instance.
+
+    Args:
+        hostname: Remote hostname
+        port: SSH port
+        username: SSH username
+        keyfile: Path to SSH private key file
+
+    Returns:
+        Connected SSHClient instance (caller must close it)
+
+    Raises:
+        Exception: If the SSH connection fails
     """
+    logger.debug("[SSH] Creating SSH client")
     try:
         ssh = paramiko.SSHClient()
         ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
@@ -52,21 +76,27 @@
         pkey = paramiko.RSAKey.from_private_key_file(keyfile)
         ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
-        logger.debug("SSH connection established successfully")
+        logger.debug("[SSH] SSH connection established successfully")
         return ssh
     except Exception as e:
-        logger.error(f"SSH connection failed: {e}", exc_info=True)
+        logger.error(f"[SSH] SSH connection failed: {e}", exc_info=True)
         raise
 
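+# Note: upload_via_sftp below uses a raw paramiko.Transport rather than the
+# SSHClient helper above, and RSAKey.from_private_key_file assumes an RSA key
+# pair; a different key type would need e.g. paramiko.Ed25519Key instead.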
-def upload_via_sftp(filename):
+def upload_via_sftp(filename: str) -> None:
     """
     Upload a file to the remote host via SFTP using SSH credentials.
+
+    Args:
+        filename: Local file path to upload
+
+    Raises:
+        Exception: If the upload fails
     """
-    logger.info(f"Preparing to upload file '{filename}' via SFTP")
+    logger.info(f"[SFTP] Preparing to upload file '{filename}' via SFTP")
     try:
         hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
-        logger.debug(f"Connecting to {hostname}:{port} for file upload")
+        logger.debug(f"[SFTP] Connecting to {hostname}:{port} for file upload")
 
         transport = paramiko.Transport((hostname, port))
         pkey = paramiko.RSAKey.from_private_key_file(keyfile)
@@ -74,62 +104,77 @@
         sftp = paramiko.SFTPClient.from_transport(transport)
 
         remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
-        logger.info(f"Uploading to remote path: {remote_file}")
+        logger.info(f"[SFTP] Uploading to remote path: {remote_file}")
         sftp.put(filename, remote_file)
 
         sftp.close()
         transport.close()
-        logger.info(f"File '{filename}' uploaded successfully")
+        logger.info(f"[SFTP] File '{filename}' uploaded successfully")
     except Exception as e:
-        logger.error(f"SFTP upload failed for '{filename}': {e}", exc_info=True)
+        logger.error(f"[SFTP] SFTP upload failed for '{filename}': {e}", exc_info=True)
         raise
 
-def send_notification_via_ssh(notification_title, notification_info):
+def send_notification_via_ssh(notification_title: str, notification_info: str) -> None:
     """
     Send a JSON-formatted notification payload via SSH to the backend.
+
+    Args:
+        notification_title: Title of the notification
+        notification_info: Body/content of the notification
+
+    Raises:
+        Exception: If sending the notification fails
     """
-    logger.info(f"Sending SSH notification: {notification_title}")
+    logger.info(f"[Notification] Sending SSH notification: {notification_title}")
+    ssh = None
     try:
         hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
-        with create_ssh_client(hostname, port, username, keyfile) as ssh:
-            data = {
-                "receipent_user_id": 1,
-                "message": {
-                    "title": notification_title,
-                    "body": notification_info,
-                    "category": "podcasts",
-                    "timestamp": int(time.time())
-                }
+        ssh = create_ssh_client(hostname, port, username, keyfile)
+
+        data = {
+            "receipent_user_id": 1,
+            "message": {
+                "title": notification_title,
+                "body": notification_info,
+                "category": "podcasts",
+                "timestamp": int(time.time())
             }
-            json_payload = dumps(data)
-            logger.debug(f"Notification payload: {json_payload}")
+        }
+        json_payload = dumps(data)
+        logger.debug(f"[Notification] Notification payload: {json_payload}")
 
-            notification_cmd = (
-                f"API_KEY=$(head -n1) && "
-                f"curl -s -X POST '{BACKEND_API_URL}' "
-                f"-H 'Content-Type: application/json' "
-                f"-H \"X-API-Key-Internal: $API_KEY\" "
-                f"-d '{json_payload}'"
-            )
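+        # The backend API key never appears on the remote command line (where it
+        # would be visible in `ps`); it is piped in via stdin and read with
+        # `head -n1`, while the URL and JSON payload are escaped with shlex.quote
+        # so the payload cannot break out of the remote shell command.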
+        escaped_payload = shlex.quote(json_payload)
+        escaped_url = shlex.quote(BACKEND_API_URL)
 
-            stdin, stdout, stderr = ssh.exec_command(notification_cmd)
-            stdin.write(f"{BACKEND_API_KEY}\n")
-            stdin.flush()
-            stdin.channel.shutdown_write()
+        notification_cmd = (
+            f"API_KEY=$(head -n1) && "
+            f"curl -s -X POST {escaped_url} "
+            f"-H 'Content-Type: application/json' "
+            f"-H \"X-API-Key-Internal: $API_KEY\" "
+            f"-d {escaped_payload}"
+        )
 
-            exit_status = stdout.channel.recv_exit_status()
-            response_output = stdout.read().decode()
+        stdin, stdout, stderr = ssh.exec_command(notification_cmd)
+        stdin.write(f"{BACKEND_API_KEY}\n")
+        stdin.flush()
+        stdin.channel.shutdown_write()
 
-            exit_status = stdout.channel.recv_exit_status()
-            if exit_status == 0:
-                logger.info("Notification sent successfully")
-                logger.debug(f"Response: {response_output}")
-            else:
-                error_output = stderr.read().decode()
-                logger.warning(f"Notification command exited with {exit_status}")
-                logger.warning(f"Error: {error_output}")
-                logger.warning(f"Response: {response_output}")
+        exit_status = stdout.channel.recv_exit_status()
+        response_output = stdout.read().decode()
+
+        if exit_status == 0:
+            logger.info("[Notification] Notification sent successfully")
+            logger.debug(f"[Notification] Response: {response_output}")
+        else:
+            error_output = stderr.read().decode()
+            logger.warning(f"[Notification] Notification command exited with {exit_status}")
+            logger.warning(f"[Notification] Error: {error_output}")
+            logger.warning(f"[Notification] Response: {response_output}")
     except Exception as e:
-        logger.error(f"Failed to send SSH notification: {e}", exc_info=True)
-        raise
\ No newline at end of file
+        logger.error(f"[Notification] Failed to send SSH notification: {e}", exc_info=True)
+        raise
+    finally:
+        if ssh:
+            ssh.close()
+            logger.debug("[Notification] SSH connection closed")
\ No newline at end of file
diff --git a/src/perun/youtube_handler.py b/src/perun/youtube_handler.py
index 770a15d..29aeb81 100644
--- a/src/perun/youtube_handler.py
+++ b/src/perun/youtube_handler.py
@@ -4,8 +4,10 @@ import contextlib
 from dotenv import load_dotenv
 import os
 from helper import return_string_as_html
-from logger_handler import setup_logger
+from simple_logger_handler import setup_logger
 import json
+import sponsorblock as sb
+
 
 logger = setup_logger(__name__)
 load_dotenv()
@@ -16,7 +18,7 @@ def get_url_for_latest_video():
     """
     Fetch the URL of the latest video from a YouTube channel.
     """
-    logger.info("Fetching latest video URL from YouTube channel")
+    logger.info("[YouTube] Fetching latest video URL from YouTube channel")
     options = {
         "extract_flat": True,
         "playlist_items": "1",
@@ -30,15 +32,15 @@
         with yt_dlp.YoutubeDL(options) as video:
             info_dict = video.extract_info(YOUTUBE_CHANNEL_URL, download=False)
     except Exception as e:
-        logger.error(f"Failed to fetch latest video info: {e}", exc_info=True)
+        logger.error(f"[YouTube] Failed to fetch latest video info: {e}", exc_info=True)
         return None
 
     if "entries" in info_dict and len(info_dict["entries"]) > 0:
         latest_url = info_dict["entries"][0]["url"]
-        logger.debug(f"Latest video URL found: {latest_url}")
+        logger.debug(f"[YouTube] Latest video URL found: {latest_url}")
         return latest_url
     else:
-        logger.warning("No entries found in channel feed")
+        logger.warning("[YouTube] No entries found in channel feed")
         return None
 
 def get_youtube_data(url: str) -> dict:
@@ -50,7 +52,7 @@
         with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as video:
             info_dict = video.extract_info(url, download=False)
     except Exception as e:
-        logger.error(f"Failed to fetch YouTube video info for {url}: {e}", exc_info=True)
+        logger.error(f"[YouTube] Failed to fetch YouTube video info for {url}: {e}", exc_info=True)
         return {}
 
     video_data = {
@@ -58,14 +60,24 @@
             info_dict["timestamp"], datetime.timezone.utc
         ).strftime("%Y-%m-%d"),
         "title": info_dict["title"],
-        "description": return_string_as_html(info_dict["description"]),
-        "upload_date": info_dict["upload_date"]
+        "description": info_dict.get("description", "")
     }
 
-    logger.debug(f"Fetched video data: {json.dumps(video_data, indent=4)}")
+    logger.debug(f"[YouTube] Fetched video data: {json.dumps(video_data, indent=4)}")
     return video_data
 
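+# Note: sponsorblock.py raises NotFoundException when no segments have been
+# submitted for a video yet, so "no data yet" is treated as False below rather
+# than as an error.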
+def check_for_sponsorblock_segments(youtube_video: str) -> bool:
+    client = sb.Client()
+    try:
+        segments = client.get_skip_segments(youtube_video)
+    except sb.errors.NotFoundException:
+        logger.debug(f"[SponsorBlock] No SponsorBlock information for video: {youtube_video}")
+        return False
+    if segments:
+        logger.info(f"[SponsorBlock] SponsorBlock segments found for video: {youtube_video}")
+        return True
+    return False
+
 def return_download_options(information:dict,track:str)->dict:
     download_options = {
         "quiet": True,
@@ -101,10 +113,13 @@
             "-metadata", "artist=Perun",
             "-metadata", f"track={track}",
             "-metadata", f"date={information['date']}",
-            "-metadata", f"comment={information['description']}",
-            "-metadata", f"description={information['description']}",
+            "-metadata", f"comment={return_string_as_html(information['description'])}",
+            "-metadata", f"description={return_string_as_html(information['description'])}",
         ],
         "merge_output_format": "mp3"
     }
-    logger.debug(f"Created download options:\n {json.dumps(download_options, indent=4)}")
-    return download_options
\ No newline at end of file
+    logger.debug(f"[YouTube] Created download options:\n {json.dumps(download_options, indent=4)}")
+    return download_options
+
+if __name__ == "__main__":
+    print(check_for_sponsorblock_segments("https://www.youtube.com/watch?v=M0t8UYZ9rrQ"))