Merge branch 'feature/continuous-release-check'
All checks were successful
Build image with python3,get-iplayer / Build Docker Image (push) Successful in 51s

This commit is contained in:
florian 2025-11-09 18:55:37 +01:00
commit f5a95a08f2
5 changed files with 287 additions and 123 deletions

View File

@ -1,10 +1,11 @@
import requests
import yt_dlp
import os
import time
from dotenv import load_dotenv
from ssh_helper import upload_via_sftp, send_notification_via_ssh
from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options
from logger_handler import setup_logger
from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options, check_for_sponsorblock_segments
from simple_logger_handler import setup_logger
logger = setup_logger(__name__)
@ -13,9 +14,26 @@ PODCAST_AUTHORIZATION_TOKEN = os.getenv("PODCAST_AUTHORIZATION_TOKEN")
PODCAST_API_URL = os.getenv("PODCAST_API_URL")
def get_audiobookshelf_data()->tuple[int | None, str | None]:
headers = {"Authorization": f"Bearer {PODCAST_AUTHORIZATION_TOKEN}"}
def get_audiobookshelf_data() -> tuple[int | None, str | None]:
"""
Fetches the latest episode data from the Audiobookshelf API.
Returns:
tuple[int | None, str | None]:
- The track number as an integer (or None if data could not be fetched due to retryable errors).
- The episode title as a string (or None if data could not be fetched due to retryable errors).
Raises:
requests.exceptions.HTTPError:
If a non-retryable HTTP error occurs (e.g., 401 Unauthorized, 403 Forbidden, 404 Not Found).
Notes:
- Connection errors, timeouts, and server-side HTTP errors (500, 502, 503, 504) are caught and logged.
In these cases, the function returns (None, None) so the caller can retry later.
"""
headers = {"Authorization": f"Bearer {PODCAST_AUTHORIZATION_TOKEN}"}
logger.debug("[Audiobookshelf] Fetching Audiobookshelf data")
try:
response = requests.get(PODCAST_API_URL, headers=headers)
response.raise_for_status()
@ -24,49 +42,151 @@ def get_audiobookshelf_data()->tuple[int | None, str | None]:
audiobookshelf_track = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTrack"]
audiobookshelf_title = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTitle"]
logger.debug(f"Fetched Audiobookshelf data: track={audiobookshelf_track}, title={audiobookshelf_title}")
return audiobookshelf_track, audiobookshelf_title
logger.debug(f"[Audiobookshelf] Fetched Audiobookshelf data: track={audiobookshelf_track}, title={audiobookshelf_title}")
return (audiobookshelf_track, audiobookshelf_title)
except requests.RequestException as e:
logger.warning(f"Failed to fetch Audiobookshelf data: {e}")
except requests.exceptions.ConnectionError as e:
logger.warning(f"[Audiobookshelf] Connection error, will retry: {e}")
return (None, None)
except requests.exceptions.Timeout as e:
logger.warning(f"[Audiobookshelf] Request timed out, will retry: {e}")
return (None, None)
except requests.exceptions.HTTPError as e:
status = e.response.status_code
if status in {500, 502, 503, 504}:
logger.warning(f"[Audiobookshelf] Server error {status}, will retry: {e}")
return (None, None)
else:
logger.error(f"[Audiobookshelf] HTTP error {status}, not retrying: {e}")
raise
def check_until_new_episode_gets_released() -> tuple[int | None, dict | None, str | None]:
"""
Polls YouTube every hour for a new episode and compares it to the available episode on Audiobookshelf.
Stops after 72 hours.
def download_episode():
logger.info("Starting Perun")
Returns:
tuple[int | None, dict | None, str | None]:
- Track number from Audiobookshelf
- Episode info dictionary from YouTube
- Episode URL
Returns (None, None, None) if no new episode found within timeout
"""
CHECK_INTERVAL_HOURS = 1
MAX_HOURS = 72
for attempt in range(1, MAX_HOURS + 1):
logger.debug(f"[EpisodeCheck] Waiting for a new episode to be released, attempt: {attempt}/{MAX_HOURS}")
audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
if audiobookshelf_track is None or audiobookshelf_title is None:
logger.warning("Unable to fetch Audiobookshelf data. Exiting.")
return
if audiobookshelf_track is None or audiobookshelf_title is None:
logger.warning("[EpisodeCheck] Unable to fetch Audiobookshelf data, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
continue
episode_url = get_url_for_latest_video()
episode_info = get_youtube_data(episode_url)
logger.info(f"Latest YouTube episode: {episode_info['title']}")
episode_url = get_url_for_latest_video()
if episode_url is None:
logger.warning("[EpisodeCheck] Unable to fetch latest video URL, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
continue
if audiobookshelf_title != episode_info["title"]:
logger.info("New episode found")
episode_info = get_youtube_data(episode_url)
if not episode_info:
logger.warning("[EpisodeCheck] Unable to fetch video metadata, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
continue
track = str(int(audiobookshelf_track) + 1).zfill(4)
options = return_download_options(episode_info,track)
if audiobookshelf_title != episode_info["title"]:
logger.info(f"[EpisodeCheck] Latest YouTube episode: {episode_info['title']}")
return (audiobookshelf_track,episode_info,episode_url)
logger.info("Downloading new episode")
try:
with yt_dlp.YoutubeDL(options) as episode:
episode.download(episode_url)
logger.debug("Download completed successfully")
except Exception as e:
logger.error(f"Failed to download episode: {e}", exc_info=True)
logger.debug("[EpisodeCheck] No new episode found, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
logger.warning("[EpisodeCheck] No new episode found after maximum attempts.")
return (None, None, None)
def wait_for_sponsorblock_segments_to_be_added(episode_url) -> bool:
"""
Polls SponsorBlock for segments on the current video until found or until max attempts.
Args:
episode_url: YouTube video URL to check for SponsorBlock segments
Returns:
True if segments found, False otherwise
"""
CHECK_INTERVAL_HOURS = 1
MAX_HOURS = 24
for attempt in range(1, MAX_HOURS + 1):
logger.debug(f"[SponsorBlock] Waiting for SponsorBlock to be added, attempt: {attempt}/{MAX_HOURS} ")
segments = check_for_sponsorblock_segments(episode_url)
if segments:
logger.debug("[SponsorBlock] Segments found, exiting loop.")
return True
logger.debug("[SponsorBlock] No SponsorBlock segments found yet, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
logger.warning("[SponsorBlock] Segments not found after maximum attempts.")
return False
def download_episode() -> None:
"""
Main workflow: Check for new episode, download it, upload via SFTP, and send notification.
"""
logger.info("[App] Starting Perun")
try:
audiobookshelf_track,episode_info,episode_url = check_until_new_episode_gets_released()
if audiobookshelf_track is None or episode_info is None or episode_url is None:
logger.error("[App] Failed to find new episode within timeout period")
return
logger.info("Uploading episode via SFTP")
upload_via_sftp(f"perun-{episode_info['date']}.mp3")
logger.info("[App] New episode found")
except Exception as e:
logger.error(f"[App] Failed to fetch new episode info: {e}", exc_info=True)
return
try:
episode_description = episode_info.get("description", "")
if "sponsored" in episode_description.lower():
logger.debug("[App] Sponsored segments found in description, waiting for SponsorBlock")
wait_for_sponsorblock_segments_to_be_added(episode_url)
else:
logger.debug("[App] No sponsored segments found in description")
except Exception as e:
logger.warning(f"[App] Failed during SponsorBlock wait: {e}", exc_info=True)
try:
track = str(int(audiobookshelf_track) + 1).zfill(4)
except (ValueError,TypeError) as e:
logger.warning(f"[App] Failed incrementing audiobookshelf track: {e}", exc_info=True)
return
try:
options = return_download_options(episode_info,track)
except Exception as e:
logger.error(f"[App] Failed to generate download options: {e}", exc_info=True)
return
logger.info("[App] Downloading new episode")
try:
with yt_dlp.YoutubeDL(options) as episode:
episode.download(episode_url)
logger.debug("[App] Download completed successfully")
except Exception as e:
logger.error(f"[App] Failed to download episode: {e}", exc_info=True)
return
logger.info("[App] Uploading episode via SFTP")
upload_via_sftp(f"perun-{episode_info['date']}.mp3")
logger.info("[App] Sending release notification")
send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"])
logger.info("[App] Workflow complete")
logger.info("Sending release notification")
send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"])
logger.info("Workflow complete")
else:
logger.info("No new episode found, exiting.")
if __name__ == "__main__":
download_episode()

View File

@ -1,19 +0,0 @@
import logging
import os
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
LOG_LEVEL = "INFO"
def setup_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(getattr(logging, LOG_LEVEL))
logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
return logger

View File

@ -1,3 +1,4 @@
--extra-index-url https://git.gansejunge.com/api/packages/notifier/pypi/simple/
bcrypt==5.0.0
Brotli==1.1.0
certifi==2025.10.5
@ -14,5 +15,7 @@ pycryptodomex==3.23.0
PyNaCl==1.6.0
python-dotenv==1.1.1
requests==2.32.5
sponsorblock.py==0.2.3
urllib3==2.5.0
websockets==15.0.1
simple-logger-handler==0.1.0

View File

@ -2,8 +2,9 @@ import paramiko
import os
from dotenv import load_dotenv
from json import dumps
from logger_handler import setup_logger
from simple_logger_handler import setup_logger
import time
import shlex
logger = setup_logger(__name__)
load_dotenv()
@ -12,11 +13,21 @@ REMOTE_PATH = os.getenv("REMOTE_PATH")
BACKEND_API_URL = os.getenv("BACKEND_API_URL")
BACKEND_API_KEY= os.getenv("BACKEND_API_KEY")
def load_ssh_config(host_alias):
def load_ssh_config(host_alias:str) -> tuple[str, int, str, str]:
"""
Load SSH connection details from ~/.ssh/config for the given alias.
Args:
host_alias: The SSH host alias to look up
Returns:
Tuple of (hostname, port, username, keyfile)
Raises:
FileNotFoundError: If SSH config file doesn't exist
ValueError: If SSH configuration is incomplete
"""
logger.debug(f"Loading SSH configuration for host alias '{host_alias}'")
logger.debug(f"[SSH] Loading SSH configuration for host alias '{host_alias}'")
ssh_config = paramiko.SSHConfig()
config_path = os.path.expanduser("~/.ssh/config")
@ -24,7 +35,7 @@ def load_ssh_config(host_alias):
with open(config_path) as f:
ssh_config.parse(f)
except FileNotFoundError:
logger.error(f"SSH config file not found at {config_path}")
logger.error(f"[SSH] SSH config file not found at {config_path}")
raise
host_config = ssh_config.lookup(host_alias)
@ -34,17 +45,30 @@ def load_ssh_config(host_alias):
keyfile = host_config.get("identityfile", [None])[0]
if not all([hostname, username, keyfile]):
logger.error(f"Incomplete SSH configuration for alias '{host_alias}'")
raise ValueError(f"Missing SSH configuration for {host_alias}.")
logger.error(f"[SSH] Incomplete SSH configuration for alias '{host_alias}'")
raise ValueError(f"[SSH] Missing SSH configuration for {host_alias}.")
logger.debug(f"SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
logger.debug(f"[SSH] SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
return hostname, port, username, keyfile
def create_ssh_client(hostname, port, username, keyfile):
def create_ssh_client(hostname: str, port: int, username: str, keyfile: str)-> paramiko.SSHClient:
"""
Create and return a connected Paramiko SSHClient instance.
Args:
hostname: Remote hostname
port: SSH port
username: SSH username
keyfile: Path to SSH private key file
Returns:
Connected SSHClient instance (caller must close it)
Raises:
Exception: If SSH connection fails
"""
logger.debug("[SSH] Creating SSH client")
try:
ssh = paramiko.SSHClient()
ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
@ -52,21 +76,27 @@ def create_ssh_client(hostname, port, username, keyfile):
pkey = paramiko.RSAKey.from_private_key_file(keyfile)
ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
logger.debug("SSH connection established successfully")
logger.debug("[SSH] SSH connection established successfully")
return ssh
except Exception as e:
logger.error(f"SSH connection failed: {e}", exc_info=True)
logger.error(f"[SSH] SSH connection failed: {e}", exc_info=True)
raise
def upload_via_sftp(filename):
def upload_via_sftp(filename) -> None:
"""
Upload a file to the remote host via SFTP using SSH credentials.
Args:
filename: Local file path to upload
Raises:
Exception: If upload fails
"""
logger.info(f"Preparing to upload file '{filename}' via SFTP")
logger.info(f"[SFTP] Preparing to upload file '{filename}' via SFTP")
try:
hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
logger.debug(f"Connecting to {hostname}:{port} for file upload")
logger.debug(f"[SFTP] Connecting to {hostname}:{port} for file upload")
transport = paramiko.Transport((hostname, port))
pkey = paramiko.RSAKey.from_private_key_file(keyfile)
@ -74,62 +104,77 @@ def upload_via_sftp(filename):
sftp = paramiko.SFTPClient.from_transport(transport)
remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
logger.info(f"Uploading to remote path: {remote_file}")
logger.info(f"[SFTP] Uploading to remote path: {remote_file}")
sftp.put(filename, remote_file)
sftp.close()
transport.close()
logger.info(f"File '{filename}' uploaded successfully")
logger.info(f"[SFTP] File '{filename}' uploaded successfully")
except Exception as e:
logger.error(f"SFTP upload failed for '{filename}': {e}", exc_info=True)
logger.error(f"[SFTP] SFTP upload failed for '{filename}': {e}", exc_info=True)
raise
def send_notification_via_ssh(notification_title, notification_info):
def send_notification_via_ssh(notification_title, notification_info) -> None:
"""
Send a JSON-formatted notification payload via SSH to the backend.
Args:
notification_title: Title of the notification
notification_info: Body/content of the notification
Raises:
Exception: If notification sending fails
"""
logger.info(f"Sending SSH notification: {notification_title}")
logger.info(f"[Notification] Sending SSH notification: {notification_title}")
ssh = None
try:
hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
with create_ssh_client(hostname, port, username, keyfile) as ssh:
data = {
"receipent_user_id": 1,
"message": {
"title": notification_title,
"body": notification_info,
"category": "podcasts",
"timestamp": int(time.time())
}
ssh = create_ssh_client(hostname, port, username, keyfile)
data = {
"receipent_user_id": 1,
"message": {
"title": notification_title,
"body": notification_info,
"category": "podcasts",
"timestamp": int(time.time())
}
json_payload = dumps(data)
logger.debug(f"Notification payload: {json_payload}")
}
json_payload = dumps(data)
logger.debug(f"[Notification] Notification payload: {json_payload}")
notification_cmd = (
f"API_KEY=$(head -n1) && "
f"curl -s -X POST '{BACKEND_API_URL}' "
f"-H 'Content-Type: application/json' "
f"-H \"X-API-Key-Internal: $API_KEY\" "
f"-d '{json_payload}'"
)
escaped_payload = shlex.quote(json_payload)
escaped_url = shlex.quote(BACKEND_API_URL)
stdin, stdout, stderr = ssh.exec_command(notification_cmd)
stdin.write(f"{BACKEND_API_KEY}\n")
stdin.flush()
stdin.channel.shutdown_write()
notification_cmd = (
f"API_KEY=$(head -n1) && "
f"curl -s -X POST {escaped_url} "
f"-H 'Content-Type: application/json' "
f"-H \"X-API-Key-Internal: $API_KEY\" "
f"-d {escaped_payload}"
)
exit_status = stdout.channel.recv_exit_status()
response_output = stdout.read().decode()
stdin, stdout, stderr = ssh.exec_command(notification_cmd)
stdin.write(f"{BACKEND_API_KEY}\n")
stdin.flush()
stdin.channel.shutdown_write()
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
logger.info("Notification sent successfully")
logger.debug(f"Response: {response_output}")
else:
error_output = stderr.read().decode()
logger.warning(f"Notification command exited with {exit_status}")
logger.warning(f"Error: {error_output}")
logger.warning(f"Response: {response_output}")
exit_status = stdout.channel.recv_exit_status()
response_output = stdout.read().decode()
if exit_status == 0:
logger.info("[Notification] Notification sent successfully")
logger.debug(f"[Notification] Response: {response_output}")
else:
error_output = stderr.read().decode()
logger.warning(f"[Notification] Notification command exited with {exit_status}")
logger.warning(f"[Notification] Error: {error_output}")
logger.warning(f"[Notification] Response: {response_output}")
except Exception as e:
logger.error(f"Failed to send SSH notification: {e}", exc_info=True)
logger.error(f"[Notification] Failed to send SSH notification: {e}", exc_info=True)
raise
finally:
if ssh:
ssh.close()
logger.debug("[Notification] SSH connection closed")

View File

@ -4,8 +4,10 @@ import contextlib
from dotenv import load_dotenv
import os
from helper import return_string_as_html
from logger_handler import setup_logger
from simple_logger_handler import setup_logger
import json
import sponsorblock as sb
logger = setup_logger(__name__)
load_dotenv()
@ -16,7 +18,7 @@ def get_url_for_latest_video():
"""
Fetch the URL of the latest video from a YouTube channel.
"""
logger.info("Fetching latest video URL from YouTube channel")
logger.info("[YouTube] Fetching latest video URL from YouTube channel")
options = {
"extract_flat": True,
"playlist_items": "1",
@ -30,15 +32,15 @@ def get_url_for_latest_video():
with yt_dlp.YoutubeDL(options) as video:
info_dict = video.extract_info(YOUTUBE_CHANNEL_URL, download=False)
except Exception as e:
logger.error(f"Failed to fetch latest video info: {e}", exc_info=True)
logger.error(f"[YouTube] Failed to fetch latest video info: {e}", exc_info=True)
return None
if "entries" in info_dict and len(info_dict["entries"]) > 0:
latest_url = info_dict["entries"][0]["url"]
logger.debug(f"Latest video URL found: {latest_url}")
logger.debug(f"[YouTube] Latest video URL found: {latest_url}")
return latest_url
else:
logger.warning("No entries found in channel feed")
logger.warning("[YouTube] No entries found in channel feed")
return None
def get_youtube_data(url: str) -> dict:
@ -50,7 +52,7 @@ def get_youtube_data(url: str) -> dict:
with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as video:
info_dict = video.extract_info(url, download=False)
except Exception as e:
logger.error(f"Failed to fetch YouTube video info for {url}: {e}", exc_info=True)
logger.error(f"[YouTube] Failed to fetch YouTube video info for {url}: {e}", exc_info=True)
return {}
video_data = {
@ -58,13 +60,23 @@ def get_youtube_data(url: str) -> dict:
info_dict["timestamp"], datetime.timezone.utc
).strftime("%Y-%m-%d"),
"title": info_dict["title"],
"description": return_string_as_html(info_dict["description"]),
"upload_date": info_dict["upload_date"]
"description": info_dict.get("description", "")
}
logger.debug(f"Fetched video data: {json.dumps(video_data, indent=4)}")
logger.debug(f"[YouTube] Fetched video data: {json.dumps(video_data, indent=4)}")
return video_data
def check_for_sponsorblock_segments(youtube_video:str) -> bool:
client = sb.Client()
try:
segments = client.get_skip_segments(youtube_video)
except sb.errors.NotFoundException:
logger.debug(f"[SponsorBlock] No SponsorBlock information for video:{youtube_video}")
return False
if segments:
logger.info(f"[SponsorBlock] SponsorBlock segments found for video: {youtube_video}")
return True
def return_download_options(information:dict,track:str)->dict:
download_options = {
@ -101,10 +113,13 @@ def return_download_options(information:dict,track:str)->dict:
"-metadata", "artist=Perun",
"-metadata", f"track={track}",
"-metadata", f"date={information['date']}",
"-metadata", f"comment={information['description']}",
"-metadata", f"description={information['description']}",
"-metadata", f"comment={return_string_as_html(information['description'])}",
"-metadata", f"description={return_string_as_html(information['description'])}",
],
"merge_output_format": "mp3"
}
logger.debug(f"Created download options:\n {json.dumps(download_options, indent=4)}")
logger.debug(f"[YouTube] Created download options:\n {json.dumps(download_options, indent=4)}")
return download_options
if __name__ == "__main__":
print(check_for_sponsorblock_segments("https://www.youtube.com/watch?v=M0t8UYZ9rrQ"))