rffmpeg/rffmpeg

1021 lines
36 KiB
Python
Executable File

#!/usr/bin/env python3
# rffmpeg.py - Remote FFMPEG transcoding wrapper
#
# Copyright (C) 2019-2022 Joshua M. Boniface <joshua@boniface.me>
# and Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
###############################################################################
import click
import logging
import os
import signal
import sys
import shlex
import yaml
from contextlib import contextmanager
from grp import getgrnam
from pathlib import Path
from pwd import getpwnam
from re import search
from sqlite3 import connect as sqlite_connect
from subprocess import run, PIPE
from time import sleep
from datetime import datetime
# Set up the logger
log = logging.getLogger("rffmpeg")
# Use Postgresql if specified, otherwise use SQLite
DB_TYPE = "SQLITE"
SQL_VAR_SIGN = "?"
SQL_PRIMARY_KEY="INTEGER"
SQL_DATE_TIME="DATETIME"
POSTGRES_DB = os.environ.get("RFFMPEG_POSTGRES_DB", "rffmpeg")
POSTGRES_USER = os.environ.get("RFFMPEG_POSTGRES_USER")
POSTGRES_PASS = os.environ.get("RFFMPEG_POSTGRES_PASS", "")
POSTGRES_PORT = os.environ.get("RFFMPEG_POSTGRES_PORT", "5432")
POSTGRES_HOST = os.environ.get("RFFMPEG_POSTGRES_HOST", "localhost")
if POSTGRES_USER != None:
DB_TYPE = "POSTGRES"
SQL_VAR_SIGN = "%s"
SQL_PRIMARY_KEY="SERIAL"
SQL_DATE_TIME="TIMESTAMP"
POSTGRES_CREDENTIALS = {
"dbname": POSTGRES_DB,
"user": POSTGRES_USER,
"password": POSTGRES_PASS,
"port": int(POSTGRES_PORT),
"host": POSTGRES_HOST,
}
from psycopg2 import connect as postgres_connect
from psycopg2 import DatabaseError as postgres_error
# Open a database connection (context manager)
@contextmanager
def dbconn(config, init = False):
"""
Open a database connection.
"""
if DB_TYPE == "SQLITE":
if not init and not Path(config["db_path"]).is_file():
fail(f"Failed to find database '{config['db_path']}' - did you forget to run 'rffmpeg init' or add all env vars for Postgresql?")
log.debug("Using SQLite as database.")
conn = sqlite_connect(config["db_path"])
conn.execute("PRAGMA foreign_keys = 1")
cur = conn.cursor()
yield cur
conn.commit()
conn.close()
log.debug("SQLite connection closed.")
elif DB_TYPE == "POSTGRES":
conn = None
try:
log.debug("Using Postgresql as database. Connecting...")
conn = postgres_connect(**POSTGRES_CREDENTIALS)
cur = conn.cursor()
cur.execute("SELECT version()")
db_version = cur.fetchone()
log.debug(f"Connected to Postgresql version {db_version}")
yield cur
conn.commit()
except (Exception, postgres_error) as error:
print(error)
log.error(error)
finally:
if conn is not None:
conn.close()
log.debug("Postgresql connection closed.")
def fail(msg):
"""
Output an error message and terminate.
"""
log.error(msg)
exit(1)
def load_config():
"""
Parse the YAML configuration file (either /etc/rffmpeg/rffmpeg.yml or specified by the envvar
RFFMPEG_CONFIG) and return a standard dictionary of configuration values.
"""
default_config_file = "/etc/rffmpeg/rffmpeg.yml"
config_file = os.environ.get("RFFMPEG_CONFIG", default_config_file)
if not Path(config_file).is_file():
log.info(f"No config found in {config_file}. Using default settings.")
o_config = {
"rffmpeg": {"logging": {}, "directories": {}, "remote": {}, "commands": {}}
}
else:
with open(config_file, "r") as cfgfh:
try:
o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
except Exception as e:
fail(f"Failed to parse configuration file: {e}")
config = dict()
# Parse the base group ("rffmpeg")
config_base = o_config.get("rffmpeg", dict())
if not config_base:
fail("Failed to parse configuration file top level key 'rffmpeg'.")
# Parse the logging group ("rffmpeg" -> "logging")
config_logging = config_base.get("logging", dict())
if config_logging is None:
config_logging = dict()
# Parse the directories group ("rffmpeg" -> "directories")
config_directories = config_base.get("directories", dict())
if config_directories is None:
config_directories = dict()
# Parse the remote group ("rffmpeg" -> "remote")
config_remote = config_base.get("remote", dict())
if config_remote is None:
config_remote = dict()
# Parse the commands group ("rffmpeg" -> "commands")
config_commands = config_base.get("commands", dict())
if config_commands is None:
config_commands = dict()
# Parse the keys from the logging group
config["log_to_file"] = config_logging.get("log_to_file", True)
config["logfile"] = config_logging.get("logfile", "/var/log/jellyfin/rffmpeg.log")
config["datedlogfiles"] = config_logging.get("datedlogfiles", False)
if config["datedlogfiles"] is True:
config["datedlogdir"] = config_logging.get("datedlogdir", "/var/log/jellyfin")
config["logfile"] = (
config["datedlogdir"]
+ "/"
+ datetime.today().strftime("%Y%m%d")
+ "_rffmpeg.log"
)
config["logdebug"] = config_logging.get("debug", False)
# Parse the keys from the state group
config["state_dir"] = config_directories.get("state", "/var/lib/rffmpeg")
config["persist_dir"] = config_directories.get("persist", "/run/shm")
config["dir_owner"] = config_directories.get("owner", "jellyfin")
config["dir_group"] = config_directories.get("group", "sudo")
# Parse the keys from the remote group
config["remote_user"] = config_remote.get("user", "")
config["remote_args"] = config_remote.get(
"args", ["-i", "/var/lib/jellyfin/.ssh/id_rsa"]
)
if config["remote_args"] is None:
config["remote_args"] = []
config["persist_time"] = config_remote.get("persist", 300)
# Parse the keys from the commands group
config["ssh_command"] = config_commands.get("ssh", "/usr/bin/ssh")
config["pre_commands"] = config_commands.get("pre", [])
if config["pre_commands"] is None:
config["pre_commands"] = []
config["ffmpeg_command"] = config_commands.get(
"ffmpeg", "/usr/lib/jellyfin-ffmpeg/ffmpeg"
)
config["ffprobe_command"] = config_commands.get(
"ffprobe", "/usr/lib/jellyfin-ffmpeg/ffprobe"
)
config["fallback_ffmpeg_command"] = config_commands.get(
"fallback_ffmpeg", "/usr/lib/jellyfin-ffmpeg/ffmpeg"
)
config["fallback_ffprobe_command"] = config_commands.get(
"fallback_ffprobe", "/usr/lib/jellyfin-ffmpeg/ffprobe"
)
# Set the database path
config["db_path"] = config["state_dir"] + "/rffmpeg.db"
# Set a list of special flags that cause different behaviour
config["special_flags"] = [
"-version",
"-encoders",
"-decoders",
"-hwaccels",
"-filters",
"-h",
"-muxers",
"-fp_format",
] + config_commands.get("special_flags", [])
# Set the current PID of this process
config["current_pid"] = os.getpid()
return config
def cleanup(signum="", frame=""):
"""
Clean up this processes stored transient data.
"""
global config
with dbconn(config) as cur:
cur.execute(
f"DELETE FROM states WHERE process_id = {SQL_VAR_SIGN}",
(config["current_pid"],),
)
cur.execute(
f"DELETE FROM processes WHERE process_id = {SQL_VAR_SIGN}",
(config["current_pid"],),
)
def generate_ssh_command(config, target_hostname):
"""
Generate an SSH command for use.
"""
ssh_command = list()
# Add SSH component
ssh_command.append(config["ssh_command"])
ssh_command.append("-q")
ssh_command.append("-t")
# Set our connection details
ssh_command.extend(["-o", "ConnectTimeout=1"])
ssh_command.extend(["-o", "ConnectionAttempts=1"])
ssh_command.extend(["-o", "StrictHostKeyChecking=no"])
ssh_command.extend(["-o", "UserKnownHostsFile=/dev/null"])
# Use SSH control persistence to keep sessions alive for subsequent commands
if config["persist_time"] > 0:
ssh_command.extend(["-o", "ControlMaster=auto"])
ssh_command.extend(
["-o", f"ControlPath={config['persist_dir']}/ssh-%r@%h:%p"]
)
ssh_command.extend(["-o", f"ControlPersist={config['persist_time']}"])
# Add the remote config args
for arg in config["remote_args"]:
if arg:
ssh_command.append(arg)
# Add user+host string
ssh_command.append(f"{config['remote_user']}@{target_hostname}")
return ssh_command
def run_command(command, stdin, stdout, stderr):
"""
Execute the command using subprocess.
"""
p = run(
command,
shell=False,
bufsize=0,
universal_newlines=True,
stdin=stdin,
stdout=stdout,
stderr=stderr,
)
return p
def get_target_host(config):
"""
Determine an optimal target host via data on currently active processes and states.
"""
log.debug("Determining optimal target host")
# Select all hosts and active processes from the database
with dbconn(config) as cur:
cur.execute("SELECT * FROM hosts")
hosts = cur.fetchall()
cur.execute("SELECT * FROM processes")
processes = cur.fetchall()
# Generate a mapping dictionary of hosts and processes
host_mappings = dict()
for host in hosts:
hid, servername, hostname, weight, created = host
# Get the latest state
with dbconn(config) as cur:
cur.execute(
f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC",
(hid,),
)
current_state = cur.fetchone()
if not current_state:
current_state = "idle"
marking_pid = "N/A"
else:
sid, host_id, process_id, state = current_state
current_state = state
marking_pid = process_id
# Create the mappings entry
host_mappings[hid] = {
"hostname": hostname,
"weight": weight,
"servername": servername,
"current_state": current_state,
"marking_pid": marking_pid,
"commands": [proc[2] for proc in processes if proc[1] == hid],
}
lowest_count = 9999
target_hid = None
target_hostname = None
target_servername = None
# For each host in the mapping, let's determine if it is suitable
for hid, host in host_mappings.items():
log.debug(f"Trying host ID {hid} '{host['hostname']}'")
# If it's marked as bad, continue
if host["current_state"] == "bad":
log.debug(f"Host previously marked bad by PID {host['marking_pid']}")
continue
# Try to connect to the host and run a very quick command to determine if it is workable
if host["hostname"] not in ["localhost", "127.0.0.1"]:
log.debug("Running SSH test")
test_ssh_command = generate_ssh_command(config, host["hostname"])
test_ssh_command.remove("-q")
test_ssh_command = [arg.replace('@', '', 1) if arg.startswith('@') else arg for arg in test_ssh_command]
test_ffmpeg_command = [config["ffmpeg_command"], "-version"]
ret = run_command(test_ssh_command + test_ffmpeg_command, PIPE, PIPE, PIPE)
if ret.returncode != 0:
# Mark the host as bad
log.warning(f"Marking host {host['hostname']} ({host['servername']}) as bad due to retcode {ret.returncode}")
log.debug(f"SSH test command was: {' '.join(test_ssh_command + test_ffmpeg_command)}")
log.debug(f"SSH test command stdout: {ret.stdout}")
log.debug(f"SSH test command stderr: {ret.stderr}")
with dbconn(config) as cur:
cur.execute(
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(hid, config["current_pid"], "bad"),
)
continue
log.debug(f"SSH test succeeded with retcode {ret.returncode}")
# If the host state is idle, we can use it immediately
if host["current_state"] == "idle":
target_hid = hid
target_hostname = host["hostname"]
target_servername = host["servername"]
log.debug("Selecting host as idle")
break
# Get the modified count of the host
raw_proc_count = len(host["commands"])
weighted_proc_count = raw_proc_count // host["weight"]
# If this host is currently the least used, provisionally set it as the target
if weighted_proc_count < lowest_count:
lowest_count = weighted_proc_count
target_hid = hid
target_hostname = host["hostname"]
target_servername = host["servername"]
log.debug(f"Selecting host as current lowest proc count (raw count: {raw_proc_count}, weighted count: {weighted_proc_count})")
log.debug(f"Found optimal host ID {target_hid} '{target_hostname}' ({target_servername})")
return target_hid, target_hostname, target_servername
def run_local_command(config, command, command_args, stderr_as_stdout = False, mapped_cmd = None):
"""
Run command locally, either because "localhost" is the target host, or because no good target
host was found by get_target_host().
"""
rffmpeg_command = [mapped_cmd or command]
# Prepare our default stdin/stdout/stderr
stdin = sys.stdin
stderr = sys.stderr
if stderr_as_stdout:
stdout = sys.stderr
else:
stdout = sys.stdout
# Append all the passed arguments directly
for arg in command_args:
rffmpeg_command.append(f"{arg}")
log.info("Running command on host 'localhost'")
log.debug(f"Local command: {' '.join(rffmpeg_command)}")
with dbconn(config) as cur:
cur.execute(
f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(0, config["current_pid"], command + ' ' + ' '.join(command_args)),
)
cur.execute(
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(0, config["current_pid"], "active"),
)
return run_command(rffmpeg_command, stdin, stdout, stderr)
def run_local_ffmpeg(config, ffmpeg_args):
"""
Run ffmpeg locally, either because "localhost" is the target host, or because no good target
host was found by get_target_host().
"""
if "ffprobe" in cmd_name:
return run_local_command(config, cmd_name, ffmpeg_args, mapped_cmd=config["fallback_ffprobe_command"])
else:
return run_local_command(config, cmd_name, ffmpeg_args, stderr_as_stdout=not any(item in config["special_flags"] for item in ffmpeg_args), mapped_cmd=config["fallback_ffmpeg_command"])
def run_remote_command(
config, target_hid, target_hostname, target_servername, command, command_args, stderr_as_stdout = False, mapped_cmd = None, pre_commands = []
):
"""
Run command against the remote target_hostname.
"""
rffmpeg_ssh_command = generate_ssh_command(config, target_hostname)
rffmpeg_ssh_command = [arg.replace('@', '', 1) if arg.startswith('@') else arg for arg in rffmpeg_ssh_command]
rffmpeg_command = list()
# Add any pre commands
for cmd in pre_commands:
if cmd:
rffmpeg_command.append(cmd)
rffmpeg_command.append(mapped_cmd or command)
# Prepare our default stdin/stderr
stdin = sys.stdin
stderr = sys.stderr
if stderr_as_stdout:
stdout = sys.stderr
else:
stdout = sys.stdout
rffmpeg_command.extend(map(shlex.quote, command_args))
log.info(f"Running command on host '{target_hostname}' ({target_servername})")
log.debug(f"Remote command: {' '.join(rffmpeg_ssh_command + rffmpeg_command)}")
with dbconn(config) as cur:
cur.execute(
f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(target_hid, config["current_pid"], command + ' ' + ' '.join(command_args)),
)
cur.execute(
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(target_hid, config["current_pid"], "active"),
)
return run_command(
rffmpeg_ssh_command + rffmpeg_command, stdin, stdout, stderr
)
def run_remote_ffmpeg(
config, target_hid, target_hostname, target_servername, ffmpeg_args
):
"""
Run ffmpeg against the remote target_hostname.
"""
if "ffprobe" in cmd_name:
# If we're in ffprobe mode use that command and sys.stdout as stdout
return run_remote_command(config, target_hid, target_hostname, target_servername, cmd_name, ffmpeg_args, mapped_cmd=config["ffprobe_command"], pre_commands=config["pre_commands"])
else:
# Otherwise, we use stderr as stdout
return run_remote_command(config, target_hid, target_hostname, target_servername, cmd_name, ffmpeg_args, stderr_as_stdout=not any(item in config["special_flags"] for item in ffmpeg_args), mapped_cmd=config["ffmpeg_command"], pre_commands=config["pre_commands"])
def setup_logging(config):
"""
Set up logging.
"""
if config["logdebug"] is True:
logging_level = logging.DEBUG
else:
logging_level = logging.INFO
if config["log_to_file"]:
logging.basicConfig(
filename=config["logfile"],
level=logging_level,
format="%(asctime)s - %(name)s[%(process)s] - %(levelname)s - %(message)s",
)
else:
logging.basicConfig(
level=logging_level,
format="%(asctime)s - %(name)s[%(process)s] - %(levelname)s - %(message)s",
)
def hook_signals():
signal.signal(signal.SIGTERM, cleanup)
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGQUIT, cleanup)
signal.signal(signal.SIGHUP, cleanup)
def run_ffmpeg(config, ffmpeg_args):
"""
Entrypoint for an ffmpeg/ffprobe aliased process.
"""
hook_signals()
setup_logging(config)
log.info(f"Starting rffmpeg as {cmd_name} with args: {' '.join(ffmpeg_args)}")
target_hid, target_hostname, target_servername = get_target_host(config)
if not target_hostname or target_hostname == "localhost":
ret = run_local_ffmpeg(config, ffmpeg_args)
else:
ret = run_remote_ffmpeg(
config, target_hid, target_hostname, target_servername, ffmpeg_args
)
cleanup()
if ret.returncode == 0:
log.info(f"Finished rffmpeg with return code {ret.returncode}")
else:
log.error(f"Finished rffmpeg with return code {ret.returncode}")
exit(ret.returncode)
def run_control(config):
"""
Entrypoint for the Click CLI for managing the rffmpeg system.
"""
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"], max_content_width=120)
@click.group(context_settings=CONTEXT_SETTINGS)
def rffmpeg_click():
"""
rffmpeg CLI interface
"""
if (
not Path(config["state_dir"]).is_dir()
or not Path(config["db_path"]).is_file()
):
return
# List all DB migrations here
did_alter_0001AddServername = False
# Check conditions for migrations
with dbconn(config) as cur:
# Migration for new servername (PR #36)
cur.execute(
"SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'"
)
if cur.fetchone()[0] == 0:
cur.execute(
"ALTER TABLE hosts ADD servername TEXT NOT NULL DEFAULT 'invalid'"
)
did_alter_0001AddServername = True
# Migration for new servername (PR #36)
if did_alter_0001AddServername:
with dbconn(config) as cur:
cur.execute("SELECT * FROM hosts")
for host in cur.fetchall():
hid, servername, hostname, weight, created = host
if servername == "invalid":
cur.execute(
f"UPDATE hosts SET servername = {SQL_VAR_SIGN} WHERE hostname = {SQL_VAR_SIGN}",
(hostname, hostname),
)
@click.command(name="init", short_help="Initialize the system.")
@click.option(
"-y",
"--yes",
"confirm_flag",
is_flag=True,
default=False,
help="Confirm initialization.",
)
def rffmpeg_click_init(confirm_flag):
"""
Initialize the rffmpeg system and database; this will erase all hosts and current state.
This command should be run as "sudo" before any attempts to use rffmpeg.
"""
if os.getuid() != 0:
click.echo("Error: This command requires root privileges.")
exit(1)
if not confirm_flag:
try:
click.confirm(
"Are you sure you want to (re)initalize the database",
prompt_suffix="? ",
abort=True,
)
except Exception:
fail("Aborting due to failed confirmation.")
click.echo("Initializing database")
if not Path(config["state_dir"]).is_dir():
try:
os.makedirs(config["state_dir"])
except OSError as e:
fail(f"Failed to create state directory '{config['state_dir']}': {e}")
try:
with dbconn(config, True) as cur:
cur.execute("DROP TABLE IF EXISTS hosts")
cur.execute("DROP TABLE IF EXISTS processes")
cur.execute("DROP TABLE IF EXISTS states")
cur.execute(f"CREATE TABLE hosts (id {SQL_PRIMARY_KEY} PRIMARY KEY, servername TEXT NOT NULL UNIQUE, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, created {SQL_DATE_TIME} NOT NULL)")
cur.execute(f"CREATE TABLE processes (id {SQL_PRIMARY_KEY} PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)")
cur.execute(f"CREATE TABLE states (id {SQL_PRIMARY_KEY} PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)")
except Exception as e:
fail(f"Failed to create database: {e}")
os.chown(
config["state_dir"],
getpwnam(config["dir_owner"]).pw_uid,
getgrnam(config["dir_group"]).gr_gid,
)
os.chmod(config["state_dir"], 0o770)
if DB_TYPE == "SQLITE":
os.chown(
config["db_path"],
getpwnam(config["dir_owner"]).pw_uid,
getgrnam(config["dir_group"]).gr_gid,
)
os.chmod(config["db_path"], 0o660)
rffmpeg_click.add_command(rffmpeg_click_init)
@click.command(name="status", short_help="Show hosts and status.")
def rffmpeg_click_status():
"""
Show the current status of all rffmpeg target hosts and active processes.
"""
with dbconn(config) as cur:
cur.execute("SELECT * FROM hosts")
hosts = cur.fetchall()
cur.execute("SELECT * FROM processes")
processes = cur.fetchall()
cur.execute("SELECT * FROM states")
states = cur.fetchall()
# Determine if there are any fallback processes running
fallback_processes = list()
for process in processes:
pid, host_id, process_id, cmd = process
if host_id == 0:
fallback_processes.append(process)
# Generate a mapping dictionary of hosts and processes
host_mappings = dict()
if len(fallback_processes) > 0:
host_mappings[0] = {
"hostname": "localhost (fallback)",
"servername": "localhost (fallback)",
"weight": 0,
"current_state": "fallback",
"commands": fallback_processes,
}
for host in hosts:
hid, servername, hostname, weight, created = host
# Get the latest state
with dbconn(config) as cur:
cur.execute(
f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC",
(hid,),
)
current_state = cur.fetchone()
if not current_state:
current_state = "idle"
else:
sid, host_id, process_id, state = current_state
current_state = state
# Create the mappings entry
host_mappings[hid] = {
"hostname": hostname,
"servername": servername,
"weight": weight,
"current_state": current_state,
"commands": [proc for proc in processes if proc[1] == hid],
}
hostname_length = 9
servername_length = 11
hid_length = 3
weight_length = 7
state_length = 6
for hid, host in host_mappings.items():
if len(host["hostname"]) + 1 > hostname_length:
hostname_length = len(host["hostname"]) + 1
if len(host["servername"]) + 1 > servername_length:
servername_length = len(host["servername"]) + 1
if len(str(hid)) + 1 > hid_length:
hid_length = len(str(hid)) + 1
if len(str(host["weight"])) + 1 > weight_length:
weight_length = len(str(host["weight"])) + 1
if len(host["current_state"]) + 1 > state_length:
state_length = len(host["current_state"]) + 1
output = list()
output.append(
"{bold}{hostname: <{hostname_length}} {servername: <{servername_length}} {hid: <{hid_length}} {weight: <{weight_length}} {state: <{state_length}} {commands}{end_bold}".format(
bold="\033[1m",
end_bold="\033[0m",
hostname="Hostname",
hostname_length=hostname_length,
servername="Servername",
servername_length=servername_length,
hid="ID",
hid_length=hid_length,
weight="Weight",
weight_length=weight_length,
state="State",
state_length=state_length,
commands="Active Commands",
)
)
for hid, host in host_mappings.items():
if len(host["commands"]) < 1:
first_command = "N/A"
else:
first_command = f"PID {host['commands'][0][2]}: {host['commands'][0][3]}"
host_entry = list()
host_entry.append(
"{hostname: <{hostname_length}} {servername: <{servername_length}} {hid: <{hid_length}} {weight: <{weight_length}} {state: <{state_length}} {commands}".format(
hostname=host["hostname"],
hostname_length=hostname_length,
servername=host["servername"],
servername_length=servername_length,
hid=hid,
hid_length=hid_length,
weight=host["weight"],
weight_length=weight_length,
state=host["current_state"],
state_length=state_length,
commands=first_command,
)
)
for idx, command in enumerate(host["commands"]):
if idx == 0:
continue
host_entry.append(
"{hostname: <{hostname_length}} {servername: <{servername_length}} {hid: <{hid_length}} {weight: <{weight_length}} {state: <{state_length}} {commands}".format(
hostname="",
hostname_length=hostname_length,
servername="",
servername_length=servername_length,
hid="",
hid_length=hid_length,
weight="",
weight_length=weight_length,
state="",
state_length=state_length,
commands=f"PID {command[2]}: {command[3]}",
)
)
output.append("\n".join(host_entry))
click.echo("\n".join(output))
rffmpeg_click.add_command(rffmpeg_click_status)
@click.command(name="add", short_help="Add a host.")
@click.option(
"-w",
"--weight",
"weight",
required=False,
default=1,
help="The weight of the host.",
)
@click.option(
"-n",
"--name",
"name",
required=False,
default=None,
help="The name of the host (if different from the HOST).",
)
@click.argument("host")
def rffmpeg_click_add(weight, name, host):
"""
Add a new host with IP or hostname HOST to the database.
"""
created = datetime.now()
if name is None:
name = host
click.echo(f"Adding new host '{host}' ({name})")
with dbconn(config) as cur:
cur.execute(
f"INSERT INTO hosts (servername, hostname, weight, created) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(name, host, weight, created),
)
rffmpeg_click.add_command(rffmpeg_click_add)
@click.command(name="remove", short_help="Remove a host.")
@click.argument("host")
def rffmpeg_click_remove(host):
"""
Remove a host with internal ID/IP/hostname/servername HOST from the database.
"""
try:
host = int(host)
field = "id"
except ValueError:
field = "servername"
fieldAlt = "hostname"
with dbconn(config) as cur:
cur.execute(f"SELECT * FROM hosts WHERE {field} = {SQL_VAR_SIGN}", (host,))
entry = cur.fetchall()
if len(entry) < 1:
cur.execute(
f"SELECT * FROM hosts WHERE {fieldAlt} = {SQL_VAR_SIGN}", (host,)
)
entry = cur.fetchall()
if len(entry) < 1:
fail("No hosts found to delete!")
if len(entry) == 1:
click.echo(f"Removing {len(entry)} host:")
else:
click.echo(f"Removing {len(entry)} hosts:")
for host in entry:
hid, servername, hostname, weight, created = host
click.echo(f"\tID: {hid}\tHostname: {hostname}\tServername: {servername}")
cur.execute(f"DELETE FROM hosts WHERE id = {SQL_VAR_SIGN}", (hid,))
rffmpeg_click.add_command(rffmpeg_click_remove)
@click.command(name="run", short_help="Run a command.", context_settings={
"ignore_unknown_options": True
})
@click.option("--stderr-as-stdout", "stderr_as_stdout", is_flag=True, default=False, help="Use stderr as stdout for the command.")
@click.argument('full_command', nargs=-1, type=click.UNPROCESSED)
def rffmpeg_click_run(stderr_as_stdout, full_command):
"""
Run a command on the optimal host.
"""
hook_signals()
setup_logging(config)
command = full_command[0]
command_args = full_command[1:]
log.info(f"Starting rffmpeg as {command} with args: {' '.join(command_args)}")
target_hid, target_hostname, target_servername = get_target_host(config)
if not target_hostname or target_hostname == "localhost":
ret = run_local_command(config, command, command_args)
else:
ret = run_remote_command(
config,
target_hid,
target_hostname,
target_servername,
command,
command_args,
stderr_as_stdout=stderr_as_stdout
)
cleanup()
if ret.returncode == 0:
log.info(f"Finished rffmpeg with return code {ret.returncode}")
else:
log.error(f"Finished rffmpeg with return code {ret.returncode}")
exit(ret.returncode)
rffmpeg_click.add_command(rffmpeg_click_run)
@click.command(name="log", short_help="View the rffmpeg log.")
@click.option(
"-f",
"--follow",
"follow_flag",
is_flag=True,
default=False,
help="Follow new log entries instead of seeing current log entries.",
)
def rffmpeg_click_log(follow_flag):
"""
View the rffmpeg log file.
"""
if follow_flag:
with open(config["logfile"]) as file_:
# Go to the end of file
file_.seek(0, 2)
while True:
curr_position = file_.tell()
line = file_.readline()
if not line:
file_.seek(curr_position)
sleep(0.1)
else:
click.echo(line, nl=False)
else:
with open(config["logfile"], "r") as logfh:
click.echo_via_pager(logfh.readlines())
rffmpeg_click.add_command(rffmpeg_click_log)
@click.command(name="clear", short_help="Clear processes and states.")
@click.argument("host", required=False, default=None)
def rffmpeg_click_log(host):
"""
Clear all active process and states from the database, optionally limited to a host with internal ID/IP/hostname/servername HOST.
This command is designed to assist in rare error cases whereby stuck process states are present in the database, and should be used sparingly. This will not affect running processes negatively, though rffmpeg will no longer see them as active. It is recommended to run this command only when you are sure that no processes are actually running.
"""
with dbconn(config) as cur:
if host is not None:
try:
host = int(host)
field = "id"
except ValueError:
field = "servername"
fieldAlt = "hostname"
cur.execute(f"SELECT id FROM hosts WHERE {field} = {SQL_VAR_SIGN}", (host,))
entry = cur.fetchall()
if len(entry) < 1:
cur.execute(f"SELECT id FROM hosts WHERE {fieldAlt} = {SQL_VAR_SIGN}", (host,))
entry = cur.fetchall()
if len(entry) < 1:
fail("Host not found!")
if len(entry) > 1:
fail("Multiple hosts found, please be more specific!")
host_id = entry[0][0]
click.echo(f"Clearing all active processes and states for host ID '{host_id}'")
cur.execute(f"SELECT id FROM processes WHERE host_id = {SQL_VAR_SIGN}", (host_id,))
processes = cur.fetchall()
cur.execute(f"SELECT id FROM states WHERE host_id = {SQL_VAR_SIGN}", (host_id,))
states = cur.fetchall()
for process in processes:
cur.execute(f"DELETE FROM processes WHERE id = {SQL_VAR_SIGN}", process)
for state in states:
cur.execute(f"DELETE FROM states WHERE id = {SQL_VAR_SIGN}", state)
else:
click.echo("Clearing all active processes and states")
cur.execute("DELETE FROM processes")
cur.execute("DELETE FROM states")
rffmpeg_click.add_command(rffmpeg_click_log)
return rffmpeg_click(obj={})
# Entrypoint
if __name__ == "__main__":
all_args = sys.argv
cmd_name = all_args[0]
# Load the config
config = load_config()
if "rffmpeg" in cmd_name:
run_control(config)
else:
ffmpeg_args = all_args[1:]
run_ffmpeg(config, ffmpeg_args)