152 lines
4.5 KiB
Python
152 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from loguru import logger
|
|
|
|
from telegram import Update, Bot
|
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
|
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters
|
|
from telegram.ext import CallbackContext, DispatcherHandlerStop
|
|
from telegram.ext import CallbackQueryHandler
|
|
from telegram.utils.helpers import escape_markdown
|
|
|
|
from youtube_dl.utils import DownloadError
|
|
|
|
from sudoisytdl import __version__
|
|
from sudoisytdl import yt
|
|
from sudoisytdl import util
|
|
from sudoisytdl import config
|
|
|
|
dlmodes = {
|
|
"audio": "audio",
|
|
"video": "video",
|
|
"both": "both"
|
|
}
|
|
|
|
def get_url(msg):
|
|
for word in msg.split(" "):
|
|
if word.startswith("https://"):
|
|
return word
|
|
raise ValueError("please send/share a youtube link")
|
|
|
|
|
|
def get_user_name(user):
|
|
for param in ['username', 'first_name', 'id']:
|
|
name = getattr(user, param, False)
|
|
if name:
|
|
if param == "username":
|
|
return name
|
|
if param == "first_name":
|
|
# try to get the last name as well
|
|
return name + " " + gettr(user, "last_name", "")
|
|
else:
|
|
return name
|
|
|
|
def msg_me(text):
|
|
bot = Bot(token=config.TG_TOKEN)
|
|
bot.send_message(chat_id=config.MY_TG, text=text, parse_mode="markdown")
|
|
|
|
def notify_me(user, msg, loglevel="WARNING"):
|
|
username = get_user_name(user)
|
|
text = f"{username}: '{msg}'"
|
|
if user.id != config.MY_TG:
|
|
msg_me(text)
|
|
logger.log(loglevel, text)
|
|
|
|
def start(update: Update, context: CallbackContext) -> None:
|
|
update.message.reply_text('send me a youtube link')
|
|
|
|
def callback(update: Update, _: CallbackContext) -> None:
|
|
query = update.callback_query
|
|
username = get_user_name(query.from_user)
|
|
data = query.to_dict()['data'].split('$')
|
|
|
|
def error(msg: str) -> None:
|
|
logger.error(f"user: '{username}', error: '{msg}'")
|
|
query.answer(text=msg)
|
|
notify_me(msg)
|
|
|
|
try:
|
|
dlmode = dlmodes[data[1]]
|
|
except KeyError as e:
|
|
error("pick one")
|
|
raise DispatcherHandlerStop
|
|
|
|
try:
|
|
# stop the loading message in the client
|
|
query.answer()
|
|
query.edit_message_text(text=f"downloading {dlmode}..")
|
|
|
|
logger.info(f"{username}: {dlmode} of '{data[0]}'")
|
|
dl = yt.download(data[0], dlmode, username=username)
|
|
|
|
# copy files
|
|
dl_urls = dict()
|
|
for k, a in dl['files'].items():
|
|
dl_url = util.copy_to_webdir(a)
|
|
dl_urls[k] = dl_url
|
|
|
|
md_links = [f"[{k}]({dl_urls[k]})" for k in sorted(dl_urls)]
|
|
msg = (f"*{escape_markdown(dl['name'])}* \n\n"
|
|
f"download: {' | '.join(md_links)} (valid 1h)"
|
|
)
|
|
logger.info(msg)
|
|
|
|
query.message.reply_text(msg, parse_mode="markdown")
|
|
|
|
except DownloadError as e:
|
|
if "is not a valid URL" in str(e) or "Unsupported URL" in str(e):
|
|
error("that wasnt a youtube link")
|
|
else:
|
|
error("error downloading, maybe ask ben")
|
|
raise DispatcherHandlerStop
|
|
|
|
logger.success(f"{username} downloaded {dlmode} for {data[0]}")
|
|
|
|
|
|
|
|
def handle_link(update: Update, context: CallbackContext) -> None:
|
|
username = get_user_name(update.message.from_user)
|
|
|
|
notify_me(update.message.from_user, update.message.text)
|
|
|
|
try:
|
|
url = get_url(update.message.text)
|
|
except ValueError as e:
|
|
update.message.reply_text(str(e))
|
|
notify_me(update.message.from_user, update.message.text, "ERROR")
|
|
raise DispatcherHandlerStop
|
|
|
|
|
|
keyboard = [[
|
|
InlineKeyboardButton(
|
|
mode, callback_data=f"{url}${mode}"
|
|
) for mode in dlmodes.keys() ]]
|
|
ikm = InlineKeyboardMarkup(keyboard)
|
|
|
|
update.message.reply_text("audio or video?", reply_markup=ikm)
|
|
raise DispatcherHandlerStop
|
|
|
|
def cleaner(context: CallbackContext) -> None:
|
|
util.remove_expired_from_webdir(config.EXPIRE_AFTER_MINS)
|
|
|
|
def start_bot():
|
|
updater = Updater(config.TG_TOKEN, use_context=True)
|
|
dispatcher = updater.dispatcher
|
|
job_queue = updater.job_queue
|
|
|
|
logger.info(f"web links are exired after {config.EXPIRE_AFTER_MINS}m")
|
|
dispatcher.add_handler(CommandHandler("start", start))
|
|
dispatcher.add_handler(
|
|
MessageHandler(Filters.text & ~Filters.command, handle_link)
|
|
)
|
|
dispatcher.add_handler(CallbackQueryHandler(callback))
|
|
|
|
job_queue.run_repeating(cleaner, interval=60, first=1)
|
|
|
|
version = f"sudoisytdl {__version__}"
|
|
logger.info(version)
|
|
msg_me(text=version)
|
|
|
|
updater.start_polling()
|
|
updater.idle()
|