196 lines
6.0 KiB
Django/Jinja
196 lines
6.0 KiB
Django/Jinja
#!/usr/bin/env python3
|
|
#
|
|
|
|
import argparse
|
|
import json
|
|
from datetime import datetime
|
|
import time
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import urllib.parse
|
|
from datetime import datetime, timedelta
|
|
import pytz
|
|
|
|
from mutagen.mp3 import MP3
|
|
from mutagen.mp4 import MP4
|
|
import requests
|
|
import feedparser
|
|
from podgen import Podcast, Person, Media, Category, htmlencode
|
|
from loguru import logger
|
|
|
|
def generate_feed(rss, entries, name, path):
    """Build the podcast RSS feed for the downloaded episodes and write it.

    Args:
        rss: Parsed source feed; only ``rss['feed']`` (its ``title`` and
            ``author``) is read here.
        entries: Episode dicts as returned by ``download_episode``. Each one
            needs ``title``, ``id``, ``summary`` and ``published_parsed``;
            an entry without a ``filename`` key is logged and skipped.
            Episodes are added in the reverse of the order given.
        name: Podcast name, used as a path component in the generated URLs.
        path: Directory holding one sub-directory per episode title; the
            resulting ``podcast.rss`` is written here.

    Raises:
        OSError: If the audio file of an episode cannot be stat'ed.

    NOTE(review): the double-brace placeholders inside the URL strings look
    like template variables substituted when this file is rendered; confirm
    before editing those strings.
    """
    feed = rss['feed']

    p = Podcast()
    p.name = f"BK: {feed['title']}"
    p.authors.append(Person(feed['author'], "icloud@sudo.is"))
    # NOTE(review): this uses "/pub/podcasts/" while the episode links below
    # use "/pub/podcast/" -- confirm which of the two is intended.
    p.website = f"https://{{ archives_url }}/pub/podcasts/{name}"
    p.copyright = 'cc-by'
    p.description = f"desc: {feed['title']}"
    p.language = 'en-US'
    p.feed_url = f"https://{{ archives_url }}/{{ podcasts }}/{name}/podcast.rss"
    p.category = Category('Science', 'Chemistry')
    p.explicit = False
    p.complete = False
    p.owner = Person("Ben K", 'icloud@sudo.is')
    p.withhold_from_itunes = True

    for entry in reversed(entries):
        try:
            filename = urllib.parse.quote(entry['filename'])
        except KeyError:
            # No 'filename' was recorded for this episode, so there is
            # nothing to link to.
            logger.error(f"missing audio file for episode {entry['title']}")
            continue

        urltitle = urllib.parse.quote(entry['title'])

        e = p.add_episode()
        e.id = entry['id']
        e.title = entry['title']
        e.link = f"https://{{ archives_url }}/pub/podcast/{name}/{urltitle}"
        e.summary = entry['summary']

        # Only the first six fields (year .. second) are date/time values.
        published = entry['published_parsed'][:6]
        e.publication_date = datetime(*published, tzinfo=pytz.utc)

        # The URL uses the percent-encoded names, the local path the raw ones.
        medialink = make_link_to_file(name, urltitle, filename)
        full_path_file = os.path.join(path, entry['title'], entry['filename'])

        length = os.stat(full_path_file).st_size

        # Everything that is not .m4a is read as MP3.
        if os.path.splitext(entry['filename'])[1].lower() == ".m4a":
            audio = MP4(full_path_file)
        else:
            audio = MP3(full_path_file)

        duration = audio.info.length
        e.media = Media(
            medialink,
            length,
            duration=timedelta(seconds=duration),
        )

    rss_file = os.path.join(path, 'podcast.rss')
    p.rss_file(rss_file, minimize=False)
    # Log only after the write returned, so the message is never emitted for
    # a file that was not actually written.
    logger.debug(f"wrote '{rss_file}'")
|
|
|
|
|
|
def make_link_to_file(name, title, filename):
    """Return the public URL of an episode's media file.

    The three arguments are inserted verbatim as consecutive path segments;
    no quoting is applied here.

    NOTE(review): the double-brace placeholders in the string look like
    template variables filled in when this file is rendered -- confirm.
    """
    link = f"https://{{ archives_url }}/{{ podcasts }}/{name}/{title}/{filename}"
    return link
|
|
|
|
def get_file_url(links):
    """Return the ``href`` of the first link that is not an HTML page link.

    A link is accepted when its ``rel`` is not ``"alternate"`` and its
    ``type`` is not ``"text/html"``. Links that lack either key are treated
    as not matching that value instead of raising ``KeyError``.

    Args:
        links: Iterable of link mappings with ``href`` and, usually, ``rel``
            and ``type`` keys.

    Returns:
        The ``href`` of the first accepted link.

    Raises:
        RuntimeError: If no link is accepted; the links are logged first.
    """
    for link in links:
        if link.get('rel') != "alternate" and link.get('type') != "text/html":
            return link['href']

    # Nothing matched: dump what we were given to make the feed debuggable.
    logger.error("\n" + json.dumps(links, indent=2))
    raise RuntimeError("no mp3 url found")
|
|
|
|
def download_file(url, dest):
    """Download ``url`` into the directory ``dest`` by running ``wget``.

    wget is asked to resume partial downloads (``-c``), to honour the
    server-provided file name (``--content-disposition``), to stay quiet
    (``-q``) and to save below ``dest`` (``-P``).

    The download stays best-effort: a failing wget does not raise, but its
    exit status is logged so the failure is no longer silent.

    Args:
        url: Address of the file to fetch.
        dest: Directory the file is saved into.
    """
    result = subprocess.run(['wget', '-c', '--content-disposition', '-q', '-P', dest, url])
    if result.returncode != 0:
        logger.warning(f"wget exited with status {result.returncode} for '{url}'")
|
|
|
|
def download_episode(item, path):
    """Download one feed entry into its own directory and return its metadata.

    The episode lives in ``<path>/<title>``. If that directory already exists
    the episode counts as downloaded and the stored ``info.json`` is returned.
    Otherwise the directory is created, the audio file is fetched, every file
    in the directory gets the publication time as its access/modification
    time, and ``item`` (with ``filename`` set when a file was found) is
    written to ``info.json``.

    Args:
        item: Feed entry with ``title``, ``links`` and ``published_parsed``.
        path: Base directory of the podcast.

    Returns:
        The parsed ``info.json`` for an existing episode, otherwise ``item``.

    Raises:
        RuntimeError: If the entry has no downloadable link.
        OSError: On filesystem errors, e.g. an existing episode directory
            without ``info.json``.
    """
    title = item['title']
    podcastpath = os.path.join(path, title)

    # Resolve the URL exactly once and before touching the filesystem, so a
    # feed entry without audio never leaves an empty directory behind that a
    # later run would mistake for a finished download.
    try:
        url = get_file_url(item['links'])
    except RuntimeError as err:
        logger.error(f"{title}: {err}")
        # crash
        raise

    if os.path.exists(podcastpath):
        logger.debug(f"already downloaded: '{title}'")
        infopath = os.path.join(podcastpath, 'info.json')
        with open(infopath, 'r') as info_file:
            return json.loads(info_file.read())

    os.makedirs(podcastpath, exist_ok=False)
    download_file(url, podcastpath)
    logger.info(f"downloaded: '{title}'")

    # Only the first six fields are date/time values. The tuple is treated as
    # UTC, matching how generate_feed interprets the same field.
    published = datetime(*item['published_parsed'][:6], tzinfo=pytz.utc)
    ctime = published.timestamp()

    for entry_name in os.listdir(podcastpath):
        if '?' in entry_name:
            # Presumably a query string left over from the download URL;
            # keep only the part before it as the file name.
            just_filename = entry_name.split("?")[0]

            old = os.path.join(podcastpath, entry_name)
            new = os.path.join(podcastpath, just_filename)
            os.rename(old, new)
            os.utime(new, (ctime, ctime))

            logger.info(just_filename)
            item['filename'] = just_filename
        else:
            os.utime(os.path.join(podcastpath, entry_name), (ctime, ctime))
            if entry_name != "info.json":
                logger.info(entry_name)
                item['filename'] = entry_name

    with open(os.path.join(podcastpath, 'info.json'), 'w') as info_file:
        info_file.write(json.dumps(item, indent=2))

    # Set the directory times last: writing info.json just modified it.
    os.utime(podcastpath, (ctime, ctime))
    return item
|
|
|
|
|
|
@logger.catch
def main():
    """Mirror one configured podcast: fetch its feed, episodes and rebuild the RSS.

    Reads the entry for the podcast named on the command line from
    ``/usr/local/etc/podcast.json``. The keys ``path`` and ``rss_url`` are
    required; ``logfile``, ``logfile_loglevel`` and ``stderr_loglevel`` are
    optional. The parsed source feed is dumped to ``rss-feed.json`` in the
    podcast directory, every entry is passed to ``download_episode`` and the
    local feed is regenerated with ``generate_feed``.

    Raises:
        SystemExit: With status 1 if the podcast name is not configured.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('name', type=str, help="podcast name in config")
    parser.add_argument('--debug', action="store_true", help="print DEBUG to stderr")
    args = parser.parse_args()

    with open('/usr/local/etc/podcast.json', 'r') as config_file:
        try:
            podcast_config = json.load(config_file)[args.name]
        except KeyError:
            logger.error(f"no podcast '{args.name}' configured")
            raise SystemExit(1)

    if args.debug:
        # Logger setup is left untouched: --debug causes DEBUG logs to stderr.
        logger.debug(f"{args.name}: {podcast_config['path']}")
    else:
        logger.remove()
        logger.add(sys.stderr, level=podcast_config.get("stderr_loglevel", "INFO"))

    if 'logfile' in podcast_config:
        logfile_loglevel = podcast_config.get("logfile_loglevel", "INFO")
        logger.add(podcast_config['logfile'], level=logfile_loglevel)

    os.makedirs(podcast_config['path'], exist_ok=True)

    rss = feedparser.parse(podcast_config['rss_url'])
    with open(os.path.join(podcast_config['path'], "rss-feed.json"), 'w') as dump_file:
        # default=str: for a malformed feed the parse result can carry an
        # exception object, which json cannot serialise on its own. This is
        # only a diagnostic dump, so a string representation is sufficient.
        dump_file.write(json.dumps(dict(rss), indent=2, default=str))

    # Entries enriched with the local file names.
    entries = []
    for item in rss['entries']:
        info = download_episode(item, podcast_config['path'])
        entries.append(info)

    generate_feed(rss, entries, args.name, podcast_config['path'])


if __name__ == "__main__":
    main()
|