This commit is contained in:
florian 2025-11-02 22:19:13 +01:00
parent ab603c0d9a
commit c23f6592de
2 changed files with 72 additions and 30 deletions

View File

@ -1,9 +1,10 @@
import requests
import yt_dlp
import os
import time
from dotenv import load_dotenv
from ssh_helper import upload_via_sftp, send_notification_via_ssh
from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options
from youtube_handler import get_url_for_latest_video, get_youtube_data, return_download_options, check_for_sponsorblock_segments
from logger_handler import setup_logger
logger = setup_logger(__name__)
@ -31,42 +32,64 @@ def get_audiobookshelf_data()->tuple[int | None, str | None]:
logger.warning(f"Failed to fetch Audiobookshelf data: {e}")
return None
def check_until_new_episode_gets_released():
CHECK_INTERVAL = 3600 # seconds
MAX_HOURS = 24
for _ in range(int(MAX_HOURS * 3600 / CHECK_INTERVAL)):
audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
if audiobookshelf_track is None or audiobookshelf_title is None:
logger.warning("Unable to fetch Audiobookshelf data. Exiting.")
episode_url = get_url_for_latest_video()
episode_info = get_youtube_data(episode_url)
if audiobookshelf_title != episode_info["title"]:
logger.info(f"Latest YouTube episode: {episode_info['title']}")
return audiobookshelf_track,episode_info,episode_url
else:
logger.debug("No new episode found, going to sleep.")
time.sleep(CHECK_INTERVAL)
def wait_for_sponsorblock_segments_to_be_added():
CHECK_INTERVAL = 3600 # seconds
MAX_HOURS = 24
for _ in range(int(MAX_HOURS * 3600 / CHECK_INTERVAL)):
segments= check_for_sponsorblock_segments()
if segments:
break
else:
logger.debug("Code eepy")
time.sleep(CHECK_INTERVAL)
def download_episode():
logger.info("Starting Perun")
audiobookshelf_track, audiobookshelf_title = get_audiobookshelf_data()
if audiobookshelf_track is None or audiobookshelf_title is None:
logger.warning("Unable to fetch Audiobookshelf data. Exiting.")
audiobookshelf_track,episode_info,episode_url = check_until_new_episode_gets_released()
logger.info("New episode found")
episode_description = episode_info["description"]
if "sponsor" in episode_description.lower():
wait_for_sponsorblock_segments_to_be_added()
track = str(int(audiobookshelf_track) + 1).zfill(4)
options = return_download_options(episode_info,track)
logger.info("Downloading new episode")
try:
with yt_dlp.YoutubeDL(options) as episode:
episode.download(episode_url)
logger.debug("Download completed successfully")
except Exception as e:
logger.error(f"Failed to download episode: {e}", exc_info=True)
return
episode_url = get_url_for_latest_video()
episode_info = get_youtube_data(episode_url)
logger.info(f"Latest YouTube episode: {episode_info['title']}")
logger.info("Uploading episode via SFTP")
upload_via_sftp(f"perun-{episode_info['date']}.mp3")
if audiobookshelf_title != episode_info["title"]:
logger.info("New episode found")
track = str(int(audiobookshelf_track) + 1).zfill(4)
options = return_download_options(episode_info,track)
logger.info("Downloading new episode")
try:
with yt_dlp.YoutubeDL(options) as episode:
episode.download(episode_url)
logger.debug("Download completed successfully")
except Exception as e:
logger.error(f"Failed to download episode: {e}", exc_info=True)
return
logger.info("Uploading episode via SFTP")
upload_via_sftp(f"perun-{episode_info['date']}.mp3")
logger.info("Sending release notification")
send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"])
logger.info("Workflow complete")
logger.info("Sending release notification")
send_notification_via_ssh(f"Perun episode {track} has been released",episode_info["title"])
logger.info("Workflow complete")
else:
logger.info("No new episode found, exiting.")
if __name__ == "__main__":
download_episode()

View File

@ -6,6 +6,8 @@ import os
from helper import return_string_as_html
from logger_handler import setup_logger
import json
import sponsorblock as sb
logger = setup_logger(__name__)
load_dotenv()
@ -65,7 +67,21 @@ def get_youtube_data(url: str) -> dict:
logger.debug(f"Fetched video data: {json.dumps(video_data, indent=4)}")
return video_data
def check_for_sponsorblock_segments(youtube_video:str) -> bool:
client = sb.Client()
try:
segments = client.get_skip_segments(youtube_video)
except sb.errors.NotFoundException:
logger.debug(f"No SponsorBlock information for video:{youtube_video}")
return False
if segments:
logger.debug(f"SponsorBlock segments found for video: {youtube_video}")
return True
else:
logger.debug(f"SponsorBlock returned empty segments for video: {youtube_video}")
return False
def return_download_options(information:dict,track:str)->dict:
download_options = {
"quiet": True,
@ -107,4 +123,7 @@ def return_download_options(information:dict,track:str)->dict:
"merge_output_format": "mp3"
}
logger.debug(f"Created download options:\n {json.dumps(download_options, indent=4)}")
return download_options
return download_options
if __name__ == "__main__":
print(check_for_sponsorblock_segments("https://www.youtube.com/watch?v=M0t8UYZ9rrQ"))