diff --git a/src/perun/README.md b/src/perun/README.md
index 0a67c5d..29fdda6 100644
--- a/src/perun/README.md
+++ b/src/perun/README.md
@@ -1,18 +1,75 @@
-# Perun
+# Perun YouTube Podcast Downloader
+
+A Python script that automatically downloads the latest video from the Perun YouTube channel, converts it to MP3 with metadata, removes sponsor segments, and uploads it to a podcast server.
+
+## Features
+
+- **Automatic Detection**: Checks for new episodes by comparing with Audiobookshelf library
+- **Audio Conversion**: Downloads and converts YouTube videos to MP3 format
+- **Sponsor Removal**: Uses SponsorBlock API to remove sponsored segments
+- **Metadata Injection**: Adds title, artist, track number, date, and description to MP3 files
+- **SFTP Upload**: Automatically uploads to remote podcast server
+- **Push Notifications**: Sends notification when new episode is available
+
+## Prerequisites
+
+- Python 3.8+
+- yt-dlp
+- ffmpeg (for audio conversion)
+- SSH key-based authentication configured
+- Audiobookshelf server with API access
-Youtube blocks a lot of server IPs so running this locally is just easier, expects the following environment variables in a .env file:
+## Installation
-REMOTE_HOSTNAME
+```bash
+pip install -r requirements.txt
+```
+Install ffmpeg:
+```bash
+# Ubuntu/Debian
+sudo apt install ffmpeg
-REMOTE_PATH
+# macOS
+brew install ffmpeg
+```
+## Usage
-BACKEND_API_URL
+Run the script manually:
+```bash
+python get_episode.py
+```
-BACKEND_API_KEY
+Or schedule with cron and use the provided `grabEpisode.sh` (Monday at 7 AM):
+```bash
+0 7 * * 1 /path/to/script/grabEpisode.sh
+```
-YOUTUBE_CHANNEL_URL
+YouTube blocks many server IPs, so running this locally is simply easier.
-PODCAST_AUTHORIZATION_TOKEN
+## Configuration
-PODCAST_API_URL
\ No newline at end of file
+Create a `.env` file with the following variables:
+
+```env
+# YouTube channel to monitor
+YOUTUBE_CHANNEL_URL=https://www.youtube.com/@PerunAU/videos
+
+# Audiobookshelf API
+PODCAST_API_URL=https://your-audiobookshelf.com/api/items/{item-id}
+PODCAST_AUTHORIZATION_TOKEN=your_token_here
+
+# SFTP upload destination
+REMOTE_HOSTNAME=your_ssh_host_alias
+REMOTE_PATH=/path/to/podcast/folder
+
+# Backend notification service
+BACKEND_API_URL=http://backend:8101/internal/receive-notifications
+BACKEND_API_KEY=your_api_key
+```
+
+## Output
+
+MP3 files are named: `perun-YYYY-MM-DD.mp3`
+
+Example: `perun-2025-10-19.mp3`
diff --git a/src/perun/get_episode.py b/src/perun/get_episode.py
index b40763b..1e3d780 100644
--- a/src/perun/get_episode.py
+++ b/src/perun/get_episode.py
@@ -2,10 +2,11 @@ import requests
import yt_dlp
import os
from dotenv import load_dotenv
-from helper import log_message
from ssh_helper import upload_via_sftp, send_notification_via_ssh
from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options
+from logger_handler import setup_logger
+logger = setup_logger(__name__)
load_dotenv()
PODCAST_AUTHORIZATION_TOKEN = os.getenv("PODCAST_AUTHORIZATION_TOKEN")
@@ -19,48 +20,53 @@ def get_audiobookshelf_data()->tuple[int | None, str | None]:
response = requests.get(PODCAST_API_URL, headers=headers)
response.raise_for_status()
result = response.json()
+
audiobookshelf_track = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTrack"]
audiobookshelf_title = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTitle"]
+
+ logger.debug(f"Fetched Audiobookshelf data: track={audiobookshelf_track}, title={audiobookshelf_title}")
return audiobookshelf_track, audiobookshelf_title
except requests.RequestException as e:
- log_message(f"Failed to fetch data: {e}")
+ logger.warning(f"Failed to fetch Audiobookshelf data: {e}")
-        return None
+        return None, None
def download_episode():
- log_message("Starting Perun")
+ logger.info("Starting Perun")
audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
if audiobookshelf_track is None or audiobookshelf_title is None:
- log_message("Unable to fetch Audiobookshelf data. Exiting.")
+ logger.warning("Unable to fetch Audiobookshelf data. Exiting.")
return
episode_url = get_url_for_latest_video()
episode_info = get_youtube_data(episode_url)
- log_message(f"Latest episode: {episode_info['title']}")
+ logger.info(f"Latest YouTube episode: {episode_info['title']}")
if audiobookshelf_title != episode_info["title"]:
- log_message("New Episode found")
+ logger.info("New episode found")
track = str(int(audiobookshelf_track) + 1).zfill(4)
options = return_download_options(episode_info,track)
- log_message("Downloading episode")
+ logger.info("Downloading new episode")
try:
with yt_dlp.YoutubeDL(options) as episode:
episode.download(episode_url)
+ logger.debug("Download completed successfully")
except Exception as e:
- log_message(f"Failed to download episode: {e}")
+ logger.error(f"Failed to download episode: {e}", exc_info=True)
return
- log_message("Uploading episode")
+ logger.info("Uploading episode via SFTP")
upload_via_sftp(f"perun-{episode_info['date']}.mp3")
- log_message("Finished uploading, sending notification")
+
+ logger.info("Sending release notification")
send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"])
- log_message("Finished")
+ logger.info("Workflow complete")
else:
- log_message("No new episode found, exiting...")
+ logger.debug("No new episode found, exiting.")
if __name__ == "__main__":
download_episode()
diff --git a/src/perun/helper.py b/src/perun/helper.py
index 7ad7e6c..0eef539 100644
--- a/src/perun/helper.py
+++ b/src/perun/helper.py
@@ -1,5 +1,4 @@
import re
-import datetime
def return_string_as_html(input_text):
string_without_ads=""
@@ -8,8 +7,3 @@ def return_string_as_html(input_text):
if not "Sponsored" in line:
string_without_ads+=line+"\n"
     return("<p>"+string_without_ads.replace("\n\n", "<br>\n").replace("\n", "<br>")+"</p>")
-
-def log_message(message):
- timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- print(f"[{timestamp}] {message}")
- return(f"[{timestamp}] {message}\n")
\ No newline at end of file
diff --git a/src/perun/logger_handler.py b/src/perun/logger_handler.py
new file mode 100644
index 0000000..3911736
--- /dev/null
+++ b/src/perun/logger_handler.py
@@ -0,0 +1,19 @@
+import logging
+import os
+
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
+ LOG_LEVEL = "INFO"
+
+def setup_logger(name: str) -> logging.Logger:
+ logger = logging.getLogger(name)
+ if not logger.handlers:
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ )
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ logger.setLevel(getattr(logging, LOG_LEVEL))
+ logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
+ return logger
diff --git a/src/perun/ssh_helper.py b/src/perun/ssh_helper.py
index 8c59cd0..d6c0457 100644
--- a/src/perun/ssh_helper.py
+++ b/src/perun/ssh_helper.py
@@ -2,7 +2,10 @@ import paramiko
import os
from dotenv import load_dotenv
from json import dumps
+from logger_handler import setup_logger
+import time
+logger = setup_logger(__name__)
load_dotenv()
REMOTE_HOSTNAME = os.getenv("REMOTE_HOSTNAME")
REMOTE_PATH = os.getenv("REMOTE_PATH")
@@ -10,10 +13,20 @@ BACKEND_API_URL = os.getenv("BACKEND_API_URL")
BACKEND_API_KEY= os.getenv("BACKEND_API_KEY")
def load_ssh_config(host_alias):
+ """
+ Load SSH connection details from ~/.ssh/config for the given alias.
+ """
+ logger.debug(f"Loading SSH configuration for host alias '{host_alias}'")
ssh_config = paramiko.SSHConfig()
config_path = os.path.expanduser("~/.ssh/config")
- with open(config_path) as f:
- ssh_config.parse(f)
+
+ try:
+ with open(config_path) as f:
+ ssh_config.parse(f)
+ except FileNotFoundError:
+ logger.error(f"SSH config file not found at {config_path}")
+ raise
+
host_config = ssh_config.lookup(host_alias)
hostname = host_config.get("hostname")
port = int(host_config.get("port", 22))
@@ -21,57 +34,95 @@ def load_ssh_config(host_alias):
keyfile = host_config.get("identityfile", [None])[0]
if not all([hostname, username, keyfile]):
+ logger.error(f"Incomplete SSH configuration for alias '{host_alias}'")
raise ValueError(f"Missing SSH configuration for {host_alias}.")
+ logger.debug(f"SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
return hostname, port, username, keyfile
def create_ssh_client(hostname, port, username, keyfile):
- ssh = paramiko.SSHClient()
- ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- pkey = paramiko.RSAKey.from_private_key_file(keyfile)
- ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
- return ssh
+ """
+ Create and return a connected Paramiko SSHClient instance.
+ """
+ try:
+ ssh = paramiko.SSHClient()
+ ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ pkey = paramiko.RSAKey.from_private_key_file(keyfile)
+ ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
+ logger.debug("SSH connection established successfully")
+ return ssh
+ except Exception as e:
+ logger.error(f"SSH connection failed: {e}", exc_info=True)
+ raise
def upload_via_sftp(filename):
- hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
+ """
+ Upload a file to the remote host via SFTP using SSH credentials.
+ """
+    logger.info(f"Preparing to upload file '{filename}' via SFTP")
+ try:
+ hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
+ logger.debug(f"Connecting to {hostname}:{port} for file upload")
- transport = paramiko.Transport((hostname, port))
- pkey = paramiko.RSAKey.from_private_key_file(keyfile)
- transport.connect(username=username, pkey=pkey)
- sftp = paramiko.SFTPClient.from_transport(transport)
+ transport = paramiko.Transport((hostname, port))
+ pkey = paramiko.RSAKey.from_private_key_file(keyfile)
+ transport.connect(username=username, pkey=pkey)
+ sftp = paramiko.SFTPClient.from_transport(transport)
- remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
- sftp.put(filename, remote_file)
+ remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
+ logger.info(f"Uploading to remote path: {remote_file}")
+ sftp.put(filename, remote_file)
- sftp.close()
- transport.close()
+ sftp.close()
+ transport.close()
+        logger.info(f"File '{filename}' uploaded successfully")
+ except Exception as e:
+        logger.error(f"SFTP upload failed for '{filename}': {e}", exc_info=True)
+ raise
def send_notification_via_ssh(notification_title, notification_info):
- hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
-
- with create_ssh_client(hostname, port, username, keyfile) as ssh:
- data = {
- "receipent_user_id": 1,
- "message": {
- "title": notification_title,
- "info": notification_info,
- "category": "mixtapes"
+ """
+ Send a JSON-formatted notification payload via SSH to the backend.
+ """
+ logger.info(f"Sending SSH notification: {notification_title}")
+ try:
+ hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
+ with create_ssh_client(hostname, port, username, keyfile) as ssh:
+ data = {
+ "receipent_user_id": 1,
+ "message": {
+ "title": notification_title,
+ "body": notification_info,
+ "category": "mixtapes",
+ "timestamp": int(time.time())
+ }
}
- }
- json_payload = dumps(data)
+ json_payload = dumps(data)
+ logger.debug(f"Notification payload: {json_payload}")
- # Command reads API key and JSON from stdin
- notification_cmd = (
- f"curl -s -X POST '{BACKEND_API_URL}' "
- f"-H 'Content-Type: application/json' "
- f"-H 'X-API-Key-Internal: $(head -n1)' "
- f"-d @-"
- )
- stdin, stdout, stderr = ssh.exec_command(notification_cmd)
- stdin.write(f"{BACKEND_API_KEY}\n{json_payload}")
- stdin.flush()
- stdin.channel.shutdown_write()
\ No newline at end of file
+ notification_cmd = (
+ f"curl -s -X POST '{BACKEND_API_URL}' "
+ f"-H 'Content-Type: application/json' "
+ f"-H 'X-API-Key-Internal: $(head -n1)' "
+ f"-d @-"
+ )
+
+ stdin, stdout, stderr = ssh.exec_command(notification_cmd)
+ stdin.write(f"{BACKEND_API_KEY}\n{json_payload}")
+ stdin.flush()
+ stdin.channel.shutdown_write()
+
+ exit_status = stdout.channel.recv_exit_status()
+ if exit_status == 0:
+ logger.info("Notification sent successfully")
+ else:
+ error_output = stderr.read().decode()
+ logger.warning(f"Notification command exited with {exit_status}: {error_output}")
+ except Exception as e:
+ logger.error(f"Failed to send SSH notification: {e}", exc_info=True)
+ raise
\ No newline at end of file
diff --git a/src/perun/youtube_handler.py b/src/perun/youtube_handler.py
index d75ce88..3591c28 100644
--- a/src/perun/youtube_handler.py
+++ b/src/perun/youtube_handler.py
@@ -4,13 +4,19 @@ import contextlib
from dotenv import load_dotenv
import os
from helper import return_string_as_html
+from logger_handler import setup_logger
+import json
+logger = setup_logger(__name__)
load_dotenv()
-
YOUTUBE_CHANNEL_URL = os.getenv("YOUTUBE_CHANNEL_URL")
def get_url_for_latest_video():
+ """
+ Fetch the URL of the latest video from a YouTube channel.
+ """
+ logger.info("Fetching latest video URL from YouTube channel")
options = {
"extract_flat": True,
"playlist_items": "1",
@@ -18,22 +24,50 @@ def get_url_for_latest_video():
"forcejson": True,
"simulate": True,
}
- with open(os.devnull, "w") as devnull:
- with contextlib.redirect_stdout(devnull):
- with yt_dlp.YoutubeDL(options) as video:
- info_dict = video.extract_info(YOUTUBE_CHANNEL_URL, download = False)
- if "entries" in info_dict and len(info_dict["entries"]) > 0:
- return info_dict["entries"][0]["url"]
-def get_youtube_data(url):
- with yt_dlp.YoutubeDL({"quiet":True,"noprogress":True}) as video:
- info_dict = video.extract_info(url, download = False)
- return {"date":datetime.datetime.fromtimestamp(info_dict["timestamp"], datetime.timezone.utc).strftime("%Y-%m-%d"),"title":info_dict["title"],
- "description":return_string_as_html(info_dict["description"]),"upload_date":info_dict["upload_date"]}
+ try:
+ with open(os.devnull, "w") as devnull, contextlib.redirect_stdout(devnull):
+ with yt_dlp.YoutubeDL(options) as video:
+ info_dict = video.extract_info(YOUTUBE_CHANNEL_URL, download=False)
+ except Exception as e:
+ logger.error(f"Failed to fetch latest video info: {e}", exc_info=True)
+ return None
+
+ if "entries" in info_dict and len(info_dict["entries"]) > 0:
+ latest_url = info_dict["entries"][0]["url"]
+ logger.debug(f"Latest video URL found: {latest_url}")
+ return latest_url
+ else:
+ logger.warning("No entries found in channel feed")
+ return None
+
+def get_youtube_data(url: str) -> dict:
+ """
+ Fetch metadata for a given YouTube video URL.
+ """
+ logger.info(f"Fetching YouTube metadata for video: {url}")
+ try:
+ with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as video:
+ info_dict = video.extract_info(url, download=False)
+ except Exception as e:
+ logger.error(f"Failed to fetch YouTube video info for {url}: {e}", exc_info=True)
+ return {}
+
+ video_data = {
+ "date": datetime.datetime.fromtimestamp(
+ info_dict["timestamp"], datetime.timezone.utc
+ ).strftime("%Y-%m-%d"),
+ "title": info_dict["title"],
+ "description": return_string_as_html(info_dict["description"]),
+ "upload_date": info_dict["upload_date"]
+ }
+
+ logger.debug(f"Fetched video data: {json.dumps(video_data, indent=4)}")
+ return video_data
def return_download_options(information:dict,track:str)->dict:
- return {
+ download_options = {
"quiet": True,
"noprogress": True,
"format": "bestaudio/best",
@@ -71,4 +105,6 @@ def return_download_options(information:dict,track:str)->dict:
"-metadata", f"description={information['description']}",
],
"merge_output_format": "mp3"
- }
\ No newline at end of file
+ }
+ logger.debug(f"Created download options:\n {json.dumps(download_options, indent=4)}")
+ return download_options
\ No newline at end of file