Compare commits


1 Commit

Author SHA1 Message Date
53f57878a1 BBC Sounds is broken on yt-dlp 2025-10-19 17:13:56 +02:00
23 changed files with 411 additions and 1009 deletions


@@ -1,17 +0,0 @@
name: Build image with python3,get-iplayer
on:
schedule:
- cron: '0 0 * * 4'
workflow_dispatch:
env:
DOCKER_IMAGE_NAME: bbcr1
jobs:
build_docker_images:
name: Build Docker Image
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Build
working-directory: ./src/petetong
run: docker build . --tag $DOCKER_IMAGE_NAME:latest

.gitignore vendored

@@ -1,171 +0,0 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
*.mp3
*.m4a


@@ -1,13 +1,3 @@
# Media Download Scripts
For now, BBC Sounds links are broken with `yt-dlp`, so I am switching to `get_iplayer` and keeping this as a reference
A collection of Python scripts for automatically downloading and processing podcast episodes from various sources.
### [Perun YouTube Downloader](src/perun/)
Downloads the latest video from the Perun YouTube channel, converts to MP3 with metadata and sponsor segment removal, and uploads to a podcast server.
### [BBC Radio 1 Pete Tong Downloader](src/petetong/)
Downloads the latest Pete Tong radio show from BBC iPlayer Radio, converts to MP3 with metadata, and sends notifications.
## Setup
Each project has its own README with detailed installation and configuration instructions. Navigate to the respective directories for more information.
See: https://github.com/yt-dlp/yt-dlp/issues/14569
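As a rough reference for the `get_iplayer` switch: the new `src/bbcr1/helper.py` below finds the latest episode by shelling out to `get_iplayer`, which from the command line (assuming `get_iplayer` is installed) amounts to:

```bash
# List all episode PIDs under a programme page, e.g. Pete Tong's show
get_iplayer --pid-recursive-list https://www.bbc.co.uk/programmes/b006ww0v
```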

Binary file not shown.

src/bbcr1/config.py Normal file

@@ -0,0 +1,26 @@
settings = {
"Pete Tong":{
"artist": "Pete Tong",
"base_url":"https://www.bbc.co.uk/programmes/b006ww0v",
"cut_intro":True,
"modify_timestamp":7200,
"calculate_amount_of_fridays":True
},
"Radio 1s Classic Essential Mix":{
"artist":"Radio 1s Classic Essential Mix",
"use_different_release_date":True,
"base_url":"https://www.bbc.co.uk/programmes/b00f3pc4",
"cut_intro":True,
"remove_amount_of_characters_from_title":-5
},
"Defected on Radio 1 Dance":{
"artist": "Defected on Radio 1 Dance",
"base_url":"https://www.bbc.co.uk/programmes/m00287n1",
"remove_amount_of_characters_from_title":-10
},
"Radio 1s Essential Mix":{
"artist":"Radio 1s Essential Mix",
"base_url":"https://www.bbc.co.uk/programmes/b006wkfp",
"cut_intro":True
}
}

src/bbcr1/get_episode.py Normal file

@@ -0,0 +1,162 @@
import yt_dlp
import subprocess
import tempfile
import sys
from datetime import datetime, timezone
from config import settings
from os import rename, remove
from helper import modify_chapters_for_ffmpeg, get_friday_number, return_url_of_latest_episode
import logging
from ytdlp_helper import return_episode_data
logger = logging.getLogger(__name__)
def _apply_configurations(configuration_settings: dict, episode_data):
"""
Apply configuration settings to episode data.
Returns:
tuple: (episode_data, filename_timestamp, track)
"""
if "remove_amount_of_characters_from_title" in configuration_settings:
amount_to_remove = configuration_settings["remove_amount_of_characters_from_title"]
episode_data.extracted_title = episode_data.extracted_title[:amount_to_remove]
if "modify_timestamp" in configuration_settings:
episode_data.extracted_timestamp -= configuration_settings["modify_timestamp"]
if "use_different_release_date" in configuration_settings:
if len(sys.argv) > 2:
filename_timestamp = sys.argv[2]
else:
logger.warning("Use_different_release_date set but missing sys.argv[2]. Falling back to default.")
filename_timestamp = datetime.fromtimestamp(episode_data.extracted_timestamp, tz=timezone.utc).strftime("%Y-%m-%d")
else:
filename_timestamp = datetime.fromtimestamp(episode_data.extracted_timestamp, tz=timezone.utc).strftime("%Y-%m-%d")
if "calculate_amount_of_fridays" in configuration_settings:
track = get_friday_number(episode_data.extracted_timestamp)
else:
track = filename_timestamp
return episode_data, filename_timestamp, track
def _prepare_ffmpeg_chapters(episode_data, configuration_settings):
"""
Prepare chapters for FFmpeg if cutting intro is requested.
"""
if not episode_data.chapters or len(episode_data.chapters) < 2:
logger.warning("Cutting intro requested but no chapters found.")
return None
return modify_chapters_for_ffmpeg(
episode_data.chapters[1:], episode_data.chapters[0]["end_time"]
)
def _download_audio(episode_url: str, episode_data, filename_timestamp: str, track: str, artist: str):
"""
Download episode audio using yt_dlp with metadata.
"""
ytdl_options = {
"quiet": True,
"noprogress": True,
"format": "bestaudio/best",
"extract_audio": True,
"audio_format": "mp3",
"outtmpl": f"{filename_timestamp}.%(ext)s",
"addmetadata": True,
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
},
{
"key": "FFmpegMetadata",
}
],
"postprocessor_args": [
"-metadata", f"title={episode_data.extracted_title}",
"-metadata", f"artist={artist}",
"-metadata", f"track={track}",
"-metadata", f"date={filename_timestamp}",
"-metadata", f"comment={episode_data.extracted_description}"
],
"merge_output_format": "mp3"
}
with yt_dlp.YoutubeDL(ytdl_options) as episode:
episode.download(episode_url)
def _cut_intro_with_ffmpeg(ffmpeg_chapters: str, episode_data, filename_timestamp: str, track: str, artist: str):
"""
Cut the intro from the episode using FFmpeg and apply metadata.
"""
logger.info("Fixing chapters and metadata with FFmpeg")
temp_metadata_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".txt") as temp_file:
temp_file.write(ffmpeg_chapters)
temp_metadata_path = temp_file.name
ffmpeg_command = [
"ffmpeg", "-ss", str(episode_data.chapters[0]["end_time"]),
"-hide_banner", "-loglevel", "error",
"-i", f"{filename_timestamp}.mp3",
"-i", temp_metadata_path,
"-map_metadata", "1",
"-metadata", f"title={episode_data.extracted_title}",
"-metadata", f"artist={artist}",
"-metadata", f"track={track}",
"-metadata", f"date={filename_timestamp}",
"-metadata", f"comment={episode_data.extracted_description}",
"-codec", "copy",
f"{filename_timestamp}-{episode_data.extracted_id}.mp3"
]
subprocess.run(ffmpeg_command, check=True)
remove(f"{filename_timestamp}.mp3")
except subprocess.CalledProcessError as e:
logger.error(f"Error running FFmpeg: {e}")
finally:
if temp_metadata_path and remove:
try:
remove(temp_metadata_path)
except Exception as ex:
logger.warning(f"Could not remove temp metadata file: {ex}")
def download_episode(configuration_settings: dict, episode_url: str):
logger.info("Extracting metadata")
episode_data = return_episode_data(episode_url)
episode_data, filename_timestamp, track = _apply_configurations(configuration_settings, episode_data)
artist = configuration_settings.get("artist", sys.argv[1] if len(sys.argv) > 1 else "Unknown Artist")
ffmpeg_chapters = None
if configuration_settings.get("cut_intro"):
ffmpeg_chapters = _prepare_ffmpeg_chapters(episode_data, configuration_settings)
logger.info("Downloading episode")
_download_audio(episode_url, episode_data, filename_timestamp, track, artist)
if ffmpeg_chapters:
_cut_intro_with_ffmpeg(ffmpeg_chapters, episode_data, filename_timestamp, track, artist)
else:
rename(f"{filename_timestamp}.mp3", f"{filename_timestamp}-{episode_data.extracted_id}.mp3")
logger.info("Finished")
if __name__ == "__main__":
show_name = sys.argv[1]
logger.info (f"Processing {show_name}")
episode_url = return_url_of_latest_episode(settings[show_name]["base_url"])
download_episode(settings[show_name],episode_url)
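
A minimal invocation sketch, assuming the show name matches a key in `config.py`; the second argument is only read when `use_different_release_date` is set, and the date shown here is purely illustrative:

```bash
python get_episode.py "Pete Tong"
python get_episode.py "Radio 1s Classic Essential Mix" 2008-06-13
```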

src/bbcr1/helper.py Normal file

@@ -0,0 +1,52 @@
from datetime import datetime, timezone, timedelta
from typing import List, Dict
import subprocess
def time_to_milliseconds(time,length_to_cut) -> int:
return int(time * 1000 - length_to_cut * 1000)
def add_html_tags_to_description(input_text) -> str:
return("<p>"+input_text.replace("\n\n", "</p>\n<p>").replace("\n", "<br>")+"</p>")
def get_friday_number(extracted_timestamp) -> int:
dt = datetime.fromtimestamp(extracted_timestamp)
start_of_year = datetime(dt.year, 1, 1)
days_until_first_friday = (4 - start_of_year.weekday()) % 7
first_friday = start_of_year + timedelta(days=days_until_first_friday)
fridays_passed = (dt - first_friday).days // 7 + 1
return fridays_passed
def return_url_of_latest_episode(base_url:str) -> str:
result = subprocess.run(["get_iplayer","--pid-recursive-list",base_url], capture_output=True, text=True)
latest_episode_id = result.stdout.split("\n")[-3].split(",")[-1][1:]
return (f"https://www.bbc.co.uk/sounds/play/{latest_episode_id}")
def modify_chapters_for_ffmpeg(chapters: List[Dict], length_to_cut: float) -> str:
"""
Converts chapter times to ffmpeg-compatible metadata format, adjusting by length_to_cut.
Args:
chapters (list): List of chapter dicts with "start_time", "end_time", and "title".
length_to_cut (int/float): Amount of time to cut from start, in seconds.
Returns:
str: Chapters formatted as ffmpeg metadata.
"""
for entry in chapters:
if "start_time" in entry:
entry["start_time"]=time_to_milliseconds(entry["start_time"],length_to_cut)
if "end_time" in entry:
entry["end_time"]=time_to_milliseconds(entry["end_time"],length_to_cut)
chapter_format = ";FFMETADATA1\n"
for entry in chapters:
chapter_format+=("[CHAPTER]\n")
chapter_format+=("TIMEBASE=1/1000\n")
chapter_format+=(f"START={entry['start_time']}\n")
chapter_format+=(f"END={entry['end_time']}\n")
chapter_format+=(f"title={entry['title']}\n\n")
return(chapter_format)
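
As a worked example of the format this produces: a chapter `{"start_time": 212.0, "end_time": 3600.0, "title": "Part 1"}` (values illustrative) with `length_to_cut=212.0` comes out, with times in milliseconds, as:

```
;FFMETADATA1
[CHAPTER]
TIMEBASE=1/1000
START=0
END=3388000
title=Part 1
```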

src/bbcr1/ytdlp_helper.py Normal file

@@ -0,0 +1,47 @@
import yt_dlp
from helper import add_html_tags_to_description
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class EpisodeData:
chapters: List
extracted_description: str
extracted_id: str
extracted_title: str
extracted_timestamp: Optional[int]
def return_episode_data(episode_url: str) -> EpisodeData:
"""
Quietly extracts meta information about a given radio show.
Args:
episode_url (str): The URL of the episode.
Returns:
EpisodeData: A dataclass containing episode metadata:
- chapters (List): Chapters in JSON format.
- extracted_description (str): HTML-wrapped description of the episode.
- extracted_id (str): Unique episode ID.
- extracted_title (str): Episode title.
- extracted_timestamp (Optional[int]): Airing timestamp (epoch seconds), if available.
"""
try:
with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as ydl:
info_dict = ydl.extract_info(episode_url, download=False)
except Exception as e:
return {"error": f"Failed to extract info: {e}"}
return EpisodeData(
chapters=info_dict.get("chapters", []),
extracted_description=add_html_tags_to_description(info_dict.get("description", "")),
extracted_id=info_dict.get("id", ""),
extracted_title=info_dict.get("title", ""),
extracted_timestamp=info_dict.get("timestamp"),
)
if __name__ == "__main__":
print(return_episode_data("https://www.bbc.co.uk/sounds/play/m002jtcqyt "))


@@ -1,75 +1,18 @@
# Perun YouTube Podcast Downloader
A Python script that automatically downloads the latest video from the Perun YouTube channel, converts it to MP3 with metadata, removes sponsor segments, and uploads it to a podcast server.
## Features
- **Automatic Detection**: Checks for new episodes by comparing with Audiobookshelf library
- **Audio Conversion**: Downloads and converts YouTube videos to MP3 format
- **Sponsor Removal**: Uses SponsorBlock API to remove sponsored segments
- **Metadata Injection**: Adds title, artist, track number, date, and description to MP3 files
- **SFTP Upload**: Automatically uploads to remote podcast server
- **Push Notifications**: Sends notification when new episode is available
## Prerequisites
- Python 3.8+
- yt-dlp
- ffmpeg (for audio conversion)
- SSH key-based authentication configured
- Audiobookshelf server with API access
# Perun
## Installation
YouTube blocks a lot of server IPs, so running this locally is just easier. It expects the following environment variables in a `.env` file:
```bash
pip install -r requirements.txt
```
Install ffmpeg:
```bash
# Ubuntu/Debian
sudo apt install ffmpeg
REMOTE_HOSTNAME
# macOS
brew install ffmpeg
```
## Usage
REMOTE_PATH
Run the script manually:
```bash
python get_episode.py
```
BACKEND_API_URL
Or schedule with cron and use the provided `grabEpisode.sh` (Monday at 7 AM):
```bash
0 7 * * 1 /path/to/script/grabEpisode.sh
```
BACKEND_API_KEY
YouTube blocks a lot of server IPs, so running this locally is just easier.
YOUTUBE_CHANNEL_URL
## Configuration
PODCAST_AUTHORIZATION_TOKEN
Create a `.env` file with the following variables:
```env
# YouTube channel to monitor
YOUTUBE_CHANNEL_URL=https://www.youtube.com/@PerunAU/videos
# Audiobookshelf API
PODCAST_API_URL=https://your-audiobookshelf.com/api/items/{item-id}
PODCAST_AUTHORIZATION_TOKEN=your_token_here
# SFTP upload destination
REMOTE_HOSTNAME=your_ssh_host_alias
REMOTE_PATH=/path/to/podcast/folder
# Backend notification service
BACKEND_API_URL=http://backend:8101/internal/receive-notifications
BACKEND_API_KEY=your_api_key
```
## Output
MP3 files are named: `perun-YYYY-MM-DD.mp3`
Example: `perun-2025-10-19.mp3`
PODCAST_API_URL


@@ -1,190 +1,66 @@
import requests
import yt_dlp
import os
import time
from dotenv import load_dotenv
from helper import log_message
from ssh_helper import upload_via_sftp, send_notification_via_ssh
from youtube_handler import return_download_options, check_for_sponsorblock_segments
from simple_logger_handler import setup_logger
from rss_feed_handler import grab_latest_chapter_information, EpisodeData
logger = setup_logger(__name__)
from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options
load_dotenv()
PODCAST_AUTHORIZATION_TOKEN = os.getenv("PODCAST_AUTHORIZATION_TOKEN")
PODCAST_API_URL = os.getenv("PODCAST_API_URL")
def get_audiobookshelf_data() -> tuple[str | None, str | None]:
"""
Fetches the latest episode data from the Audiobookshelf API.
Returns:
tuple[int | None, str | None]:
- The track number as a string (or None if data could not be fetched due to retryable errors).
- The YouTube episode id as a string (or None if data could not be fetched due to retryable errors).
Raises:
requests.exceptions.HTTPError:
If a non-retryable HTTP error occurs (e.g., 401 Unauthorized, 403 Forbidden, 404 Not Found).
Notes:
- Connection errors, timeouts, and server-side HTTP errors (500, 502, 503, 504) are caught and logged.
In these cases, the function returns (None, None) so the caller can retry later.
"""
def get_audiobookshelf_data()->tuple[int | None, str | None]:
headers = {"Authorization": f"Bearer {PODCAST_AUTHORIZATION_TOKEN}"}
logger.debug("[Audiobookshelf] Fetching Audiobookshelf data")
try:
response = requests.get(PODCAST_API_URL, headers=headers)
response.raise_for_status()
result = response.json()
audiobookshelf_track = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTrack"]
audiobookshelf_ytid = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagDescription"]
audiobookshelf_title = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTitle"]
return audiobookshelf_track, audiobookshelf_title
logger.debug(f"[Audiobookshelf] Fetched Audiobookshelf data: track={audiobookshelf_track}, ytid={audiobookshelf_ytid}")
return (audiobookshelf_track, audiobookshelf_ytid)
except requests.exceptions.ConnectionError as e:
logger.warning(f"[Audiobookshelf] Connection error, will retry: {e}")
return (None, None)
except requests.exceptions.Timeout as e:
logger.warning(f"[Audiobookshelf] Request timed out, will retry: {e}")
return (None, None)
except requests.exceptions.HTTPError as e:
status = e.response.status_code
if status in {500, 502, 503, 504}:
logger.warning(f"[Audiobookshelf] Server error {status}, will retry: {e}")
return (None, None)
else:
logger.error(f"[Audiobookshelf] HTTP error {status}, not retrying: {e}")
raise
except requests.RequestException as e:
log_message(f"Failed to fetch data: {e}")
return None
def check_until_new_episode_gets_released() -> tuple[EpisodeData | None, str | None]:
"""
Polls YouTube every hour for a new episode and compares it to the available episode on Audiobookshelf.
Stops after 72 hours.
Returns:
tuple[EpisodeData | None, str | None]:
- EpisodeData with information about the date,description,link,title and YouTube ID
- Track number from Audiobookshelf
Returns (None, None) if no new episode found within timeout
"""
CHECK_INTERVAL_HOURS = 1
MAX_HOURS = 72
for attempt in range(1, MAX_HOURS + 1):
logger.debug(f"[EpisodeCheck] Waiting for a new episode to be released, attempt: {attempt}/{MAX_HOURS}")
audiobookshelf_track, audiobookshelf_ytid = get_audiobookshelf_data()
def download_episode():
log_message("Starting Perun")
if audiobookshelf_track is None or audiobookshelf_ytid is None:
logger.warning("[EpisodeCheck] Unable to fetch Audiobookshelf data, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
continue
audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
if audiobookshelf_track is None or audiobookshelf_title is None:
log_message("Unable to fetch Audiobookshelf data. Exiting.")
return
episode_url = get_url_for_latest_video()
episode_info = get_youtube_data(episode_url)
log_message(f"Latest episode: {episode_info['title']}")
if audiobookshelf_title != episode_info["title"]:
log_message("New Episode found")
track = str(int(audiobookshelf_track) + 1).zfill(4)
options = return_download_options(episode_info,track)
log_message("Downloading episode")
try:
episode_data = grab_latest_chapter_information("UCC3ehuUksTyQ7bbjGntmx3Q")
with yt_dlp.YoutubeDL(options) as episode:
episode.download(episode_url)
except Exception as e:
logger.warning(f"[EpisodeCheck] Failed to fetch latest video data: {e}, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
continue
if episode_data is None:
logger.warning("[EpisodeCheck] Unable to fetch latest video data, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
continue
if audiobookshelf_ytid != episode_data.episode_ytid:
logger.info(f"[EpisodeCheck] Latest YouTube episode: {episode_data.episode_title}")
return episode_data, audiobookshelf_track
logger.info("[EpisodeCheck] No new episode found, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
logger.warning("[EpisodeCheck] No new episode found after maximum attempts.")
return None, None
def wait_for_sponsorblock_segments_to_be_added(episode_link) -> bool:
"""
Polls SponsorBlock for segments on the current video until found or until max attempts.
Args:
episode_link: YouTube video URL to check for SponsorBlock segments
Returns:
True if segments found, False otherwise
"""
CHECK_INTERVAL_HOURS = 1
MAX_HOURS = 24
for attempt in range(1, MAX_HOURS + 1):
logger.debug(f"[SponsorBlock] Waiting for SponsorBlock to be added, attempt: {attempt}/{MAX_HOURS} ")
segments = check_for_sponsorblock_segments(episode_link)
if segments:
logger.debug("[SponsorBlock] Segments found, exiting loop.")
return True
logger.debug("[SponsorBlock] No SponsorBlock segments found yet, retrying in 1 hour.")
time.sleep(CHECK_INTERVAL_HOURS * 3600)
logger.warning("[SponsorBlock] Segments not found after maximum attempts.")
return False
def download_episode() -> None:
"""
Main workflow: Check for new episode, download it, upload via SFTP, and send notification.
"""
logger.info("[App] Starting Perun")
try:
episode_data,audiobookshelf_track = check_until_new_episode_gets_released()
if episode_data is None or audiobookshelf_track is None:
logger.error("[App] Failed to find new episode within timeout period")
log_message(f"Failed to download episode: {e}")
return
logger.info("[App] New episode found")
except Exception as e:
logger.error(f"[App] Failed to fetch new episode info: {e}", exc_info=True)
return
try:
if "sponsored" in episode_data.episode_description.lower():
logger.debug("[App] Sponsored segments found in description, waiting for SponsorBlock")
wait_for_sponsorblock_segments_to_be_added(episode_data.episode_link)
else:
logger.debug("[App] No sponsored segments found in description")
except Exception as e:
logger.warning(f"[App] Failed during SponsorBlock wait: {e}", exc_info=True)
try:
episode_data.episode_number = str(int(audiobookshelf_track) + 1).zfill(4)
except (ValueError,TypeError) as e:
logger.warning(f"[App] Failed incrementing audiobookshelf track: {e}", exc_info=True)
return
try:
options = return_download_options(episode_data)
except Exception as e:
logger.error(f"[App] Failed to generate download options: {e}", exc_info=True)
return
logger.info("[App] Downloading new episode")
try:
with yt_dlp.YoutubeDL(options) as episode:
episode.download(episode_data.episode_link)
logger.debug("[App] Download completed successfully")
except Exception as e:
logger.error(f"[App] Failed to download episode: {e}", exc_info=True)
return
logger.info("[App] Uploading episode via SFTP")
upload_via_sftp(f"perun-{episode_data.episode_date}.mp3")
logger.info("[App] Sending release notification")
send_notification_via_ssh(f"Perun episode {episode_data.episode_number} has been released",episode_data.episode_title)
logger.info("[App] Workflow complete")
log_message("Uploading episode")
upload_via_sftp(f"perun-{episode_info['date']}.mp3")
log_message("Finished uploading, sending notification")
send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"])
log_message("Finished")
else:
log_message("No new episode found, exiting...")
if __name__ == "__main__":
download_episode()

src/perun/grabEpisode.sh Executable file → Normal file

@@ -9,6 +9,7 @@ fi
source .venv/bin/activate
pip install --upgrade pip
pip install --upgrade yt-dlp[default]
pip install -r requirements.txt


@@ -1,9 +1,15 @@
import re
import datetime
def return_string_as_html(input_text):
string_without_ads=""
for line in input_text.splitlines():
line = re.sub(r'(https?://[^\s]+)', r'<a href="\1">\1</a>', line)
if "Sponsored" not in line:
if not "Sponsored" in line:
string_without_ads+=line+"\n"
return("<p>"+string_without_ads.replace("\n\n", "</p>\n<p>").replace("\n", "<br>")+"</p>")
def log_message(message):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
return(f"[{timestamp}] {message}\n")


@@ -1,4 +1,3 @@
--extra-index-url https://git.gansejunge.com/api/packages/notifier/pypi/simple/
bcrypt==5.0.0
Brotli==1.1.0
certifi==2025.10.5
@@ -6,7 +5,6 @@ cffi==2.0.0
charset-normalizer==3.4.3
cryptography==46.0.2
dotenv==0.9.9
feedparser==6.0.12
idna==3.10
invoke==2.2.0
mutagen==1.47.0
@@ -16,8 +14,5 @@ pycryptodomex==3.23.0
PyNaCl==1.6.0
python-dotenv==1.1.1
requests==2.32.5
sgmllib3k==1.0.0
simple-logger-handler==0.1.0
sponsorblock.py==0.2.3
urllib3==2.5.0
websockets==15.0.1


@@ -1,85 +0,0 @@
import feedparser
from simple_logger_handler import setup_logger
import time
from urllib.error import URLError
from typing import Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class EpisodeData:
episode_date: str
episode_description: str
episode_link: str
episode_number: str
episode_title: str
episode_ytid: str
logger = setup_logger(__name__)
def grab_latest_chapter_information(id: str, max_retries: int = 3) -> Optional[EpisodeData]:
"""
Fetches the latest episodes information from a Youtube RSS feed, with retries on network-related errors.
Parameters:
id: Youtube channel ID as a string.
max_retries: Number of retry attempts if fetching the feed fails due to network issues.
Returns:
EpisodeData: A dataclass containing episode metadata:
episode_date: Date when it was published in iso format (2025-11-30).
episode_description: Episode description.
episode_link: YouTube link.
episode_number: Episode number.
episode_title: Episode title.
episode_ytid: Episode YouTube ID .
Returns None if the feed has no entries or all retries are exhausted.
Raises:
ValueError: If the feed has no entries.
Other network-related exceptions: If fetching fails after retries.
"""
rss_feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={id}"
attempt = 1
while attempt <= max_retries:
logger.debug(f"[Feed] Parsing feed URL: {rss_feed_url} (attempt {attempt}/{max_retries})")
try:
feed = feedparser.parse(rss_feed_url)
if not feed.entries:
logger.warning(f"[Feed] No entries found for feed {id}")
return None
latest_chapter_data = feed["entries"][0]
episode_link = latest_chapter_data["link"]
episode_title = latest_chapter_data["title"]
episode_description = latest_chapter_data["summary"]
episode_date = latest_chapter_data["published"]
episode_date = datetime.fromisoformat(episode_date).date().isoformat()
episode_ytid = latest_chapter_data["yt_videoid"]
logger.info(f"[Feed] Latest episode '{episode_title}': {episode_link}")
logger.debug(f"[Feed] Latest episode '{episode_title}' (YouTubeId {episode_ytid}): {episode_link} -> {episode_description}")
return EpisodeData(
episode_date=episode_date,
episode_description=episode_description,
episode_link=episode_link,
episode_number="",
episode_title=episode_title,
episode_ytid=episode_ytid
)
except (URLError, OSError) as e:
logger.warning(f"[Feed] Network error on attempt {attempt} for feed {id}: {e}")
if attempt == max_retries:
logger.error(f"[Feed] All {max_retries} attempts failed for feed {id}")
return None
backoff = 2 ** (attempt - 1)
logger.debug(f"[Feed] Retrying in {backoff} seconds...")
time.sleep(backoff)
attempt += 1
if __name__ == "__main__":
print(grab_latest_chapter_information("UCC3ehuUksTyQ7bbjGntmx3Q"))


@@ -2,42 +2,18 @@ import paramiko
import os
from dotenv import load_dotenv
from json import dumps
from simple_logger_handler import setup_logger
import time
import shlex
logger = setup_logger(__name__)
load_dotenv()
REMOTE_HOSTNAME = os.getenv("REMOTE_HOSTNAME")
REMOTE_PATH = os.getenv("REMOTE_PATH")
BACKEND_API_URL = os.getenv("BACKEND_API_URL")
BACKEND_API_KEY= os.getenv("BACKEND_API_KEY")
def load_ssh_config(host_alias:str) -> tuple[str, int, str, str]:
"""
Load SSH connection details from ~/.ssh/config for the given alias.
Args:
host_alias: The SSH host alias to look up
Returns:
Tuple of (hostname, port, username, keyfile)
Raises:
FileNotFoundError: If SSH config file doesn't exist
ValueError: If SSH configuration is incomplete
"""
logger.debug(f"[SSH] Loading SSH configuration for host alias '{host_alias}'")
def load_ssh_config(host_alias):
ssh_config = paramiko.SSHConfig()
config_path = os.path.expanduser("~/.ssh/config")
try:
with open(config_path) as f:
ssh_config.parse(f)
except FileNotFoundError:
logger.error(f"[SSH] SSH config file not found at {config_path}")
raise
with open(config_path) as f:
ssh_config.parse(f)
host_config = ssh_config.lookup(host_alias)
hostname = host_config.get("hostname")
port = int(host_config.get("port", 22))
@@ -45,136 +21,57 @@ def load_ssh_config(host_alias:str) -> tuple[str, int, str, str]:
keyfile = host_config.get("identityfile", [None])[0]
if not all([hostname, username, keyfile]):
logger.error(f"[SSH] Incomplete SSH configuration for alias '{host_alias}'")
raise ValueError(f"[SSH] Missing SSH configuration for {host_alias}.")
raise ValueError(f"Missing SSH configuration for {host_alias}.")
logger.debug(f"[SSH] SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
return hostname, port, username, keyfile
def create_ssh_client(hostname: str, port: int, username: str, keyfile: str)-> paramiko.SSHClient:
"""
Create and return a connected Paramiko SSHClient instance.
Args:
hostname: Remote hostname
port: SSH port
username: SSH username
keyfile: Path to SSH private key file
Returns:
Connected SSHClient instance (caller must close it)
Raises:
Exception: If SSH connection fails
"""
logger.debug("[SSH] Creating SSH client")
try:
ssh = paramiko.SSHClient()
ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pkey = paramiko.RSAKey.from_private_key_file(keyfile)
ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
logger.debug("[SSH] SSH connection established successfully")
return ssh
except Exception as e:
logger.error(f"[SSH] SSH connection failed: {e}", exc_info=True)
raise
def create_ssh_client(hostname, port, username, keyfile):
ssh = paramiko.SSHClient()
ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
pkey = paramiko.RSAKey.from_private_key_file(keyfile)
ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
return ssh
def upload_via_sftp(filename) -> None:
"""
Upload a file to the remote host via SFTP using SSH credentials.
def upload_via_sftp(filename):
hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
Args:
filename: Local file path to upload
transport = paramiko.Transport((hostname, port))
pkey = paramiko.RSAKey.from_private_key_file(keyfile)
transport.connect(username=username, pkey=pkey)
sftp = paramiko.SFTPClient.from_transport(transport)
Raises:
Exception: If upload fails
"""
logger.info(f"[SFTP] Preparing to upload file '{filename}' via SFTP")
try:
hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
logger.debug(f"[SFTP] Connecting to {hostname}:{port} for file upload")
remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
sftp.put(filename, remote_file)
transport = paramiko.Transport((hostname, port))
pkey = paramiko.RSAKey.from_private_key_file(keyfile)
transport.connect(username=username, pkey=pkey)
sftp = paramiko.SFTPClient.from_transport(transport)
remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
logger.info(f"[SFTP] Uploading to remote path: {remote_file}")
sftp.put(filename, remote_file)
sftp.close()
transport.close()
logger.info(f"[SFTP] File '{filename}' uploaded successfully")
except Exception as e:
logger.error(f"[SFTP] SFTP upload failed for '{filename}': {e}", exc_info=True)
raise
sftp.close()
transport.close()
def send_notification_via_ssh(notification_title, notification_info) -> None:
"""
Send a JSON-formatted notification payload via SSH to the backend.
Args:
notification_title: Title of the notification
notification_info: Body/content of the notification
Raises:
Exception: If notification sending fails
"""
logger.info(f"[Notification] Sending SSH notification: {notification_title}")
ssh = None
try:
hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
ssh = create_ssh_client(hostname, port, username, keyfile)
def send_notification_via_ssh(notification_title, notification_info):
hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
with create_ssh_client(hostname, port, username, keyfile) as ssh:
data = {
"receipent_user_id": 1,
"message": {
"title": notification_title,
"body": notification_info,
"category": "podcasts",
"timestamp": int(time.time())
"info": notification_info,
"category": "mixtapes"
}
}
json_payload = dumps(data)
logger.debug(f"[Notification] Notification payload: {json_payload}")
escaped_payload = shlex.quote(json_payload)
escaped_url = shlex.quote(BACKEND_API_URL)
# Command reads API key and JSON from stdin
notification_cmd = (
f"API_KEY=$(head -n1) && "
f"curl -s -X POST {escaped_url} "
f"curl -s -X POST '{BACKEND_API_URL}' "
f"-H 'Content-Type: application/json' "
f"-H \"X-API-Key-Internal: $API_KEY\" "
f"-d {escaped_payload}"
f"-H 'X-API-Key-Internal: $(head -n1)' "
f"-d @-"
)
stdin, stdout, stderr = ssh.exec_command(notification_cmd)
stdin.write(f"{BACKEND_API_KEY}\n")
stdin.write(f"{BACKEND_API_KEY}\n{json_payload}")
stdin.flush()
stdin.channel.shutdown_write()
exit_status = stdout.channel.recv_exit_status()
response_output = stdout.read().decode()
if exit_status == 0:
logger.info("[Notification] Notification sent successfully")
logger.debug(f"[Notification] Response: {response_output}")
else:
error_output = stderr.read().decode()
logger.warning(f"[Notification] Notification command exited with {exit_status}")
logger.warning(f"[Notification] Error: {error_output}")
logger.warning(f"[Notification] Response: {response_output}")
except Exception as e:
logger.error(f"[Notification] Failed to send SSH notification: {e}", exc_info=True)
raise
finally:
if ssh:
ssh.close()
logger.debug("[Notification] SSH connection closed")
stdin.channel.shutdown_write()


@@ -4,39 +4,46 @@ import contextlib
from dotenv import load_dotenv
import os
from helper import return_string_as_html
from simple_logger_handler import setup_logger
import json
import sponsorblock as sb
logger = setup_logger(__name__)
load_dotenv()
YOUTUBE_CHANNEL_URL = os.getenv("YOUTUBE_CHANNEL_URL")
def check_for_sponsorblock_segments(youtube_video:str) -> bool:
client = sb.Client()
try:
segments = client.get_skip_segments(youtube_video)
except sb.errors.NotFoundException:
logger.debug(f"[SponsorBlock] No SponsorBlock information for video:{youtube_video}")
return False
if segments:
logger.info(f"[SponsorBlock] SponsorBlock segments found for video: {youtube_video}")
return True
def return_download_options(episode_data)->dict:
download_options = {
def get_url_for_latest_video():
options = {
"extract_flat": True,
"playlist_items": "1",
"quiet": True,
"forcejson": True,
"simulate": True,
}
with open(os.devnull, "w") as devnull:
with contextlib.redirect_stdout(devnull):
with yt_dlp.YoutubeDL(options) as video:
info_dict = video.extract_info(YOUTUBE_CHANNEL_URL, download = False)
if "entries" in info_dict and len(info_dict["entries"]) > 0:
return info_dict["entries"][0]["url"]
def get_youtube_data(url):
with yt_dlp.YoutubeDL({"quiet":True,"noprogress":True}) as video:
info_dict = video.extract_info(url, download = False)
return {"date":datetime.datetime.fromtimestamp(info_dict["timestamp"], datetime.timezone.utc).strftime("%Y-%m-%d"),"title":info_dict["title"],
"description":return_string_as_html(info_dict["description"]),"upload_date":info_dict["upload_date"]}
def return_download_options(information:dict,track:str)->dict:
return {
"quiet": True,
"noprogress": True,
"format": "bestaudio/best",
"extract_audio": True,
"audio_format": "mp3",
"outtmpl": f"perun-{episode_data.episode_date}.%(ext)s",
"outtmpl": f"perun-{information['date']}.%(ext)s",
"addmetadata": True,
"postprocessors":[
{"api": "https://sponsor.ajay.app",
"categories":["sponsor"],
"categories":{"sponsor"},
"key": "SponsorBlock",
"when": "after_filter"
},
@@ -45,7 +52,7 @@ def return_download_options(episode_data)->dict:
"key": "ModifyChapters",
"remove_chapters_patterns": [],
"remove_ranges": [],
"remove_sponsor_segments": ["sponsor"],
"remove_sponsor_segments": {"sponsor"},
"sponsorblock_chapter_title": "[SponsorBlock]: %(category_names)l"
},
{
@@ -56,17 +63,12 @@ def return_download_options(episode_data)->dict:
"key": "FFmpegMetadata",
}],
"postprocessor_args": [
"-metadata", f"title={episode_data.episode_title}",
"-metadata", "artist=Perun",
"-metadata", f"track={episode_data.episode_number}",
"-metadata", f"date={episode_data.episode_date}",
"-metadata", f"comment={return_string_as_html(episode_data.episode_description)}",
"-metadata", f"description={episode_data.episode_ytid}",
"-metadata", f"title={information['title']}",
"-metadata", f"artist=Perun",
"-metadata", f"track={track}",
"-metadata", f"date={information['date']}",
"-metadata", f"comment={information['description']}",
"-metadata", f"description={information['description']}",
],
"merge_output_format": "mp3"
}
logger.debug(f"[YouTube] Created download options:\n {json.dumps(download_options, indent=4)}")
return download_options
if __name__ == "__main__":
print(check_for_sponsorblock_segments("https://www.youtube.com/watch?v=M0t8UYZ9rrQ"))
}


@@ -1,21 +0,0 @@
FROM ubuntu:24.04
RUN apt update && \
apt install -y ca-certificates curl ffmpeg keychain python3 software-properties-common && \
add-apt-repository ppa:m-grant-prg/utils && \
apt update && apt install -y get-iplayer && \
rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/*
ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && mv /root/.local/bin/uv /usr/local/bin/uv && rm /uv-installer.sh
RUN userdel ubuntu && groupadd -r florian -g 1000 && \
useradd -u 1000 -r -g florian -m -d /home/florian -s /bin/bash florian && \
mkdir /app && chown -R florian:florian /app
USER florian
WORKDIR /home/florian
COPY requirements.txt ./
RUN uv venv && uv pip install -r requirements.txt
WORKDIR /app


@@ -1,75 +0,0 @@
# Pete Tong BBC Radio Episode Downloader
A Python script that automatically downloads the latest Pete Tong radio show from BBC iPlayer Radio, converts it to MP3 with metadata, and sends a push notification when complete.
## Features
- **Automatic Detection**: Finds the latest Pete Tong episode from BBC iPlayer
- **Audio Download**: Uses `get_iplayer` to download BBC Radio episodes
- **MP3 Conversion**: Converts to MP3 format with ffmpeg
- **Metadata Injection**: Adds title, artist, track number (week of year), date, and description
- **Push Notifications**: Sends notification to backend service when new episode is ready
## Prerequisites
- Python 3.8+
- `get_iplayer` (BBC iPlayer downloader)
- `ffmpeg` and `ffprobe` (audio processing)
- Backend notification service
## Installation
### Install Python Dependencies
```bash
pip install requests python-dotenv
```
### Install System Dependencies
**Ubuntu/Debian:**
```bash
sudo apt install get-iplayer ffmpeg
```
**macOS:**
```bash
brew install get-iplayer ffmpeg
```
## Configuration
Create a `.env` file with the following variables:
```env
# Backend notification service
BACKEND_API_URL=http://localhost:30101/internal/receive-notifications
BACKEND_API_KEY=your_api_key_here
```
## Usage
Run the script manually:
```bash
python download_episode.py
```
Or schedule with cron and use the provided `grabEpisode.sh` (Saturday mornings at 9 AM):
```bash
0 9 * * 6 /path/to/script/grabEpisode.sh
```
## Output
MP3 files are named: `YYYY-MM-DD-{episode_id}.mp3`
Example: `2025-10-17-m00258br.mp3`
## Metadata Structure
| Field | Value | Example |
|-------|-------|---------|
| Title | Featured artist | "Solomun" |
| Artist | Pete Tong | "Pete Tong" |
| Track | Friday number | 42 (42nd Friday of year) |
| Date | ISO date | "2025-10-17" |
| Comment | Episode description | HTML formatted text |


@@ -1,141 +0,0 @@
from datetime import datetime, timedelta
import os
import subprocess
from dataclasses import dataclass
import json
from logger_handler import setup_logger
from send_notification import send_notification
logger = setup_logger("PeteTongDownloader")
@dataclass
class EpisodeData:
description: str
title: str
timestamp: str
track: int
id: str
def add_html_tags_to_description(input_text) -> str:
if not input_text:
return ""
return("<p>"+input_text.replace("\n\n", "</p>\n<p>").replace("\n", "<br>")+"</p>")
def get_friday_number(iso_timestamp: str) -> int:
"""
Returns the week number of the Friday in the year for a given ISO timestamp string.
"""
try:
dt = datetime.fromisoformat(iso_timestamp)
start_of_year = datetime(dt.year, 1, 1, tzinfo=dt.tzinfo)
days_until_first_friday = (4 - start_of_year.weekday()) % 7
first_friday = start_of_year + timedelta(days=days_until_first_friday)
fridays_passed = (dt - first_friday).days // 7 + 1
return fridays_passed
except Exception as e:
logger.error(f"Failed to calculate Friday number from {iso_timestamp}: {e}")
return 0
def find_downloaded_file_name_via_id(directory: str, latest_episode_id: str) -> str | None:
for filename in os.listdir(directory):
if latest_episode_id in filename:
return filename
logger.warning(f"No file found containing episode ID {latest_episode_id} in {directory}")
return None
def extract_metadata_from_downloaded_episode(file_name: str, episode_id: str) -> EpisodeData:
if not file_name or not os.path.exists(file_name):
logger.error(f"File not found: {file_name}")
raise FileNotFoundError(f"File not found: {file_name}")
try:
result = subprocess.run(
["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", file_name],
capture_output=True, text=True, check=True
)
ffprobe_data = json.loads(result.stdout)
metadata = ffprobe_data.get("format", {}).get("tags", {})
iso_timestamp = metadata.get("date", "1970-01-01T00:00:00")
return EpisodeData(
description=add_html_tags_to_description(metadata.get("lyrics", "")),
title=metadata.get("title", "Unknown Title"),
timestamp=iso_timestamp.split("T")[0],
track=get_friday_number(iso_timestamp),
id=episode_id
)
except subprocess.CalledProcessError as e:
logger.error(f"ffprobe failed for {file_name}: {e.stderr}")
raise
except json.JSONDecodeError as e:
logger.error(f"Failed to parse ffprobe output for {file_name}: {e}")
raise
def get_id_of_the_latest_episode(base_url: str) -> str:
try:
result = subprocess.run(
["get_iplayer", "--pid-recursive-list", base_url],
capture_output=True, text=True, check=True
)
lines = result.stdout.strip().split("\n")
if len(lines) < 3:
raise ValueError("get_iplayer output too short to find latest episode ID")
latest_episode_id = lines[-2].split(",")[-1].strip()
logger.info(f"Latest episode ID: {latest_episode_id}")
return latest_episode_id
except subprocess.CalledProcessError as e:
logger.error(f"get_iplayer failed: {e.stderr}")
raise
def download_episode_via_episode_id(episode_id: str) -> str:
script_dir = os.path.dirname(os.path.abspath(__file__))
try:
logger.info(f"Downloading episode {episode_id}")
subprocess.run(
["get_iplayer", f"--pid={episode_id}", "--type=radio"],
cwd=script_dir, check=True
)
except subprocess.CalledProcessError as e:
logger.error(f"Download failed for {episode_id}: {e.stderr}")
raise
return script_dir
def convert_episode_to_mp3(episode_data: EpisodeData, file_name: str):
output_file = f"{episode_data.timestamp}-{episode_data.id}.mp3"
ffmpeg_command = [
"ffmpeg", "-i", file_name,
"-metadata", f"title={episode_data.title}",
"-metadata", "artist=Pete Tong",
"-metadata", f"track={episode_data.track}",
"-metadata", f"date={episode_data.timestamp}",
"-metadata", f"comment={episode_data.description}",
output_file
]
try:
logger.info(f"Converting {file_name} to {output_file}")
subprocess.run(ffmpeg_command, check=True)
os.remove(file_name)
except subprocess.CalledProcessError as e:
logger.error(f"ffmpeg conversion failed: {e}")
raise
def download_latest_pete_tong_episode():
try:
base_url = "https://www.bbc.co.uk/programmes/b006ww0v"
episode_id = get_id_of_the_latest_episode(base_url)
download_episode_via_episode_id(episode_id)
script_dir = download_episode_via_episode_id(episode_id)
file_name = find_downloaded_file_name_via_id(script_dir, episode_id)
episode_data = extract_metadata_from_downloaded_episode(file_name, episode_id)
convert_episode_to_mp3(episode_data, file_name)
logger.info("Episode download and conversion completed successfully")
send_notification(episode_data.title)
logger.info("Notification sent")
except Exception as e:
logger.error(f"Failed to download latest Pete Tong episode: {e}", exc_info=True)
if __name__ == "__main__":
download_latest_pete_tong_episode()


@@ -1,4 +0,0 @@
#!/bin/bash -e
docker run --network host --rm -v /home/florian/github/service-podcasts/src/petetong:/app bbcr1:latest /home/florian/.venv/bin/python /app/download_episode.py
mv /home/florian/github/service-podcasts/src/petetong/*.mp3 "/var/lib/audiobookshelf/music/Pete Tong/"


@@ -1,19 +0,0 @@
import logging
import os
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
LOG_LEVEL = "INFO"
def setup_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(getattr(logging, LOG_LEVEL))
logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
return logger


@@ -1,2 +0,0 @@
dotenv
requests


@@ -1,60 +0,0 @@
import requests
from requests.exceptions import RequestException, Timeout, ConnectionError
import os
import time
from logger_handler import setup_logger
from dotenv import load_dotenv
load_dotenv()
backend_api_url=os.getenv("BACKEND_API_URL","http://localhost:30101/internal/receive-notifications")
api_key= os.getenv("BACKEND_API_KEY")
logger = setup_logger(__name__)
def send_notification(body: str,max_retries: int = 5,timeout: int = 5):
"""
Sends a notification to the internal backend service when a new Pete Tong episode is out.
Parameters:
body: Featured artist
"""
headers = {
"X-API-Key-Internal": api_key,
"Content-Type": "application/json"
}
title = "New Pete Tong episode is available"
data = {
"receipent_user_id": 1,
"message": {
"title": title,
"body": f"Featured artist: {body}",
"category":"mixtapes",
"timestamp": int(time.time())
}
}
logger.debug(f"[Notify] Preparing to send notification: title='{title}', body={body}")
with requests.Session() as session:
for attempt in range(1, max_retries + 1):
try:
logger.debug(f"[Notify] Sending request to backend (attempt {attempt}/{max_retries})")
response = session.post(backend_api_url, headers=headers, json=data, timeout=timeout)
response.raise_for_status()
logger.info(f"[Notify] Notification sent successfully for '{title}' (body {body})")
return
except (Timeout, ConnectionError) as e:
logger.warning(f"[Notify] Attempt {attempt}/{max_retries} failed: {type(e).__name__}")
if attempt == max_retries:
logger.error(f"[Notify] All retry attempts failed for '{title}'")
else:
sleep_time = 2 ** (attempt - 1)
logger.debug(f"[Notify] Retrying in {sleep_time} seconds...")
time.sleep(sleep_time)
except RequestException as e:
logger.error(f"[Notify] Unexpected request failure: {e}")
return
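
For reference, the request this function retries is roughly equivalent to the following `curl` call, using the default `BACKEND_API_URL` and an illustrative payload (the `receipent_user_id` spelling matches the key the backend expects here):

```bash
curl -s -X POST "http://localhost:30101/internal/receive-notifications" \
  -H "X-API-Key-Internal: $BACKEND_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"receipent_user_id": 1, "message": {"title": "New Pete Tong episode is available", "body": "Featured artist: Solomun", "category": "mixtapes", "timestamp": 1760000000}}'
```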