1021 lines
36 KiB
Python
Executable File
1021 lines
36 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# rffmpeg.py - Remote FFMPEG transcoding wrapper
|
|
#
|
|
# Copyright (C) 2019-2022 Joshua M. Boniface <joshua@boniface.me>
|
|
# and Contributors.
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
|
|
import click
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import shlex
|
|
import yaml
|
|
|
|
from contextlib import contextmanager
|
|
from grp import getgrnam
|
|
from pathlib import Path
|
|
from pwd import getpwnam
|
|
from re import search
|
|
from sqlite3 import connect as sqlite_connect
|
|
from subprocess import run, PIPE
|
|
from time import sleep
|
|
from datetime import datetime
|
|
|
|
|
|
# Set up the logger
|
|
log = logging.getLogger("rffmpeg")
|
|
|
|
|
|
# Use Postgresql if specified, otherwise use SQLite
|
|
DB_TYPE = "SQLITE"
|
|
SQL_VAR_SIGN = "?"
|
|
SQL_PRIMARY_KEY="INTEGER"
|
|
SQL_DATE_TIME="DATETIME"
|
|
POSTGRES_DB = os.environ.get("RFFMPEG_POSTGRES_DB", "rffmpeg")
|
|
POSTGRES_USER = os.environ.get("RFFMPEG_POSTGRES_USER")
|
|
POSTGRES_PASS = os.environ.get("RFFMPEG_POSTGRES_PASS", "")
|
|
POSTGRES_PORT = os.environ.get("RFFMPEG_POSTGRES_PORT", "5432")
|
|
POSTGRES_HOST = os.environ.get("RFFMPEG_POSTGRES_HOST", "localhost")
|
|
|
|
if POSTGRES_USER != None:
|
|
DB_TYPE = "POSTGRES"
|
|
SQL_VAR_SIGN = "%s"
|
|
SQL_PRIMARY_KEY="SERIAL"
|
|
SQL_DATE_TIME="TIMESTAMP"
|
|
POSTGRES_CREDENTIALS = {
|
|
"dbname": POSTGRES_DB,
|
|
"user": POSTGRES_USER,
|
|
"password": POSTGRES_PASS,
|
|
"port": int(POSTGRES_PORT),
|
|
"host": POSTGRES_HOST,
|
|
}
|
|
from psycopg2 import connect as postgres_connect
|
|
from psycopg2 import DatabaseError as postgres_error
|
|
|
|
|
|
# Open a database connection (context manager)
|
|
@contextmanager
|
|
def dbconn(config, init = False):
|
|
"""
|
|
Open a database connection.
|
|
"""
|
|
if DB_TYPE == "SQLITE":
|
|
if not init and not Path(config["db_path"]).is_file():
|
|
fail(f"Failed to find database '{config['db_path']}' - did you forget to run 'rffmpeg init' or add all env vars for Postgresql?")
|
|
log.debug("Using SQLite as database.")
|
|
conn = sqlite_connect(config["db_path"])
|
|
conn.execute("PRAGMA foreign_keys = 1")
|
|
cur = conn.cursor()
|
|
yield cur
|
|
conn.commit()
|
|
conn.close()
|
|
log.debug("SQLite connection closed.")
|
|
elif DB_TYPE == "POSTGRES":
|
|
conn = None
|
|
try:
|
|
log.debug("Using Postgresql as database. Connecting...")
|
|
conn = postgres_connect(**POSTGRES_CREDENTIALS)
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT version()")
|
|
db_version = cur.fetchone()
|
|
log.debug(f"Connected to Postgresql version {db_version}")
|
|
yield cur
|
|
conn.commit()
|
|
except (Exception, postgres_error) as error:
|
|
print(error)
|
|
log.error(error)
|
|
finally:
|
|
if conn is not None:
|
|
conn.close()
|
|
log.debug("Postgresql connection closed.")
|
|
|
|
|
|
def fail(msg):
|
|
"""
|
|
Output an error message and terminate.
|
|
"""
|
|
log.error(msg)
|
|
exit(1)
|
|
|
|
|
|
def load_config():
|
|
"""
|
|
Parse the YAML configuration file (either /etc/rffmpeg/rffmpeg.yml or specified by the envvar
|
|
RFFMPEG_CONFIG) and return a standard dictionary of configuration values.
|
|
"""
|
|
|
|
default_config_file = "/etc/rffmpeg/rffmpeg.yml"
|
|
config_file = os.environ.get("RFFMPEG_CONFIG", default_config_file)
|
|
|
|
if not Path(config_file).is_file():
|
|
log.info(f"No config found in {config_file}. Using default settings.")
|
|
o_config = {
|
|
"rffmpeg": {"logging": {}, "directories": {}, "remote": {}, "commands": {}}
|
|
}
|
|
else:
|
|
with open(config_file, "r") as cfgfh:
|
|
try:
|
|
o_config = yaml.load(cfgfh, Loader=yaml.SafeLoader)
|
|
except Exception as e:
|
|
fail(f"Failed to parse configuration file: {e}")
|
|
|
|
config = dict()
|
|
|
|
# Parse the base group ("rffmpeg")
|
|
config_base = o_config.get("rffmpeg", dict())
|
|
if not config_base:
|
|
fail("Failed to parse configuration file top level key 'rffmpeg'.")
|
|
|
|
# Parse the logging group ("rffmpeg" -> "logging")
|
|
config_logging = config_base.get("logging", dict())
|
|
if config_logging is None:
|
|
config_logging = dict()
|
|
|
|
# Parse the directories group ("rffmpeg" -> "directories")
|
|
config_directories = config_base.get("directories", dict())
|
|
if config_directories is None:
|
|
config_directories = dict()
|
|
|
|
# Parse the remote group ("rffmpeg" -> "remote")
|
|
config_remote = config_base.get("remote", dict())
|
|
if config_remote is None:
|
|
config_remote = dict()
|
|
|
|
# Parse the commands group ("rffmpeg" -> "commands")
|
|
config_commands = config_base.get("commands", dict())
|
|
if config_commands is None:
|
|
config_commands = dict()
|
|
|
|
# Parse the keys from the logging group
|
|
config["log_to_file"] = config_logging.get("log_to_file", True)
|
|
config["logfile"] = config_logging.get("logfile", "/var/log/jellyfin/rffmpeg.log")
|
|
config["datedlogfiles"] = config_logging.get("datedlogfiles", False)
|
|
if config["datedlogfiles"] is True:
|
|
config["datedlogdir"] = config_logging.get("datedlogdir", "/var/log/jellyfin")
|
|
config["logfile"] = (
|
|
config["datedlogdir"]
|
|
+ "/"
|
|
+ datetime.today().strftime("%Y%m%d")
|
|
+ "_rffmpeg.log"
|
|
)
|
|
config["logdebug"] = config_logging.get("debug", False)
|
|
|
|
# Parse the keys from the state group
|
|
config["state_dir"] = config_directories.get("state", "/var/lib/rffmpeg")
|
|
config["persist_dir"] = config_directories.get("persist", "/run/shm")
|
|
config["dir_owner"] = config_directories.get("owner", "jellyfin")
|
|
config["dir_group"] = config_directories.get("group", "sudo")
|
|
|
|
# Parse the keys from the remote group
|
|
config["remote_user"] = config_remote.get("user", "")
|
|
config["remote_args"] = config_remote.get(
|
|
"args", ["-i", "/var/lib/jellyfin/.ssh/id_rsa"]
|
|
)
|
|
if config["remote_args"] is None:
|
|
config["remote_args"] = []
|
|
config["persist_time"] = config_remote.get("persist", 300)
|
|
|
|
# Parse the keys from the commands group
|
|
config["ssh_command"] = config_commands.get("ssh", "/usr/bin/ssh")
|
|
config["pre_commands"] = config_commands.get("pre", [])
|
|
if config["pre_commands"] is None:
|
|
config["pre_commands"] = []
|
|
config["ffmpeg_command"] = config_commands.get(
|
|
"ffmpeg", "/usr/lib/jellyfin-ffmpeg/ffmpeg"
|
|
)
|
|
config["ffprobe_command"] = config_commands.get(
|
|
"ffprobe", "/usr/lib/jellyfin-ffmpeg/ffprobe"
|
|
)
|
|
config["fallback_ffmpeg_command"] = config_commands.get(
|
|
"fallback_ffmpeg", "/usr/lib/jellyfin-ffmpeg/ffmpeg"
|
|
)
|
|
config["fallback_ffprobe_command"] = config_commands.get(
|
|
"fallback_ffprobe", "/usr/lib/jellyfin-ffmpeg/ffprobe"
|
|
)
|
|
|
|
# Set the database path
|
|
config["db_path"] = config["state_dir"] + "/rffmpeg.db"
|
|
|
|
|
|
# Set a list of special flags that cause different behaviour
|
|
config["special_flags"] = [
|
|
"-version",
|
|
"-encoders",
|
|
"-decoders",
|
|
"-hwaccels",
|
|
"-filters",
|
|
"-h",
|
|
"-muxers",
|
|
"-fp_format",
|
|
] + config_commands.get("special_flags", [])
|
|
|
|
# Set the current PID of this process
|
|
config["current_pid"] = os.getpid()
|
|
|
|
return config
|
|
|
|
|
|
def cleanup(signum="", frame=""):
|
|
"""
|
|
Clean up this processes stored transient data.
|
|
"""
|
|
global config
|
|
|
|
with dbconn(config) as cur:
|
|
cur.execute(
|
|
f"DELETE FROM states WHERE process_id = {SQL_VAR_SIGN}",
|
|
(config["current_pid"],),
|
|
)
|
|
cur.execute(
|
|
f"DELETE FROM processes WHERE process_id = {SQL_VAR_SIGN}",
|
|
(config["current_pid"],),
|
|
)
|
|
|
|
|
|
def generate_ssh_command(config, target_hostname):
|
|
"""
|
|
Generate an SSH command for use.
|
|
"""
|
|
ssh_command = list()
|
|
|
|
# Add SSH component
|
|
ssh_command.append(config["ssh_command"])
|
|
ssh_command.append("-q")
|
|
ssh_command.append("-t")
|
|
|
|
# Set our connection details
|
|
ssh_command.extend(["-o", "ConnectTimeout=1"])
|
|
ssh_command.extend(["-o", "ConnectionAttempts=1"])
|
|
ssh_command.extend(["-o", "StrictHostKeyChecking=no"])
|
|
ssh_command.extend(["-o", "UserKnownHostsFile=/dev/null"])
|
|
|
|
# Use SSH control persistence to keep sessions alive for subsequent commands
|
|
if config["persist_time"] > 0:
|
|
ssh_command.extend(["-o", "ControlMaster=auto"])
|
|
ssh_command.extend(
|
|
["-o", f"ControlPath={config['persist_dir']}/ssh-%r@%h:%p"]
|
|
)
|
|
ssh_command.extend(["-o", f"ControlPersist={config['persist_time']}"])
|
|
|
|
# Add the remote config args
|
|
for arg in config["remote_args"]:
|
|
if arg:
|
|
ssh_command.append(arg)
|
|
|
|
# Add user+host string
|
|
ssh_command.append(f"{config['remote_user']}@{target_hostname}")
|
|
|
|
return ssh_command
|
|
|
|
|
|
def run_command(command, stdin, stdout, stderr):
|
|
"""
|
|
Execute the command using subprocess.
|
|
"""
|
|
p = run(
|
|
command,
|
|
shell=False,
|
|
bufsize=0,
|
|
universal_newlines=True,
|
|
stdin=stdin,
|
|
stdout=stdout,
|
|
stderr=stderr,
|
|
)
|
|
return p
|
|
|
|
|
|
def get_target_host(config):
|
|
"""
|
|
Determine an optimal target host via data on currently active processes and states.
|
|
"""
|
|
log.debug("Determining optimal target host")
|
|
# Select all hosts and active processes from the database
|
|
with dbconn(config) as cur:
|
|
cur.execute("SELECT * FROM hosts")
|
|
hosts = cur.fetchall()
|
|
cur.execute("SELECT * FROM processes")
|
|
processes = cur.fetchall()
|
|
|
|
# Generate a mapping dictionary of hosts and processes
|
|
host_mappings = dict()
|
|
for host in hosts:
|
|
hid, servername, hostname, weight, created = host
|
|
|
|
# Get the latest state
|
|
with dbconn(config) as cur:
|
|
cur.execute(
|
|
f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC",
|
|
(hid,),
|
|
)
|
|
current_state = cur.fetchone()
|
|
|
|
if not current_state:
|
|
current_state = "idle"
|
|
marking_pid = "N/A"
|
|
else:
|
|
sid, host_id, process_id, state = current_state
|
|
current_state = state
|
|
marking_pid = process_id
|
|
|
|
# Create the mappings entry
|
|
host_mappings[hid] = {
|
|
"hostname": hostname,
|
|
"weight": weight,
|
|
"servername": servername,
|
|
"current_state": current_state,
|
|
"marking_pid": marking_pid,
|
|
"commands": [proc[2] for proc in processes if proc[1] == hid],
|
|
}
|
|
|
|
lowest_count = 9999
|
|
target_hid = None
|
|
target_hostname = None
|
|
target_servername = None
|
|
# For each host in the mapping, let's determine if it is suitable
|
|
for hid, host in host_mappings.items():
|
|
log.debug(f"Trying host ID {hid} '{host['hostname']}'")
|
|
# If it's marked as bad, continue
|
|
if host["current_state"] == "bad":
|
|
log.debug(f"Host previously marked bad by PID {host['marking_pid']}")
|
|
continue
|
|
|
|
# Try to connect to the host and run a very quick command to determine if it is workable
|
|
if host["hostname"] not in ["localhost", "127.0.0.1"]:
|
|
log.debug("Running SSH test")
|
|
test_ssh_command = generate_ssh_command(config, host["hostname"])
|
|
test_ssh_command.remove("-q")
|
|
test_ssh_command = [arg.replace('@', '', 1) if arg.startswith('@') else arg for arg in test_ssh_command]
|
|
test_ffmpeg_command = [config["ffmpeg_command"], "-version"]
|
|
ret = run_command(test_ssh_command + test_ffmpeg_command, PIPE, PIPE, PIPE)
|
|
if ret.returncode != 0:
|
|
# Mark the host as bad
|
|
log.warning(f"Marking host {host['hostname']} ({host['servername']}) as bad due to retcode {ret.returncode}")
|
|
log.debug(f"SSH test command was: {' '.join(test_ssh_command + test_ffmpeg_command)}")
|
|
log.debug(f"SSH test command stdout: {ret.stdout}")
|
|
log.debug(f"SSH test command stderr: {ret.stderr}")
|
|
with dbconn(config) as cur:
|
|
cur.execute(
|
|
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
|
|
(hid, config["current_pid"], "bad"),
|
|
)
|
|
continue
|
|
log.debug(f"SSH test succeeded with retcode {ret.returncode}")
|
|
|
|
# If the host state is idle, we can use it immediately
|
|
if host["current_state"] == "idle":
|
|
target_hid = hid
|
|
target_hostname = host["hostname"]
|
|
target_servername = host["servername"]
|
|
log.debug("Selecting host as idle")
|
|
break
|
|
|
|
# Get the modified count of the host
|
|
raw_proc_count = len(host["commands"])
|
|
weighted_proc_count = raw_proc_count // host["weight"]
|
|
|
|
# If this host is currently the least used, provisionally set it as the target
|
|
if weighted_proc_count < lowest_count:
|
|
lowest_count = weighted_proc_count
|
|
target_hid = hid
|
|
target_hostname = host["hostname"]
|
|
target_servername = host["servername"]
|
|
log.debug(f"Selecting host as current lowest proc count (raw count: {raw_proc_count}, weighted count: {weighted_proc_count})")
|
|
|
|
log.debug(f"Found optimal host ID {target_hid} '{target_hostname}' ({target_servername})")
|
|
return target_hid, target_hostname, target_servername
|
|
|
|
|
|
def run_local_command(config, command, command_args, stderr_as_stdout = False, mapped_cmd = None):
|
|
"""
|
|
Run command locally, either because "localhost" is the target host, or because no good target
|
|
host was found by get_target_host().
|
|
"""
|
|
rffmpeg_command = [mapped_cmd or command]
|
|
|
|
# Prepare our default stdin/stdout/stderr
|
|
stdin = sys.stdin
|
|
stderr = sys.stderr
|
|
|
|
if stderr_as_stdout:
|
|
stdout = sys.stderr
|
|
else:
|
|
stdout = sys.stdout
|
|
|
|
# Append all the passed arguments directly
|
|
for arg in command_args:
|
|
rffmpeg_command.append(f"{arg}")
|
|
|
|
log.info("Running command on host 'localhost'")
|
|
log.debug(f"Local command: {' '.join(rffmpeg_command)}")
|
|
|
|
with dbconn(config) as cur:
|
|
cur.execute(
|
|
f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
|
|
(0, config["current_pid"], command + ' ' + ' '.join(command_args)),
|
|
)
|
|
cur.execute(
|
|
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
|
|
(0, config["current_pid"], "active"),
|
|
)
|
|
|
|
return run_command(rffmpeg_command, stdin, stdout, stderr)
|
|
|
|
|
|
def run_local_ffmpeg(config, ffmpeg_args):
|
|
"""
|
|
Run ffmpeg locally, either because "localhost" is the target host, or because no good target
|
|
host was found by get_target_host().
|
|
"""
|
|
if "ffprobe" in cmd_name:
|
|
return run_local_command(config, cmd_name, ffmpeg_args, mapped_cmd=config["fallback_ffprobe_command"])
|
|
else:
|
|
return run_local_command(config, cmd_name, ffmpeg_args, stderr_as_stdout=not any(item in config["special_flags"] for item in ffmpeg_args), mapped_cmd=config["fallback_ffmpeg_command"])
|
|
|
|
|
|
def run_remote_command(
|
|
config, target_hid, target_hostname, target_servername, command, command_args, stderr_as_stdout = False, mapped_cmd = None, pre_commands = []
|
|
):
|
|
"""
|
|
Run command against the remote target_hostname.
|
|
"""
|
|
rffmpeg_ssh_command = generate_ssh_command(config, target_hostname)
|
|
rffmpeg_ssh_command = [arg.replace('@', '', 1) if arg.startswith('@') else arg for arg in rffmpeg_ssh_command]
|
|
|
|
rffmpeg_command = list()
|
|
|
|
# Add any pre commands
|
|
for cmd in pre_commands:
|
|
if cmd:
|
|
rffmpeg_command.append(cmd)
|
|
|
|
rffmpeg_command.append(mapped_cmd or command)
|
|
|
|
# Prepare our default stdin/stderr
|
|
stdin = sys.stdin
|
|
stderr = sys.stderr
|
|
|
|
if stderr_as_stdout:
|
|
stdout = sys.stderr
|
|
else:
|
|
stdout = sys.stdout
|
|
|
|
rffmpeg_command.extend(map(shlex.quote, command_args))
|
|
|
|
log.info(f"Running command on host '{target_hostname}' ({target_servername})")
|
|
log.debug(f"Remote command: {' '.join(rffmpeg_ssh_command + rffmpeg_command)}")
|
|
|
|
with dbconn(config) as cur:
|
|
cur.execute(
|
|
f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
|
|
(target_hid, config["current_pid"], command + ' ' + ' '.join(command_args)),
|
|
)
|
|
cur.execute(
|
|
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
|
|
(target_hid, config["current_pid"], "active"),
|
|
)
|
|
|
|
return run_command(
|
|
rffmpeg_ssh_command + rffmpeg_command, stdin, stdout, stderr
|
|
)
|
|
|
|
|
|
def run_remote_ffmpeg(
|
|
config, target_hid, target_hostname, target_servername, ffmpeg_args
|
|
):
|
|
"""
|
|
Run ffmpeg against the remote target_hostname.
|
|
"""
|
|
if "ffprobe" in cmd_name:
|
|
# If we're in ffprobe mode use that command and sys.stdout as stdout
|
|
return run_remote_command(config, target_hid, target_hostname, target_servername, cmd_name, ffmpeg_args, mapped_cmd=config["ffprobe_command"], pre_commands=config["pre_commands"])
|
|
else:
|
|
# Otherwise, we use stderr as stdout
|
|
return run_remote_command(config, target_hid, target_hostname, target_servername, cmd_name, ffmpeg_args, stderr_as_stdout=not any(item in config["special_flags"] for item in ffmpeg_args), mapped_cmd=config["ffmpeg_command"], pre_commands=config["pre_commands"])
|
|
|
|
|
|
def setup_logging(config):
|
|
"""
|
|
Set up logging.
|
|
"""
|
|
if config["logdebug"] is True:
|
|
logging_level = logging.DEBUG
|
|
else:
|
|
logging_level = logging.INFO
|
|
|
|
if config["log_to_file"]:
|
|
logging.basicConfig(
|
|
filename=config["logfile"],
|
|
level=logging_level,
|
|
format="%(asctime)s - %(name)s[%(process)s] - %(levelname)s - %(message)s",
|
|
)
|
|
else:
|
|
logging.basicConfig(
|
|
level=logging_level,
|
|
format="%(asctime)s - %(name)s[%(process)s] - %(levelname)s - %(message)s",
|
|
)
|
|
|
|
|
|
def hook_signals():
|
|
signal.signal(signal.SIGTERM, cleanup)
|
|
signal.signal(signal.SIGINT, cleanup)
|
|
signal.signal(signal.SIGQUIT, cleanup)
|
|
signal.signal(signal.SIGHUP, cleanup)
|
|
|
|
|
|
def run_ffmpeg(config, ffmpeg_args):
|
|
"""
|
|
Entrypoint for an ffmpeg/ffprobe aliased process.
|
|
"""
|
|
hook_signals()
|
|
|
|
setup_logging(config)
|
|
|
|
log.info(f"Starting rffmpeg as {cmd_name} with args: {' '.join(ffmpeg_args)}")
|
|
|
|
target_hid, target_hostname, target_servername = get_target_host(config)
|
|
|
|
if not target_hostname or target_hostname == "localhost":
|
|
ret = run_local_ffmpeg(config, ffmpeg_args)
|
|
else:
|
|
ret = run_remote_ffmpeg(
|
|
config, target_hid, target_hostname, target_servername, ffmpeg_args
|
|
)
|
|
|
|
cleanup()
|
|
if ret.returncode == 0:
|
|
log.info(f"Finished rffmpeg with return code {ret.returncode}")
|
|
else:
|
|
log.error(f"Finished rffmpeg with return code {ret.returncode}")
|
|
exit(ret.returncode)
|
|
|
|
|
|
def run_control(config):
|
|
"""
|
|
Entrypoint for the Click CLI for managing the rffmpeg system.
|
|
"""
|
|
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"], max_content_width=120)
|
|
|
|
@click.group(context_settings=CONTEXT_SETTINGS)
|
|
def rffmpeg_click():
|
|
"""
|
|
rffmpeg CLI interface
|
|
"""
|
|
if (
|
|
not Path(config["state_dir"]).is_dir()
|
|
or not Path(config["db_path"]).is_file()
|
|
):
|
|
return
|
|
|
|
# List all DB migrations here
|
|
did_alter_0001AddServername = False
|
|
# Check conditions for migrations
|
|
with dbconn(config) as cur:
|
|
# Migration for new servername (PR #36)
|
|
cur.execute(
|
|
"SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'"
|
|
)
|
|
if cur.fetchone()[0] == 0:
|
|
cur.execute(
|
|
"ALTER TABLE hosts ADD servername TEXT NOT NULL DEFAULT 'invalid'"
|
|
)
|
|
did_alter_0001AddServername = True
|
|
# Migration for new servername (PR #36)
|
|
if did_alter_0001AddServername:
|
|
with dbconn(config) as cur:
|
|
cur.execute("SELECT * FROM hosts")
|
|
for host in cur.fetchall():
|
|
hid, servername, hostname, weight, created = host
|
|
if servername == "invalid":
|
|
cur.execute(
|
|
f"UPDATE hosts SET servername = {SQL_VAR_SIGN} WHERE hostname = {SQL_VAR_SIGN}",
|
|
(hostname, hostname),
|
|
)
|
|
|
|
@click.command(name="init", short_help="Initialize the system.")
|
|
@click.option(
|
|
"-y",
|
|
"--yes",
|
|
"confirm_flag",
|
|
is_flag=True,
|
|
default=False,
|
|
help="Confirm initialization.",
|
|
)
|
|
def rffmpeg_click_init(confirm_flag):
|
|
"""
|
|
Initialize the rffmpeg system and database; this will erase all hosts and current state.
|
|
|
|
This command should be run as "sudo" before any attempts to use rffmpeg.
|
|
"""
|
|
if os.getuid() != 0:
|
|
click.echo("Error: This command requires root privileges.")
|
|
exit(1)
|
|
|
|
if not confirm_flag:
|
|
try:
|
|
click.confirm(
|
|
"Are you sure you want to (re)initalize the database",
|
|
prompt_suffix="? ",
|
|
abort=True,
|
|
)
|
|
except Exception:
|
|
fail("Aborting due to failed confirmation.")
|
|
|
|
click.echo("Initializing database")
|
|
|
|
if not Path(config["state_dir"]).is_dir():
|
|
try:
|
|
os.makedirs(config["state_dir"])
|
|
except OSError as e:
|
|
fail(f"Failed to create state directory '{config['state_dir']}': {e}")
|
|
|
|
try:
|
|
with dbconn(config, True) as cur:
|
|
cur.execute("DROP TABLE IF EXISTS hosts")
|
|
cur.execute("DROP TABLE IF EXISTS processes")
|
|
cur.execute("DROP TABLE IF EXISTS states")
|
|
cur.execute(f"CREATE TABLE hosts (id {SQL_PRIMARY_KEY} PRIMARY KEY, servername TEXT NOT NULL UNIQUE, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, created {SQL_DATE_TIME} NOT NULL)")
|
|
cur.execute(f"CREATE TABLE processes (id {SQL_PRIMARY_KEY} PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)")
|
|
cur.execute(f"CREATE TABLE states (id {SQL_PRIMARY_KEY} PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)")
|
|
except Exception as e:
|
|
fail(f"Failed to create database: {e}")
|
|
|
|
os.chown(
|
|
config["state_dir"],
|
|
getpwnam(config["dir_owner"]).pw_uid,
|
|
getgrnam(config["dir_group"]).gr_gid,
|
|
)
|
|
os.chmod(config["state_dir"], 0o770)
|
|
if DB_TYPE == "SQLITE":
|
|
os.chown(
|
|
config["db_path"],
|
|
getpwnam(config["dir_owner"]).pw_uid,
|
|
getgrnam(config["dir_group"]).gr_gid,
|
|
)
|
|
os.chmod(config["db_path"], 0o660)
|
|
|
|
rffmpeg_click.add_command(rffmpeg_click_init)
|
|
|
|
@click.command(name="status", short_help="Show hosts and status.")
|
|
def rffmpeg_click_status():
|
|
"""
|
|
Show the current status of all rffmpeg target hosts and active processes.
|
|
"""
|
|
with dbconn(config) as cur:
|
|
cur.execute("SELECT * FROM hosts")
|
|
hosts = cur.fetchall()
|
|
cur.execute("SELECT * FROM processes")
|
|
processes = cur.fetchall()
|
|
cur.execute("SELECT * FROM states")
|
|
states = cur.fetchall()
|
|
|
|
# Determine if there are any fallback processes running
|
|
fallback_processes = list()
|
|
for process in processes:
|
|
pid, host_id, process_id, cmd = process
|
|
if host_id == 0:
|
|
fallback_processes.append(process)
|
|
|
|
# Generate a mapping dictionary of hosts and processes
|
|
host_mappings = dict()
|
|
|
|
if len(fallback_processes) > 0:
|
|
host_mappings[0] = {
|
|
"hostname": "localhost (fallback)",
|
|
"servername": "localhost (fallback)",
|
|
"weight": 0,
|
|
"current_state": "fallback",
|
|
"commands": fallback_processes,
|
|
}
|
|
|
|
for host in hosts:
|
|
hid, servername, hostname, weight, created = host
|
|
|
|
# Get the latest state
|
|
with dbconn(config) as cur:
|
|
cur.execute(
|
|
f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC",
|
|
(hid,),
|
|
)
|
|
current_state = cur.fetchone()
|
|
|
|
if not current_state:
|
|
current_state = "idle"
|
|
else:
|
|
sid, host_id, process_id, state = current_state
|
|
current_state = state
|
|
|
|
# Create the mappings entry
|
|
host_mappings[hid] = {
|
|
"hostname": hostname,
|
|
"servername": servername,
|
|
"weight": weight,
|
|
"current_state": current_state,
|
|
"commands": [proc for proc in processes if proc[1] == hid],
|
|
}
|
|
|
|
hostname_length = 9
|
|
servername_length = 11
|
|
hid_length = 3
|
|
weight_length = 7
|
|
state_length = 6
|
|
for hid, host in host_mappings.items():
|
|
if len(host["hostname"]) + 1 > hostname_length:
|
|
hostname_length = len(host["hostname"]) + 1
|
|
if len(host["servername"]) + 1 > servername_length:
|
|
servername_length = len(host["servername"]) + 1
|
|
if len(str(hid)) + 1 > hid_length:
|
|
hid_length = len(str(hid)) + 1
|
|
if len(str(host["weight"])) + 1 > weight_length:
|
|
weight_length = len(str(host["weight"])) + 1
|
|
if len(host["current_state"]) + 1 > state_length:
|
|
state_length = len(host["current_state"]) + 1
|
|
|
|
output = list()
|
|
output.append(
|
|
"{bold}{hostname: <{hostname_length}} {servername: <{servername_length}} {hid: <{hid_length}} {weight: <{weight_length}} {state: <{state_length}} {commands}{end_bold}".format(
|
|
bold="\033[1m",
|
|
end_bold="\033[0m",
|
|
hostname="Hostname",
|
|
hostname_length=hostname_length,
|
|
servername="Servername",
|
|
servername_length=servername_length,
|
|
hid="ID",
|
|
hid_length=hid_length,
|
|
weight="Weight",
|
|
weight_length=weight_length,
|
|
state="State",
|
|
state_length=state_length,
|
|
commands="Active Commands",
|
|
)
|
|
)
|
|
|
|
for hid, host in host_mappings.items():
|
|
if len(host["commands"]) < 1:
|
|
first_command = "N/A"
|
|
else:
|
|
first_command = f"PID {host['commands'][0][2]}: {host['commands'][0][3]}"
|
|
|
|
host_entry = list()
|
|
host_entry.append(
|
|
"{hostname: <{hostname_length}} {servername: <{servername_length}} {hid: <{hid_length}} {weight: <{weight_length}} {state: <{state_length}} {commands}".format(
|
|
hostname=host["hostname"],
|
|
hostname_length=hostname_length,
|
|
servername=host["servername"],
|
|
servername_length=servername_length,
|
|
hid=hid,
|
|
hid_length=hid_length,
|
|
weight=host["weight"],
|
|
weight_length=weight_length,
|
|
state=host["current_state"],
|
|
state_length=state_length,
|
|
commands=first_command,
|
|
)
|
|
)
|
|
|
|
for idx, command in enumerate(host["commands"]):
|
|
if idx == 0:
|
|
continue
|
|
host_entry.append(
|
|
"{hostname: <{hostname_length}} {servername: <{servername_length}} {hid: <{hid_length}} {weight: <{weight_length}} {state: <{state_length}} {commands}".format(
|
|
hostname="",
|
|
hostname_length=hostname_length,
|
|
servername="",
|
|
servername_length=servername_length,
|
|
hid="",
|
|
hid_length=hid_length,
|
|
weight="",
|
|
weight_length=weight_length,
|
|
state="",
|
|
state_length=state_length,
|
|
commands=f"PID {command[2]}: {command[3]}",
|
|
)
|
|
)
|
|
|
|
output.append("\n".join(host_entry))
|
|
|
|
click.echo("\n".join(output))
|
|
|
|
rffmpeg_click.add_command(rffmpeg_click_status)
|
|
|
|
@click.command(name="add", short_help="Add a host.")
|
|
@click.option(
|
|
"-w",
|
|
"--weight",
|
|
"weight",
|
|
required=False,
|
|
default=1,
|
|
help="The weight of the host.",
|
|
)
|
|
@click.option(
|
|
"-n",
|
|
"--name",
|
|
"name",
|
|
required=False,
|
|
default=None,
|
|
help="The name of the host (if different from the HOST).",
|
|
)
|
|
@click.argument("host")
|
|
def rffmpeg_click_add(weight, name, host):
|
|
"""
|
|
Add a new host with IP or hostname HOST to the database.
|
|
"""
|
|
created = datetime.now()
|
|
if name is None:
|
|
name = host
|
|
click.echo(f"Adding new host '{host}' ({name})")
|
|
with dbconn(config) as cur:
|
|
cur.execute(
|
|
f"INSERT INTO hosts (servername, hostname, weight, created) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
|
|
(name, host, weight, created),
|
|
)
|
|
|
|
rffmpeg_click.add_command(rffmpeg_click_add)
|
|
|
|
@click.command(name="remove", short_help="Remove a host.")
|
|
@click.argument("host")
|
|
def rffmpeg_click_remove(host):
|
|
"""
|
|
Remove a host with internal ID/IP/hostname/servername HOST from the database.
|
|
"""
|
|
try:
|
|
host = int(host)
|
|
field = "id"
|
|
except ValueError:
|
|
field = "servername"
|
|
fieldAlt = "hostname"
|
|
|
|
with dbconn(config) as cur:
|
|
cur.execute(f"SELECT * FROM hosts WHERE {field} = {SQL_VAR_SIGN}", (host,))
|
|
entry = cur.fetchall()
|
|
if len(entry) < 1:
|
|
cur.execute(
|
|
f"SELECT * FROM hosts WHERE {fieldAlt} = {SQL_VAR_SIGN}", (host,)
|
|
)
|
|
entry = cur.fetchall()
|
|
if len(entry) < 1:
|
|
fail("No hosts found to delete!")
|
|
|
|
if len(entry) == 1:
|
|
click.echo(f"Removing {len(entry)} host:")
|
|
else:
|
|
click.echo(f"Removing {len(entry)} hosts:")
|
|
for host in entry:
|
|
hid, servername, hostname, weight, created = host
|
|
click.echo(f"\tID: {hid}\tHostname: {hostname}\tServername: {servername}")
|
|
cur.execute(f"DELETE FROM hosts WHERE id = {SQL_VAR_SIGN}", (hid,))
|
|
|
|
rffmpeg_click.add_command(rffmpeg_click_remove)
|
|
|
|
@click.command(name="run", short_help="Run a command.", context_settings={
|
|
"ignore_unknown_options": True
|
|
})
|
|
@click.option("--stderr-as-stdout", "stderr_as_stdout", is_flag=True, default=False, help="Use stderr as stdout for the command.")
|
|
@click.argument('full_command', nargs=-1, type=click.UNPROCESSED)
|
|
def rffmpeg_click_run(stderr_as_stdout, full_command):
|
|
"""
|
|
Run a command on the optimal host.
|
|
"""
|
|
hook_signals()
|
|
|
|
setup_logging(config)
|
|
|
|
command = full_command[0]
|
|
command_args = full_command[1:]
|
|
|
|
log.info(f"Starting rffmpeg as {command} with args: {' '.join(command_args)}")
|
|
|
|
target_hid, target_hostname, target_servername = get_target_host(config)
|
|
|
|
if not target_hostname or target_hostname == "localhost":
|
|
ret = run_local_command(config, command, command_args)
|
|
else:
|
|
ret = run_remote_command(
|
|
config,
|
|
target_hid,
|
|
target_hostname,
|
|
target_servername,
|
|
command,
|
|
command_args,
|
|
stderr_as_stdout=stderr_as_stdout
|
|
)
|
|
|
|
cleanup()
|
|
if ret.returncode == 0:
|
|
log.info(f"Finished rffmpeg with return code {ret.returncode}")
|
|
else:
|
|
log.error(f"Finished rffmpeg with return code {ret.returncode}")
|
|
exit(ret.returncode)
|
|
|
|
rffmpeg_click.add_command(rffmpeg_click_run)
|
|
|
|
@click.command(name="log", short_help="View the rffmpeg log.")
|
|
@click.option(
|
|
"-f",
|
|
"--follow",
|
|
"follow_flag",
|
|
is_flag=True,
|
|
default=False,
|
|
help="Follow new log entries instead of seeing current log entries.",
|
|
)
|
|
def rffmpeg_click_log(follow_flag):
|
|
"""
|
|
View the rffmpeg log file.
|
|
"""
|
|
if follow_flag:
|
|
with open(config["logfile"]) as file_:
|
|
# Go to the end of file
|
|
file_.seek(0, 2)
|
|
while True:
|
|
curr_position = file_.tell()
|
|
line = file_.readline()
|
|
if not line:
|
|
file_.seek(curr_position)
|
|
sleep(0.1)
|
|
else:
|
|
click.echo(line, nl=False)
|
|
else:
|
|
with open(config["logfile"], "r") as logfh:
|
|
click.echo_via_pager(logfh.readlines())
|
|
|
|
rffmpeg_click.add_command(rffmpeg_click_log)
|
|
|
|
@click.command(name="clear", short_help="Clear processes and states.")
|
|
@click.argument("host", required=False, default=None)
|
|
def rffmpeg_click_log(host):
|
|
"""
|
|
Clear all active process and states from the database, optionally limited to a host with internal ID/IP/hostname/servername HOST.
|
|
|
|
This command is designed to assist in rare error cases whereby stuck process states are present in the database, and should be used sparingly. This will not affect running processes negatively, though rffmpeg will no longer see them as active. It is recommended to run this command only when you are sure that no processes are actually running.
|
|
"""
|
|
with dbconn(config) as cur:
|
|
if host is not None:
|
|
try:
|
|
host = int(host)
|
|
field = "id"
|
|
except ValueError:
|
|
field = "servername"
|
|
fieldAlt = "hostname"
|
|
|
|
cur.execute(f"SELECT id FROM hosts WHERE {field} = {SQL_VAR_SIGN}", (host,))
|
|
entry = cur.fetchall()
|
|
if len(entry) < 1:
|
|
cur.execute(f"SELECT id FROM hosts WHERE {fieldAlt} = {SQL_VAR_SIGN}", (host,))
|
|
entry = cur.fetchall()
|
|
if len(entry) < 1:
|
|
fail("Host not found!")
|
|
if len(entry) > 1:
|
|
fail("Multiple hosts found, please be more specific!")
|
|
host_id = entry[0][0]
|
|
|
|
click.echo(f"Clearing all active processes and states for host ID '{host_id}'")
|
|
cur.execute(f"SELECT id FROM processes WHERE host_id = {SQL_VAR_SIGN}", (host_id,))
|
|
processes = cur.fetchall()
|
|
cur.execute(f"SELECT id FROM states WHERE host_id = {SQL_VAR_SIGN}", (host_id,))
|
|
states = cur.fetchall()
|
|
|
|
for process in processes:
|
|
cur.execute(f"DELETE FROM processes WHERE id = {SQL_VAR_SIGN}", process)
|
|
for state in states:
|
|
cur.execute(f"DELETE FROM states WHERE id = {SQL_VAR_SIGN}", state)
|
|
else:
|
|
click.echo("Clearing all active processes and states")
|
|
cur.execute("DELETE FROM processes")
|
|
cur.execute("DELETE FROM states")
|
|
|
|
rffmpeg_click.add_command(rffmpeg_click_log)
|
|
|
|
return rffmpeg_click(obj={})
|
|
|
|
|
|
# Entrypoint
|
|
if __name__ == "__main__":
|
|
all_args = sys.argv
|
|
cmd_name = all_args[0]
|
|
|
|
# Load the config
|
|
config = load_config()
|
|
|
|
if "rffmpeg" in cmd_name:
|
|
run_control(config)
|
|
|
|
else:
|
|
ffmpeg_args = all_args[1:]
|
|
run_ffmpeg(config, ffmpeg_args)
|