Updated Perun script with logging and added a readme

parent 2984c96de9
commit 7da6b09981
README.md
@@ -1,18 +1,75 @@
-# Perun
-
-Youtube blocks a lot of server IPs so running this locally is just easier, expects the following environment variables in a .env file:
-
-REMOTE_HOSTNAME
-
-REMOTE_PATH
-
-BACKEND_API_URL
-
-BACKEND_API_KEY
-
-YOUTUBE_CHANNEL_URL
-
-PODCAST_AUTHORIZATION_TOKEN
-
-PODCAST_API_URL
+# Perun YouTube Podcast Downloader
+
+A Python script that automatically downloads the latest video from the Perun YouTube channel, converts it to MP3 with metadata, removes sponsor segments, and uploads it to a podcast server.
+
+## Features
+
+- **Automatic Detection**: Checks for new episodes by comparing against the Audiobookshelf library
+- **Audio Conversion**: Downloads and converts YouTube videos to MP3 format
+- **Sponsor Removal**: Uses the SponsorBlock API to remove sponsored segments
+- **Metadata Injection**: Adds title, artist, track number, date, and description to MP3 files
+- **SFTP Upload**: Automatically uploads to a remote podcast server
+- **Push Notifications**: Sends a notification when a new episode is available
+
+## Prerequisites
+
+- Python 3.8+
+- yt-dlp
+- ffmpeg (for audio conversion)
+- SSH key-based authentication configured
+- Audiobookshelf server with API access
+
+## Installation
+
+```bash
+pip install -r requirements.txt
+```
+
+Install ffmpeg:
+
+```bash
+# Ubuntu/Debian
+sudo apt install ffmpeg
+
+# macOS
+brew install ffmpeg
+```
+
+## Usage
+
+Run the script manually:
+
+```bash
+python get_episode.py
+```
+
+Or schedule it with cron using the provided `grabEpisode.sh` (Mondays at 7 AM):
+
+```bash
+0 7 * * 1 /path/to/script/grabEpisode.sh
+```
+
+YouTube blocks many server IPs, so running this locally is just easier.
+
+## Configuration
+
+Create a `.env` file with the following variables:
+
+```env
+# YouTube channel to monitor
+YOUTUBE_CHANNEL_URL=https://www.youtube.com/@PerunAU/videos
+
+# Audiobookshelf API
+PODCAST_API_URL=https://your-audiobookshelf.com/api/items/{item-id}
+PODCAST_AUTHORIZATION_TOKEN=your_token_here
+
+# SFTP upload destination
+REMOTE_HOSTNAME=your_ssh_host_alias
+REMOTE_PATH=/path/to/podcast/folder
+
+# Backend notification service
+BACKEND_API_URL=http://backend:8101/internal/receive-notifications
+BACKEND_API_KEY=your_api_key
+```
+
+## Output
+
+MP3 files are named: `perun-YYYY-MM-DD.mp3`
+
+Example: `perun-2025-10-19.mp3`
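Since the README's Features list packs the whole pipeline into a few bullets, here is a rough sketch of how the sponsor-removal, audio-conversion, and metadata steps can fit together as yt-dlp options. This is illustrative only: the repository builds its real options in `return_download_options()` (see the youtube_handler diff below), and the output template and category list here are assumptions.

```python
# Illustrative sketch only: yt-dlp options combining MP3 extraction,
# SponsorBlock segment removal, and metadata tagging. The repo's actual
# options are built by return_download_options(); values here are assumed.
import yt_dlp

options = {
    "quiet": True,
    "format": "bestaudio/best",
    # Assumed output template; the script produces perun-YYYY-MM-DD.mp3.
    "outtmpl": "perun-%(upload_date)s.%(ext)s",
    "postprocessors": [
        # Fetch sponsor segments for the video from the SponsorBlock API.
        {"key": "SponsorBlock", "categories": ["sponsor"]},
        # Cut the fetched segments out of the downloaded media.
        {"key": "ModifyChapters", "remove_sponsor_segments": ["sponsor"]},
        # Re-encode the remaining audio to MP3 via ffmpeg.
        {"key": "FFmpegExtractAudio", "preferredcodec": "mp3"},
        # Write title/artist/track tags into the resulting file.
        {"key": "FFmpegMetadata"},
    ],
}

with yt_dlp.YoutubeDL(options) as ydl:
    ydl.download(["https://www.youtube.com/watch?v=VIDEO_ID"])
```

yt-dlp runs the listed postprocessors in order, so segments are fetched and cut before the audio is re-encoded and tagged.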
src/perun/get_episode.py
@@ -2,10 +2,11 @@ import requests
 import yt_dlp
 import os
 from dotenv import load_dotenv
-from helper import log_message
 from ssh_helper import upload_via_sftp, send_notification_via_ssh
 from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options
+from logger_handler import setup_logger
+
+logger = setup_logger(__name__)

 load_dotenv()
 PODCAST_AUTHORIZATION_TOKEN = os.getenv("PODCAST_AUTHORIZATION_TOKEN")
@@ -19,48 +20,53 @@ def get_audiobookshelf_data()->tuple[int | None, str | None]:
         response = requests.get(PODCAST_API_URL, headers=headers)
         response.raise_for_status()
         result = response.json()

         audiobookshelf_track = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTrack"]
         audiobookshelf_title = result["media"]["episodes"][-1]["audioFile"]["metaTags"]["tagTitle"]
+
+        logger.debug(f"Fetched Audiobookshelf data: track={audiobookshelf_track}, title={audiobookshelf_title}")
         return audiobookshelf_track, audiobookshelf_title

     except requests.RequestException as e:
-        log_message(f"Failed to fetch data: {e}")
+        logger.warning(f"Failed to fetch Audiobookshelf data: {e}")
         return None, None


 def download_episode():
-    log_message("Starting Perun")
+    logger.info("Starting Perun")

     audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
     if audiobookshelf_track is None or audiobookshelf_title is None:
-        log_message("Unable to fetch Audiobookshelf data. Exiting.")
+        logger.warning("Unable to fetch Audiobookshelf data. Exiting.")
         return

     episode_url = get_url_for_latest_video()
     episode_info = get_youtube_data(episode_url)
-    log_message(f"Latest episode: {episode_info['title']}")
+    logger.info(f"Latest YouTube episode: {episode_info['title']}")

     if audiobookshelf_title != episode_info["title"]:
-        log_message("New Episode found")
+        logger.info("New episode found")

         track = str(int(audiobookshelf_track) + 1).zfill(4)
         options = return_download_options(episode_info, track)

-        log_message("Downloading episode")
+        logger.info("Downloading new episode")
         try:
             with yt_dlp.YoutubeDL(options) as episode:
                 episode.download(episode_url)
+            logger.debug("Download completed successfully")
         except Exception as e:
-            log_message(f"Failed to download episode: {e}")
+            logger.error(f"Failed to download episode: {e}", exc_info=True)
             return

-        log_message("Uploading episode")
+        logger.info("Uploading episode via SFTP")
         upload_via_sftp(f"perun-{episode_info['date']}.mp3")
-        log_message("Finished uploading, sending notification")
+
+        logger.info("Sending release notification")
         send_notification_via_ssh(f"Perun episode {track} has been released", episode_info["title"])
-        log_message("Finished")
+        logger.info("Workflow complete")
     else:
-        log_message("No new episode found, exiting...")
+        logger.debug("No new episode found, exiting.")


 if __name__ == "__main__":
     download_episode()
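A note on the detection logic in `download_episode`: the new-episode check is a plain string comparison between the newest Audiobookshelf `tagTitle` and the newest YouTube title, so it presumably relies on titles being carried through to the podcast server unchanged. The `zfill(4)` call zero-pads the incremented track number (`0042` rather than `42`) so the track tags sort lexicographically in the library.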
src/perun/helper.py
@@ -1,5 +1,4 @@
 import re
-import datetime


 def return_string_as_html(input_text):
     string_without_ads=""
@@ -8,8 +7,3 @@ def return_string_as_html(input_text):
         if "Sponsored" not in line:
             string_without_ads+=line+"\n"
     return("<p>"+string_without_ads.replace("\n\n", "</p>\n<p>").replace("\n", "<br>")+"</p>")
-
-
-def log_message(message):
-    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-    print(f"[{timestamp}] {message}")
-    return(f"[{timestamp}] {message}\n")
src/perun/logger_handler.py (new file, 19 lines)
@@ -0,0 +1,19 @@
+import logging
+import os
+
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+if LOG_LEVEL not in {"ERROR", "DEBUG", "INFO", "WARNING", "CRITICAL"}:
+    LOG_LEVEL = "INFO"
+
+def setup_logger(name: str) -> logging.Logger:
+    logger = logging.getLogger(name)
+    if not logger.handlers:
+        handler = logging.StreamHandler()
+        formatter = logging.Formatter(
+            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        )
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+        logger.setLevel(getattr(logging, LOG_LEVEL))
+        logger.debug(f"Logger {name} initialized with level {LOG_LEVEL}")
+    return logger
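Two small design notes on this helper: the `if not logger.handlers:` guard makes `setup_logger` effectively idempotent, so each module can call it at import time without stacking duplicate handlers and double-printing every record. Also, `LOG_LEVEL` is resolved from the process environment when the module is first imported, which in the scripts here happens before `load_dotenv()` runs, so as written a `LOG_LEVEL` set only in `.env` will seemingly be ignored; exporting it in the shell (`LOG_LEVEL=DEBUG python get_episode.py`) is the reliable way to raise verbosity.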
src/perun/ssh_helper.py
@@ -2,7 +2,10 @@ import paramiko
 import os
 from dotenv import load_dotenv
 from json import dumps
+from logger_handler import setup_logger
+import time
+
+logger = setup_logger(__name__)
 load_dotenv()
 REMOTE_HOSTNAME = os.getenv("REMOTE_HOSTNAME")
 REMOTE_PATH = os.getenv("REMOTE_PATH")
@@ -10,10 +13,20 @@ BACKEND_API_URL = os.getenv("BACKEND_API_URL")
 BACKEND_API_KEY = os.getenv("BACKEND_API_KEY")

 def load_ssh_config(host_alias):
+    """
+    Load SSH connection details from ~/.ssh/config for the given alias.
+    """
+    logger.debug(f"Loading SSH configuration for host alias '{host_alias}'")
     ssh_config = paramiko.SSHConfig()
     config_path = os.path.expanduser("~/.ssh/config")
-    with open(config_path) as f:
-        ssh_config.parse(f)
+    try:
+        with open(config_path) as f:
+            ssh_config.parse(f)
+    except FileNotFoundError:
+        logger.error(f"SSH config file not found at {config_path}")
+        raise

     host_config = ssh_config.lookup(host_alias)
     hostname = host_config.get("hostname")
     port = int(host_config.get("port", 22))
@@ -21,57 +34,95 @@ def load_ssh_config(host_alias):
     keyfile = host_config.get("identityfile", [None])[0]

     if not all([hostname, username, keyfile]):
+        logger.error(f"Incomplete SSH configuration for alias '{host_alias}'")
         raise ValueError(f"Missing SSH configuration for {host_alias}.")

+    logger.debug(f"SSH config loaded: host={hostname}, port={port}, user={username}, key={keyfile}")
     return hostname, port, username, keyfile


 def create_ssh_client(hostname, port, username, keyfile):
-    ssh = paramiko.SSHClient()
-    ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
-    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-    pkey = paramiko.RSAKey.from_private_key_file(keyfile)
-    ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
-    return ssh
+    """
+    Create and return a connected Paramiko SSHClient instance.
+    """
+    try:
+        ssh = paramiko.SSHClient()
+        ssh.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+        pkey = paramiko.RSAKey.from_private_key_file(keyfile)
+        ssh.connect(hostname=hostname, username=username, port=port, pkey=pkey)
+        logger.debug("SSH connection established successfully")
+        return ssh
+    except Exception as e:
+        logger.error(f"SSH connection failed: {e}", exc_info=True)
+        raise


 def upload_via_sftp(filename):
-    hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
-    transport = paramiko.Transport((hostname, port))
-    pkey = paramiko.RSAKey.from_private_key_file(keyfile)
-    transport.connect(username=username, pkey=pkey)
-    sftp = paramiko.SFTPClient.from_transport(transport)
-
-    remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
-    sftp.put(filename, remote_file)
-
-    sftp.close()
-    transport.close()
+    """
+    Upload a file to the remote host via SFTP using SSH credentials.
+    """
+    logger.info(f"Preparing to upload file '{filename}' via SFTP")
+    try:
+        hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
+        logger.debug(f"Connecting to {hostname}:{port} for file upload")
+
+        transport = paramiko.Transport((hostname, port))
+        pkey = paramiko.RSAKey.from_private_key_file(keyfile)
+        transport.connect(username=username, pkey=pkey)
+        sftp = paramiko.SFTPClient.from_transport(transport)
+
+        remote_file = os.path.join(REMOTE_PATH, os.path.basename(filename))
+        logger.info(f"Uploading to remote path: {remote_file}")
+        sftp.put(filename, remote_file)
+
+        sftp.close()
+        transport.close()
+        logger.info(f"File '{filename}' uploaded successfully")
+    except Exception as e:
+        logger.error(f"SFTP upload failed for '{filename}': {e}", exc_info=True)
+        raise


 def send_notification_via_ssh(notification_title, notification_info):
-    hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
-    with create_ssh_client(hostname, port, username, keyfile) as ssh:
-        data = {
-            "receipent_user_id": 1,
-            "message": {
-                "title": notification_title,
-                "info": notification_info,
-                "category": "mixtapes"
-            }
-        }
-        json_payload = dumps(data)
-
-        notification_cmd = (
-            f"curl -s -X POST '{BACKEND_API_URL}' "
-            f"-H 'Content-Type: application/json' "
-            f"-H 'X-API-Key-Internal: $(head -n1)' "
-            f"-d @-"
-        )
-        stdin, stdout, stderr = ssh.exec_command(notification_cmd)
-        stdin.write(f"{BACKEND_API_KEY}\n{json_payload}")
-        stdin.flush()
-        stdin.channel.shutdown_write()
+    """
+    Send a JSON-formatted notification payload via SSH to the backend.
+    """
+    logger.info(f"Sending SSH notification: {notification_title}")
+    try:
+        hostname, port, username, keyfile = load_ssh_config(REMOTE_HOSTNAME)
+        with create_ssh_client(hostname, port, username, keyfile) as ssh:
+            data = {
+                "receipent_user_id": 1,
+                "message": {
+                    "title": notification_title,
+                    "body": notification_info,
+                    "category": "mixtapes",
+                    "timestamp": int(time.time())
+                }
+            }
+            json_payload = dumps(data)
+            logger.debug(f"Notification payload: {json_payload}")
+
+            # Command reads API key and JSON from stdin; double quotes so the
+            # remote shell actually expands $(head -n1)
+            notification_cmd = (
+                f"curl -s -X POST '{BACKEND_API_URL}' "
+                f"-H 'Content-Type: application/json' "
+                f'-H "X-API-Key-Internal: $(head -n1)" '
+                f"-d @-"
+            )
+            stdin, stdout, stderr = ssh.exec_command(notification_cmd)
+            stdin.write(f"{BACKEND_API_KEY}\n{json_payload}")
+            stdin.flush()
+            stdin.channel.shutdown_write()
+
+            exit_status = stdout.channel.recv_exit_status()
+            if exit_status == 0:
+                logger.info("Notification sent successfully")
+            else:
+                error_output = stderr.read().decode()
+                logger.warning(f"Notification command exited with {exit_status}: {error_output}")
+    except Exception as e:
+        logger.error(f"Failed to send SSH notification: {e}", exc_info=True)
+        raise
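Two details of `send_notification_via_ssh` are worth spelling out. The API key is piped to the remote command's stdin, where `$(head -n1)` consumes the first line for the header and curl's `-d @-` reads the remaining lines as the request body, so the key never appears in the remote host's process list. And the request is tunnelled over SSH at all presumably because the backend URL (`http://backend:8101/...`) only resolves from the remote host. For reference, here is a rough sketch of the HTTP request the remote curl ends up making, with field names taken from the code above and example values:

```python
# Rough sketch of the HTTP request the remote curl performs; assumes the
# backend is reachable from wherever this runs (in the real setup it is
# not, hence the SSH hop).
import os
import time
import requests

payload = {
    "receipent_user_id": 1,  # field name spelled as the backend expects it
    "message": {
        "title": "Perun episode 0042 has been released",  # example values
        "body": "Episode title here",
        "category": "mixtapes",
        "timestamp": int(time.time()),
    },
}
response = requests.post(
    os.getenv("BACKEND_API_URL"),
    json=payload,
    headers={"X-API-Key-Internal": os.getenv("BACKEND_API_KEY")},
    timeout=10,
)
response.raise_for_status()
```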
src/perun/youtube_handler.py
@@ -4,13 +4,19 @@ import contextlib
 from dotenv import load_dotenv
 import os
 from helper import return_string_as_html
+from logger_handler import setup_logger
+import json
+
+logger = setup_logger(__name__)
 load_dotenv()

 YOUTUBE_CHANNEL_URL = os.getenv("YOUTUBE_CHANNEL_URL")


 def get_url_for_latest_video():
+    """
+    Fetch the URL of the latest video from a YouTube channel.
+    """
+    logger.info("Fetching latest video URL from YouTube channel")
     options = {
         "extract_flat": True,
         "playlist_items": "1",
@@ -18,22 +24,50 @@ def get_url_for_latest_video():
         "forcejson": True,
         "simulate": True,
     }
-    with open(os.devnull, "w") as devnull:
-        with contextlib.redirect_stdout(devnull):
-            with yt_dlp.YoutubeDL(options) as video:
-                info_dict = video.extract_info(YOUTUBE_CHANNEL_URL, download=False)
-    if "entries" in info_dict and len(info_dict["entries"]) > 0:
-        return info_dict["entries"][0]["url"]
+    try:
+        with open(os.devnull, "w") as devnull, contextlib.redirect_stdout(devnull):
+            with yt_dlp.YoutubeDL(options) as video:
+                info_dict = video.extract_info(YOUTUBE_CHANNEL_URL, download=False)
+    except Exception as e:
+        logger.error(f"Failed to fetch latest video info: {e}", exc_info=True)
+        return None
+
+    if "entries" in info_dict and len(info_dict["entries"]) > 0:
+        latest_url = info_dict["entries"][0]["url"]
+        logger.debug(f"Latest video URL found: {latest_url}")
+        return latest_url
+    else:
+        logger.warning("No entries found in channel feed")
+        return None


-def get_youtube_data(url):
-    with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as video:
-        info_dict = video.extract_info(url, download=False)
-    return {"date": datetime.datetime.fromtimestamp(info_dict["timestamp"], datetime.timezone.utc).strftime("%Y-%m-%d"), "title": info_dict["title"],
-        "description": return_string_as_html(info_dict["description"]), "upload_date": info_dict["upload_date"]}
+def get_youtube_data(url: str) -> dict:
+    """
+    Fetch metadata for a given YouTube video URL.
+    """
+    logger.info(f"Fetching YouTube metadata for video: {url}")
+    try:
+        with yt_dlp.YoutubeDL({"quiet": True, "noprogress": True}) as video:
+            info_dict = video.extract_info(url, download=False)
+    except Exception as e:
+        logger.error(f"Failed to fetch YouTube video info for {url}: {e}", exc_info=True)
+        return {}
+
+    video_data = {
+        "date": datetime.datetime.fromtimestamp(
+            info_dict["timestamp"], datetime.timezone.utc
+        ).strftime("%Y-%m-%d"),
+        "title": info_dict["title"],
+        "description": return_string_as_html(info_dict["description"]),
+        "upload_date": info_dict["upload_date"]
+    }
+
+    logger.debug(f"Fetched video data: {json.dumps(video_data, indent=4)}")
+    return video_data


 def return_download_options(information: dict, track: str) -> dict:
-    return {
+    download_options = {
         "quiet": True,
         "noprogress": True,
         "format": "bestaudio/best",
@@ -71,4 +105,6 @@ def return_download_options(information: dict, track: str) -> dict:
             "-metadata", f"description={information['description']}",
         ],
         "merge_output_format": "mp3"
     }
+    logger.debug(f"Created download options:\n {json.dumps(download_options, indent=4)}")
+    return download_options
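Finally, a note on the channel probe in `get_url_for_latest_video`: `extract_flat` combined with `playlist_items: "1"` asks yt-dlp for just the first (newest) entry of the channel feed without fully resolving every video, which keeps the scheduled check cheap. The `forcejson` and `simulate` options make yt-dlp print the metadata JSON to stdout rather than download anything, which is why the call is wrapped in `contextlib.redirect_stdout(devnull)`.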