Remade generic BBC R1 downloader into one tailored for Pete Tong

- `yt-dlp` stopped working so a switch to `get_iplayer` was necessary
- Added sending a notification to the backend api
- Added logging and general error handling
This commit is contained in:
Florian 2025-10-19 20:40:27 +02:00
parent 7da6b09981
commit 9686ae26e4
11 changed files with 314 additions and 288 deletions

3
.gitignore vendored
View File

@ -167,4 +167,5 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
*.mp3
*.m4a

View File

@ -0,0 +1,13 @@
# Media Download Scripts
A collection of Python scripts for automatically downloading and processing podcast episodes from various sources.
### [Perun YouTube Downloader](src/perun/)
Downloads the latest video from the Perun YouTube channel, converts to MP3 with metadata and sponsor segment removal, and uploads to a podcast server.
### [BBC Radio 1 Pete Tong Downloader](src/petetong/)
Downloads the latest Pete Tong radio show from BBC iPlayer Radio, converts to MP3 with metadata, and sends notifications.
## Setup
Each project has its own README with detailed installation and configuration instructions. Navigate to the respective directories for more information.

View File

@ -1,26 +0,0 @@
# Per-show download configuration, keyed by show name (passed as sys.argv[1]).
# Recognized option keys (consumed by download_episode/_apply_configurations):
#   artist                                 - ID3 artist tag value
#   base_url                               - BBC programme page listing episodes
#   cut_intro                              - drop the first chapter with ffmpeg
#   modify_timestamp                       - seconds subtracted from the air time
#   calculate_amount_of_fridays            - track tag = Friday-of-year number
#   use_different_release_date             - release date taken from sys.argv[2]
#   remove_amount_of_characters_from_title - negative slice applied to the title
settings = {
    "Pete Tong":{
        "artist": "Pete Tong",
        "base_url":"https://www.bbc.co.uk/programmes/b006ww0v",
        "cut_intro":True,
        "modify_timestamp":7200,
        "calculate_amount_of_fridays":True
    },
    "Radio 1s Classic Essential Mix":{
        "artist":"Radio 1s Classic Essential Mix",
        "use_different_release_date":True,
        "base_url":"https://www.bbc.co.uk/programmes/b00f3pc4",
        "cut_intro":True,
        "remove_amount_of_characters_from_title":-5
    },
    "Defected on Radio 1 Dance":{
        "artist": "Defected on Radio 1 Dance",
        "base_url":"https://www.bbc.co.uk/programmes/m00287n1",
        "remove_amount_of_characters_from_title":-10
    },
    "Radio 1s Essential Mix":{
        "artist":"Radio 1s Essential Mix",
        "base_url":"https://www.bbc.co.uk/programmes/b006wkfp",
        "cut_intro":True
    }
}

View File

@ -1,162 +0,0 @@
import yt_dlp
import subprocess
import tempfile
import sys
from datetime import datetime, timezone
from config import settings
from os import rename, remove
from helper import modify_chapters_for_ffmpeg, get_friday_number, return_url_of_latest_episode
import logging
from ytdlp_helper import return_episode_data
logger = logging.getLogger(__name__)
def _apply_configurations(configuration_settings: dict, episode_data):
    """
    Apply per-show configuration settings to the extracted episode data.

    Recognized keys: remove_amount_of_characters_from_title (title slice),
    modify_timestamp (seconds subtracted from the air time),
    use_different_release_date (date string taken from sys.argv[2]),
    calculate_amount_of_fridays (track = Friday-of-year number).

    Returns:
        tuple: (episode_data, filename_timestamp, track)
    """
    if "remove_amount_of_characters_from_title" in configuration_settings:
        trim = configuration_settings["remove_amount_of_characters_from_title"]
        episode_data.extracted_title = episode_data.extracted_title[:trim]

    if "modify_timestamp" in configuration_settings:
        episode_data.extracted_timestamp -= configuration_settings["modify_timestamp"]

    # Default release date derives from the (possibly adjusted) air timestamp.
    default_date = datetime.fromtimestamp(
        episode_data.extracted_timestamp, tz=timezone.utc
    ).strftime("%Y-%m-%d")

    if "use_different_release_date" in configuration_settings:
        if len(sys.argv) > 2:
            filename_timestamp = sys.argv[2]
        else:
            logger.warning("Use_different_release_date set but missing sys.argv[2]. Falling back to default.")
            filename_timestamp = default_date
    else:
        filename_timestamp = default_date

    if "calculate_amount_of_fridays" in configuration_settings:
        track = get_friday_number(episode_data.extracted_timestamp)
    else:
        track = filename_timestamp

    return episode_data, filename_timestamp, track
def _prepare_ffmpeg_chapters(episode_data, configuration_settings):
    """
    Build ffmpeg chapter metadata with the intro chapter removed.

    Treats chapter 0 as the intro: it is dropped, and the remaining chapter
    times are shifted back by its end time. Returns None (with a warning)
    when fewer than two chapters are available.
    """
    # configuration_settings is currently unused but kept for interface parity.
    chapters = episode_data.chapters
    if not chapters or len(chapters) < 2:
        logger.warning("Cutting intro requested but no chapters found.")
        return None
    intro_end = chapters[0]["end_time"]
    return modify_chapters_for_ffmpeg(chapters[1:], intro_end)
def _download_audio(episode_url: str, episode_data, filename_timestamp: str, track: str, artist: str):
    """
    Download episode audio using yt_dlp with metadata.

    Saves the episode as "<filename_timestamp>.mp3" in the working directory,
    re-encoding the best available audio stream to MP3 and injecting ID3 tags
    (title, artist, track, date, comment) via FFmpeg postprocessor arguments.

    Args:
        episode_url: URL of the episode to download.
        episode_data: Extracted metadata (provides title and description).
        filename_timestamp: Date string used for the output filename and date tag.
        track: Track tag value (Friday number or date string).
        artist: Artist tag value.
    """
    ytdl_options = {
        "quiet": True,
        "noprogress": True,
        "format": "bestaudio/best",
        "extract_audio": True,
        "audio_format": "mp3",
        "outtmpl": f"{filename_timestamp}.%(ext)s",
        "addmetadata": True,
        "postprocessors": [
            {
                # Re-encode the downloaded audio stream to mp3.
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
            },
            {
                # Apply the -metadata arguments below to the output file.
                "key": "FFmpegMetadata",
            }
        ],
        "postprocessor_args": [
            "-metadata", f"title={episode_data.extracted_title}",
            "-metadata", f"artist={artist}",
            "-metadata", f"track={track}",
            "-metadata", f"date={filename_timestamp}",
            "-metadata", f"comment={episode_data.extracted_description}"
        ],
        "merge_output_format": "mp3"
    }
    with yt_dlp.YoutubeDL(ytdl_options) as episode:
        episode.download(episode_url)
def _cut_intro_with_ffmpeg(ffmpeg_chapters: str, episode_data, filename_timestamp: str, track: str, artist: str):
    """
    Cut the intro from the episode using FFmpeg and apply metadata.

    Writes the chapter metadata to a temporary file, re-muxes the MP3 starting
    at the end of the intro chapter (stream copy, no re-encode), and deletes
    the un-cut source file on success. FFmpeg failures are logged, not raised.

    Args:
        ffmpeg_chapters: FFMETADATA-formatted chapter text.
        episode_data: Episode metadata; chapters[0]["end_time"] marks the intro end.
        filename_timestamp: Date string used in the input/output filenames.
        track: Track tag value.
        artist: Artist tag value.
    """
    logger.info("Fixing chapters and metadata with FFmpeg")
    temp_metadata_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".txt") as temp_file:
            temp_file.write(ffmpeg_chapters)
            temp_metadata_path = temp_file.name
        ffmpeg_command = [
            "ffmpeg", "-ss", str(episode_data.chapters[0]["end_time"]),
            "-hide_banner", "-loglevel", "error",
            "-i", f"{filename_timestamp}.mp3",
            "-i", temp_metadata_path,
            "-map_metadata", "1",
            "-metadata", f"title={episode_data.extracted_title}",
            "-metadata", f"artist={artist}",
            "-metadata", f"track={track}",
            "-metadata", f"date={filename_timestamp}",
            "-metadata", f"comment={episode_data.extracted_description}",
            "-codec", "copy",
            f"{filename_timestamp}-{episode_data.extracted_id}.mp3"
        ]
        subprocess.run(ffmpeg_command, check=True)
        remove(f"{filename_timestamp}.mp3")
    except subprocess.CalledProcessError as e:
        logger.error(f"Error running FFmpeg: {e}")
    finally:
        # BUG FIX: the original guard was `temp_metadata_path and remove`;
        # `remove` is the os.remove function and is always truthy, so the
        # second operand was a no-op. Only the path needs checking.
        if temp_metadata_path:
            try:
                remove(temp_metadata_path)
            except Exception as ex:
                logger.warning(f"Could not remove temp metadata file: {ex}")
def download_episode(configuration_settings: dict, episode_url: str):
    """
    Download one episode, tag it, and optionally cut its intro chapter.

    The final file is named "<date>-<episode id>.mp3" in the working directory.
    """
    logger.info("Extracting metadata")
    episode_data = return_episode_data(episode_url)
    episode_data, filename_timestamp, track = _apply_configurations(configuration_settings, episode_data)

    # Artist comes from the config, then the CLI argument, then a placeholder.
    fallback_artist = sys.argv[1] if len(sys.argv) > 1 else "Unknown Artist"
    artist = configuration_settings.get("artist", fallback_artist)

    ffmpeg_chapters = (
        _prepare_ffmpeg_chapters(episode_data, configuration_settings)
        if configuration_settings.get("cut_intro")
        else None
    )

    logger.info("Downloading episode")
    _download_audio(episode_url, episode_data, filename_timestamp, track, artist)

    if ffmpeg_chapters:
        _cut_intro_with_ffmpeg(ffmpeg_chapters, episode_data, filename_timestamp, track, artist)
    else:
        rename(f"{filename_timestamp}.mp3", f"{filename_timestamp}-{episode_data.extracted_id}.mp3")
    logger.info("Finished")
if __name__ == "__main__":
    # BUG FIX: without a configured handler the module-level logger discards
    # INFO-level messages (the stdlib last-resort handler only emits
    # WARNING and above), so CLI runs were effectively silent.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    show_name = sys.argv[1]  # show key into the settings dict
    logger.info(f"Processing {show_name}")
    episode_url = return_url_of_latest_episode(settings[show_name]["base_url"])
    download_episode(settings[show_name], episode_url)

View File

@ -1,52 +0,0 @@
from datetime import datetime, timezone, timedelta
from typing import List, Dict
import subprocess
def time_to_milliseconds(time, length_to_cut) -> int:
    """Return *time* in milliseconds after shifting it back by *length_to_cut* (both seconds)."""
    # Multiply each operand separately before subtracting to keep the exact
    # floating-point behavior of the original expression.
    time_ms = time * 1000
    cut_ms = length_to_cut * 1000
    return int(time_ms - cut_ms)
def add_html_tags_to_description(input_text) -> str:
    """Wrap *input_text* in <p> tags; blank lines become paragraph breaks, newlines <br>."""
    # NOTE: the second replace also rewrites the "\n" inserted by the first
    # one, so a double newline ends up as "</p><br><p>" — kept as-is.
    paragraphs = input_text.replace("\n\n", "</p>\n<p>")
    with_breaks = paragraphs.replace("\n", "<br>")
    return "<p>" + with_breaks + "</p>"
def get_friday_number(extracted_timestamp) -> int:
    """Return which Friday of its year the given epoch timestamp falls in (1-based).

    Uses the local timezone, matching datetime.fromtimestamp's default.
    """
    moment = datetime.fromtimestamp(extracted_timestamp)
    year_start = datetime(moment.year, 1, 1)
    # weekday(): Monday == 0, so Friday == 4.
    offset = (4 - year_start.weekday()) % 7
    first_friday = year_start + timedelta(days=offset)
    return (moment - first_friday).days // 7 + 1
def return_url_of_latest_episode(base_url: str) -> str:
    """Resolve the BBC Sounds URL of the newest episode listed under *base_url*.

    Parses get_iplayer's --pid-recursive-list output: the PID is taken from
    the third-to-last output line, after the final comma and a leading space.
    """
    listing = subprocess.run(
        ["get_iplayer", "--pid-recursive-list", base_url],
        capture_output=True, text=True
    )
    last_entry = listing.stdout.split("\n")[-3]
    pid = last_entry.split(",")[-1][1:]
    return f"https://www.bbc.co.uk/sounds/play/{pid}"
def modify_chapters_for_ffmpeg(chapters: List[Dict], length_to_cut: float) -> str:
    """
    Converts chapter times to ffmpeg-compatible metadata format, adjusting by length_to_cut.

    Args:
        chapters (list): List of chapter dicts with "start_time", "end_time", and "title".
        length_to_cut (int/float): Amount of time to cut from start, in seconds.

    Returns:
        str: Chapters formatted as ffmpeg FFMETADATA text.

    Note: the chapter dicts are mutated in place (times become ints in ms).
    """
    for chapter in chapters:
        for key in ("start_time", "end_time"):
            if key in chapter:
                chapter[key] = time_to_milliseconds(chapter[key], length_to_cut)
    parts = [";FFMETADATA1\n"]
    for chapter in chapters:
        parts.append("[CHAPTER]\n")
        parts.append("TIMEBASE=1/1000\n")
        parts.append(f"START={chapter['start_time']}\n")
        parts.append(f"END={chapter['end_time']}\n")
        parts.append(f"title={chapter['title']}\n\n")
    return "".join(parts)

View File

@ -1,47 +0,0 @@
import yt_dlp
from helper import add_html_tags_to_description
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class EpisodeData:
    """Metadata extracted from a single radio episode via yt-dlp."""
    # Chapter dicts as provided by yt-dlp ("start_time", "end_time", "title").
    chapters: List
    # HTML-wrapped episode description.
    extracted_description: str
    # Unique episode/programme ID.
    extracted_id: str
    # Episode title.
    extracted_title: str
    # Airing time as epoch seconds, if yt-dlp provided one.
    extracted_timestamp: Optional[int]
def return_episode_data(episode_url: str) -> EpisodeData:
    """
    Quietly extracts meta information about a given radio show.

    Args:
        episode_url (str): The URL of the episode.

    Returns:
        EpisodeData: A dataclass containing episode metadata:
            - chapters (List): Chapters in JSON format.
            - extracted_description (str): HTML-wrapped description of the episode.
            - extracted_id (str): Unique episode ID.
            - extracted_title (str): Episode title.
            - extracted_timestamp (Optional[int]): Airing timestamp (epoch seconds), if available.

    Raises:
        RuntimeError: If yt-dlp fails to extract the episode info.
    """
    try:
        with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as ydl:
            info_dict = ydl.extract_info(episode_url, download=False)
    except Exception as e:
        # BUG FIX: previously returned {"error": ...} here, violating the
        # declared EpisodeData return type and deferring the failure to an
        # opaque AttributeError in the caller. Raise explicitly instead.
        raise RuntimeError(f"Failed to extract info: {e}") from e
    return EpisodeData(
        chapters=info_dict.get("chapters", []),
        extracted_description=add_html_tags_to_description(info_dict.get("description", "")),
        extracted_id=info_dict.get("id", ""),
        extracted_title=info_dict.get("title", ""),
        extracted_timestamp=info_dict.get("timestamp"),
    )
if __name__ == "__main__":
    # Smoke test: print the extracted metadata for a sample episode.
    # BUG FIX: the URL literal previously ended with a stray trailing space,
    # which yt-dlp would receive verbatim as part of the URL.
    print(return_episode_data("https://www.bbc.co.uk/sounds/play/m002jtcqyt"))

75
src/petetong/README.md Normal file
View File

@ -0,0 +1,75 @@
# Pete Tong BBC Radio Episode Downloader
A Python script that automatically downloads the latest Pete Tong radio show from BBC iPlayer Radio, converts it to MP3 with metadata, and sends a push notification when complete.
## Features
- **Automatic Detection**: Finds the latest Pete Tong episode from BBC iPlayer
- **Audio Download**: Uses `get_iplayer` to download BBC Radio episodes
- **MP3 Conversion**: Converts to MP3 format with ffmpeg
- **Metadata Injection**: Adds title, artist, track number (Friday number of the year), date, and description
- **Push Notifications**: Sends notification to backend service when new episode is ready
## Prerequisites
- Python 3.8+
- `get_iplayer` (BBC iPlayer downloader)
- `ffmpeg` and `ffprobe` (audio processing)
- Backend notification service
## Installation
### Install Python Dependencies
```bash
pip install requests python-dotenv
```
### Install System Dependencies
**Ubuntu/Debian:**
```bash
sudo apt install get-iplayer ffmpeg
```
**macOS:**
```bash
brew install get-iplayer ffmpeg
```
## Configuration
Create a `.env` file with the following variables:
```env
# Backend notification service
BACKEND_API_URL=http://localhost:30101/internal/receive-notifications
BACKEND_API_KEY=your_api_key_here
```
## Usage
Run the script manually:
```bash
python download_episode.py
```
Or schedule it with cron using the provided `grabEpisode.sh` (e.g. Saturday mornings at 9 AM):
```bash
0 9 * * 6 /path/to/script/grabEpisode.sh
```
## Output
MP3 files are named: `YYYY-MM-DD-{episode_id}.mp3`
Example: `2025-10-17-m00258br.mp3`
## Metadata Structure
| Field | Value | Example |
|-------|-------|---------|
| Title | Featured artist | "Solomun" |
| Artist | Pete Tong | "Pete Tong" |
| Track | Friday number | 42 (42nd Friday of year) |
| Date | ISO date | "2025-10-17" |
| Comment | Episode description | HTML formatted text |

View File

@ -0,0 +1,141 @@
from datetime import datetime, timedelta
import os
import subprocess
from dataclasses import dataclass
import json
from logger_handler import setup_logger
from send_notification import send_notification
logger = setup_logger("PeteTongDownloader")
@dataclass
class EpisodeData:
    """Metadata for one downloaded episode, read from the file's tags via ffprobe."""
    # HTML-wrapped episode description.
    description: str
    # Episode title (featured artist for Pete Tong shows).
    title: str
    # Air date as "YYYY-MM-DD".
    timestamp: str
    # Which Friday of the year the episode aired on (1-based; 0 on parse failure).
    track: int
    # BBC programme PID.
    id: str
def add_html_tags_to_description(input_text) -> str:
    """Wrap text in <p> tags, turning blank lines into paragraph breaks and newlines into <br>.

    Empty or falsy input yields an empty string.
    """
    if not input_text:
        return ""
    # NOTE: the "\n" inserted by the first replace is itself rewritten by the
    # second one, so "\n\n" ends up as "</p><br><p>" — behavior kept as-is.
    body = input_text.replace("\n\n", "</p>\n<p>").replace("\n", "<br>")
    return f"<p>{body}</p>"
def get_friday_number(iso_timestamp: str) -> int:
    """
    Returns the week number of the Friday in the year for a given ISO timestamp string.

    Returns 0 (and logs an error) when the timestamp cannot be parsed.
    """
    try:
        moment = datetime.fromisoformat(iso_timestamp)
        year_start = datetime(moment.year, 1, 1, tzinfo=moment.tzinfo)
        # weekday(): Monday == 0, so Friday == 4.
        offset = (4 - year_start.weekday()) % 7
        first_friday = year_start + timedelta(days=offset)
        return (moment - first_friday).days // 7 + 1
    except Exception as e:
        logger.error(f"Failed to calculate Friday number from {iso_timestamp}: {e}")
        return 0
def find_downloaded_file_name_via_id(directory: str, latest_episode_id: str) -> str | None:
for filename in os.listdir(directory):
if latest_episode_id in filename:
return filename
logger.warning(f"No file found containing episode ID {latest_episode_id} in {directory}")
return None
def extract_metadata_from_downloaded_episode(file_name: str, episode_id: str) -> EpisodeData:
    """Read tags from a downloaded file via ffprobe and build an EpisodeData.

    Raises:
        FileNotFoundError: if *file_name* is missing or empty.
        subprocess.CalledProcessError: if ffprobe exits non-zero.
        json.JSONDecodeError: if ffprobe output is not valid JSON.
    """
    if not file_name or not os.path.exists(file_name):
        logger.error(f"File not found: {file_name}")
        raise FileNotFoundError(f"File not found: {file_name}")
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", file_name],
            capture_output=True, text=True, check=True
        )
        tags = json.loads(probe.stdout).get("format", {}).get("tags", {})
        # NOTE(review): assumes the downloader stores the description in the
        # "lyrics" tag and an ISO air date in "date" — confirm against the
        # files get_iplayer actually produces.
        iso_timestamp = tags.get("date", "1970-01-01T00:00:00")
        return EpisodeData(
            description=add_html_tags_to_description(tags.get("lyrics", "")),
            title=tags.get("title", "Unknown Title"),
            timestamp=iso_timestamp.split("T")[0],
            track=get_friday_number(iso_timestamp),
            id=episode_id
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"ffprobe failed for {file_name}: {e.stderr}")
        raise
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse ffprobe output for {file_name}: {e}")
        raise
def get_id_of_the_latest_episode(base_url: str) -> str:
    """Query get_iplayer for all episodes under *base_url* and return the newest PID.

    Raises:
        subprocess.CalledProcessError: if get_iplayer exits non-zero.
        ValueError: if the output has fewer than 3 lines.
    """
    try:
        result = subprocess.run(
            ["get_iplayer", "--pid-recursive-list", base_url],
            capture_output=True, text=True, check=True
        )
        lines = result.stdout.strip().split("\n")
        if len(lines) < 3:
            raise ValueError("get_iplayer output too short to find latest episode ID")
        # NOTE(review): assumes the newest episode is the second-to-last line
        # and that the PID is the final comma-separated field — confirm this
        # still matches get_iplayer's --pid-recursive-list output format.
        latest_episode_id = lines[-2].split(",")[-1].strip()
        logger.info(f"Latest episode ID: {latest_episode_id}")
        return latest_episode_id
    except subprocess.CalledProcessError as e:
        logger.error(f"get_iplayer failed: {e.stderr}")
        raise
def download_episode_via_episode_id(episode_id: str) -> str:
    """Download a radio episode by PID into this script's directory.

    Returns:
        str: the directory the episode was downloaded into.

    Raises:
        subprocess.CalledProcessError: if get_iplayer exits non-zero.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    try:
        logger.info(f"Downloading episode {episode_id}")
        subprocess.run(
            ["get_iplayer", f"--pid={episode_id}", "--type=radio"],
            cwd=script_dir, check=True
        )
    except subprocess.CalledProcessError as e:
        # BUG FIX: e.stderr is always None here because output is not
        # captured (it streams to the console), so the old message logged
        # "... : None". Log the exception itself, which carries the command
        # and return code.
        logger.error(f"Download failed for {episode_id}: {e}")
        raise
    return script_dir
def convert_episode_to_mp3(episode_data: EpisodeData, file_name: str):
    """Convert the downloaded file to a tagged MP3, then delete the source file.

    The output is named "<date>-<episode id>.mp3" in the working directory.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    output_file = f"{episode_data.timestamp}-{episode_data.id}.mp3"
    # Tags are written in this order via repeated -metadata arguments.
    tags = {
        "title": episode_data.title,
        "artist": "Pete Tong",
        "track": episode_data.track,
        "date": episode_data.timestamp,
        "comment": episode_data.description,
    }
    ffmpeg_command = ["ffmpeg", "-i", file_name]
    for tag_name, tag_value in tags.items():
        ffmpeg_command += ["-metadata", f"{tag_name}={tag_value}"]
    ffmpeg_command.append(output_file)
    try:
        logger.info(f"Converting {file_name} to {output_file}")
        subprocess.run(ffmpeg_command, check=True)
        os.remove(file_name)
    except subprocess.CalledProcessError as e:
        logger.error(f"ffmpeg conversion failed: {e}")
        raise
def download_latest_pete_tong_episode():
    """End-to-end pipeline: locate, download, tag, convert, and notify.

    All failures are caught and logged with a traceback; the function never
    raises, so a cron invocation always exits cleanly.
    """
    try:
        base_url = "https://www.bbc.co.uk/programmes/b006ww0v"
        episode_id = get_id_of_the_latest_episode(base_url)
        # BUG FIX: the episode was previously downloaded twice — the first
        # call's return value was discarded and the call repeated.
        script_dir = download_episode_via_episode_id(episode_id)
        file_name = find_downloaded_file_name_via_id(script_dir, episode_id)
        episode_data = extract_metadata_from_downloaded_episode(file_name, episode_id)
        convert_episode_to_mp3(episode_data, file_name)
        logger.info("Episode download and conversion completed successfully")
        send_notification(episode_data.title)
        logger.info("Notification sent")
    except Exception as e:
        logger.error(f"Failed to download latest Pete Tong episode: {e}", exc_info=True)


if __name__ == "__main__":
    download_latest_pete_tong_episode()

View File

@ -0,0 +1,4 @@
#!/bin/bash -e
# Run the Pete Tong downloader inside the ytdlp container, then move the
# resulting MP3 into the Audiobookshelf music library.
docker run --network host --rm -v /home/florian/github/service-podcasts/src/petetong:/app ytdlp:latest python3 /app/download_episode.py
# NOTE(review): this moves files from a different directory than the one
# mounted above (scripts/audiobookshelf/bbc-downloader vs.
# service-podcasts/src/petetong) — confirm which path actually receives the
# downloaded MP3s.
mv /home/florian/github/scripts/audiobookshelf/bbc-downloader/*.mp3 "/var/lib/audiobookshelf/music/Pete Tong/"

View File

@ -0,0 +1,19 @@
import logging
import os
# Log level comes from the environment; unknown values fall back to INFO.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
    LOG_LEVEL = "INFO"


def setup_logger(name: str) -> logging.Logger:
    """Return a named logger with a single stream handler at LOG_LEVEL.

    Safe to call repeatedly: the handler is attached only on first use, so
    repeated calls never duplicate output.
    """
    log = logging.getLogger(name)
    if not log.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        log.addHandler(stream_handler)
        log.setLevel(getattr(logging, LOG_LEVEL))
        log.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
    return log

View File

@ -0,0 +1,60 @@
import requests
from requests.exceptions import RequestException, Timeout, ConnectionError
import os
import time
from logger_handler import setup_logger
from dotenv import load_dotenv
load_dotenv()  # pull BACKEND_API_URL / BACKEND_API_KEY from a local .env file
# Backend endpoint that receives push notifications; localhost default for dev.
backend_api_url=os.getenv("BACKEND_API_URL","http://localhost:30101/internal/receive-notifications")
# NOTE(review): api_key is None when BACKEND_API_KEY is unset — the request
# would then carry a null X-API-Key-Internal header value; confirm the backend
# rejects that cleanly.
api_key= os.getenv("BACKEND_API_KEY")
logger = setup_logger(__name__)
def send_notification(body: str,max_retries: int = 5,timeout: int = 5):
    """
    Sends a notification to the internal backend service when a new Pete Tong episode is out.

    Retries timeouts and connection errors with exponential backoff
    (1, 2, 4, ... seconds) and gives up after max_retries attempts; any other
    request error aborts immediately. Failures are logged, never raised.

    Parameters:
        body: Featured artist
        max_retries: Maximum number of delivery attempts.
        timeout: Per-request timeout in seconds.
    """
    headers = {
        "X-API-Key-Internal": api_key,
        "Content-Type": "application/json"
    }
    title = "New Pete Tong episode is available"
    data = {
        # NOTE(review): "receipent" is misspelled, but this key must match
        # whatever field name the backend expects — do not rename it here
        # without changing the API as well.
        "receipent_user_id": 1,
        "message": {
            "title": title,
            "body": f"Featured artist: {body}",
            "category":"mixtapes",
            "timestamp": int(time.time())
        }
    }
    logger.debug(f"[Notify] Preparing to send notification: title='{title}', body={body}")
    with requests.Session() as session:
        for attempt in range(1, max_retries + 1):
            try:
                logger.debug(f"[Notify] Sending request to backend (attempt {attempt}/{max_retries})")
                response = session.post(backend_api_url, headers=headers, json=data, timeout=timeout)
                response.raise_for_status()
                logger.info(f"[Notify] Notification sent successfully for '{title}' (body {body})")
                return
            except (Timeout, ConnectionError) as e:
                logger.warning(f"[Notify] Attempt {attempt}/{max_retries} failed: {type(e).__name__}")
                if attempt == max_retries:
                    logger.error(f"[Notify] All retry attempts failed for '{title}'")
                else:
                    # Exponential backoff: 1s, 2s, 4s, 8s, ...
                    sleep_time = 2 ** (attempt - 1)
                    logger.debug(f"[Notify] Retrying in {sleep_time} seconds...")
                    time.sleep(sleep_time)
            except RequestException as e:
                logger.error(f"[Notify] Unexpected request failure: {e}")
                return