295 lines
10 KiB
Python
295 lines
10 KiB
Python
import os
|
|
import sys
|
|
import logging
|
|
import asyncio
|
|
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from sqlite3 import connect as sqlite_connect
|
|
|
|
from hcloud import Client
|
|
from hcloud.server_types.domain import ServerType
|
|
from hcloud.images.domain import Image
|
|
from hcloud.placement_groups.domain import PlacementGroup
|
|
from hcloud.locations.domain import Location
|
|
|
|
|
|
# Module-wide logger named "rffmpeg"; every function in this file logs through it.
log = logging.getLogger("rffmpeg")
|
|
|
|
def setup_logger(config):
    """Configure root logging to write to the file named in ``config["log_file"]``.

    The level is DEBUG when the ``DEBUG`` environment variable is set to any
    value (even an empty string) and INFO when it is unset.

    Args:
        config: Mapping that provides the ``"log_file"`` path.
    """
    # Identity comparison is the correct way to test for None.
    debug_requested = os.getenv("DEBUG") is not None

    logging.basicConfig(
        filename=config["log_file"],
        level=logging.DEBUG if debug_requested else logging.INFO,
        format="%(asctime)s - %(name)s[%(process)s] - %(levelname)s - %(message)s",
    )
|
|
|
|
def fail(msg):
    """Print ``msg`` and terminate the process with exit status 1.

    Args:
        msg: Text printed to standard output before exiting.

    Raises:
        SystemExit: Always, with code 1.
    """
    print(msg)
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is missing under ``python -S`` or in frozen builds.
    sys.exit(1)
|
|
|
|
def setup():
    """Build the runtime configuration from environment variables.

    Exits through ``fail()`` when ``HCLOUD_API_TOKEN`` or
    ``JELLYFIN_LAN_ONLY_IP`` is unset. Besides reading the environment, this
    function talks to the HCloud API: it looks up the networks, firewalls and
    placement group by name, and re-registers the local public SSH key under
    ``SSH_KEY_NAME`` (deleting an existing key of that name first).

    Returns:
        dict: Configuration with the keys ``cloud_config``, ``state_dir``,
        ``log_file``, ``db_path``, ``ssh_key``, ``client``, ``server_type``,
        ``image``, ``networks``, ``firewalls``, ``placement_group``,
        ``location``, ``jobs_per_worker``, ``recently_made_worker_bool`` and
        ``ssh_keys``.
    """
    hcloud_api_token = os.getenv("HCLOUD_API_TOKEN")
    if hcloud_api_token is None:
        fail("HCLOUD_API_TOKEN env isn't set")

    jellyfin_lan_only_ip = os.getenv("JELLYFIN_LAN_ONLY_IP")
    if jellyfin_lan_only_ip is None:
        fail("JELLYFIN_LAN_ONLY_IP env isn't set")

    media_username = os.getenv("MEDIA_USERNAME", "")
    media_password = os.getenv("MEDIA_PASSWORD", "")

    config = {}

    # Default cloud-init payload, used only when CLOUD_CONFIG is not set.
    # NOTE(review): the values are interpolated unescaped into single-quoted
    # shell strings, so a quote character in any of them would break the
    # generated command.
    default_cloud_config = (
        "#cloud-config\n"
        "runcmd:\n"
        "- systemctl disable --now ssh.service\n"
        "- echo 'JELLYFIN_LAN_ONLY_IP=%s' | tee -a /root/.env\n"
        "- echo 'MEDIA_USERNAME=%s' | tee -a /root/.env\n"
        "- echo 'MEDIA_PASSWORD=%s' | tee -a /root/.env\n"
        "- wget https://raw.githubusercontent.com/aleksasiriski/rffmpeg-worker/main/docker-compose.example.yml -O /root/docker-compose.yml\n"
        "- cd /root && docker compose pull && docker compose up -d\n"
    ) % (jellyfin_lan_only_ip, media_username, media_password)
    config["cloud_config"] = os.getenv("CLOUD_CONFIG", default_cloud_config)

    # File locations; each default is derived from the state directory.
    state_dir = os.getenv("STATE_DIR", "/config")
    config["state_dir"] = state_dir
    config["log_file"] = os.getenv("LOG_FILE", state_dir + "/log/hcloud-rffmpeg.log")
    config["db_path"] = os.getenv("DB_PATH", state_dir + "/rffmpeg/rffmpeg.db")
    config["ssh_key"] = os.getenv("SSH_KEY", state_dir + "/rffmpeg/.ssh/id_ed25519.pub")

    # HCloud client and the parameters used when creating worker servers.
    client = Client(token=hcloud_api_token)
    config["client"] = client
    config["server_type"] = os.getenv("SERVER_TYPE", "cx21")
    config["image"] = Image(name=os.getenv("IMAGE_TYPE", "docker-ce"))
    config["networks"] = client.networks.get_all(
        name=os.getenv("NETWORK_NAME", "rffmpeg-workers")
    )
    config["firewalls"] = client.firewalls.get_all(
        name=os.getenv("FIREWALL_NAME", "rffmpeg-workers")
    )
    config["placement_group"] = client.placement_groups.get_by_name(
        name=os.getenv("PLACEMENT_GROUP_NAME", "rffmpeg-workers")
    )
    config["location"] = Location(name=os.getenv("LOCATION_NAME", "nbg1"))
    config["jobs_per_worker"] = int(os.getenv("JOBS_PER_WORKER", "2"))
    config["recently_made_worker_bool"] = False

    # Replace any key already registered under this name with the local one.
    ssh_key_name = os.getenv("SSH_KEY_NAME", "root@jellyfin")
    existing_key = client.ssh_keys.get_by_name(name=ssh_key_name)
    if existing_key is None:
        log.debug("No key found to remove.")
    else:
        try:
            client.ssh_keys.delete(existing_key)
        except Exception:
            # Best effort, as before: carry on and let the create call below
            # report a conflict if the old key is still registered.
            log.warning(
                "Failed to remove existing SSH key %s.", ssh_key_name, exc_info=True
            )
        else:
            log.debug("Found key and removed it.")

    # Only the first line of the key file is used; the with-block closes it.
    with open(config["ssh_key"], "r") as ssh_key_file:
        public_key = ssh_key_file.readline()

    client.ssh_keys.create(name=ssh_key_name, public_key=public_key)
    config["ssh_keys"] = client.ssh_keys.get_all(name=ssh_key_name)

    return config
|
|
|
|
@contextmanager
def dbconn(config):
    """Yield a cursor on the SQLite database at ``config["db_path"]``.

    Foreign-key enforcement is switched on for the connection. When the
    ``with`` body finishes normally the transaction is committed; when it
    raises, nothing is committed and the exception propagates. The connection
    is closed in both cases.

    Args:
        config: Mapping that provides the ``"db_path"`` entry.

    Yields:
        sqlite3.Cursor: Cursor bound to the freshly opened connection.
    """
    conn = sqlite_connect(config["db_path"])
    try:
        conn.execute("PRAGMA foreign_keys = 1")
        cur = conn.cursor()
        yield cur
        # Reached only when the with-body did not raise.
        conn.commit()
    finally:
        # Always release the connection, even when the body failed.
        conn.close()
|
|
|
|
|
|
async def recently_made_worker_timer(config, delay):
    """Clear the worker-creation cooldown flag once ``delay`` seconds have passed.

    Args:
        config: Configuration mapping whose ``"recently_made_worker_bool"``
            entry is reset to False.
        delay: Seconds to sleep before the flag is reset.
    """
    await asyncio.sleep(delay)

    # Cooldown is over: creating workers is allowed again.
    config["recently_made_worker_bool"] = False
    log.debug("Timer finished, able to make workers again!")
|
|
|
|
async def recently_made_worker(config):
    """Report whether a worker was made recently, starting a cooldown if not.

    When the cooldown flag is already set this returns True and changes
    nothing. Otherwise it sets the flag, schedules
    ``recently_made_worker_timer`` to clear it after 180 seconds, and returns
    False.

    Args:
        config: Configuration mapping holding ``"recently_made_worker_bool"``.
            The scheduled task is also tracked in a set stored under
            ``"background_tasks"``.

    Returns:
        bool: True while a cooldown is active, False when one was just started.
    """
    if config["recently_made_worker_bool"]:
        return True

    config["recently_made_worker_bool"] = True
    cooldown = asyncio.create_task(recently_made_worker_timer(config, 180))

    # The event loop holds only weak references to tasks, so keep a strong one
    # until the timer finishes; otherwise it may be garbage-collected early
    # and the flag would never be cleared.
    pending = config.setdefault("background_tasks", set())
    pending.add(cooldown)
    cooldown.add_done_callback(pending.discard)

    return False
|
|
|
|
async def create_server(config):
    """Create one worker server in HCloud and register it in the hosts table.

    Does nothing besides logging while the creation cooldown is active.
    Otherwise it creates a server from the settings in ``config``, waits 120
    seconds, inserts the server's first private IP into the ``hosts`` table
    with weight 1, and schedules ``check_unused_worker`` for it.

    Args:
        config: Configuration mapping produced by ``setup()``.
    """
    log.info("Creating a server!")

    if await recently_made_worker(config):
        log.debug("Recently made a server!")
        return

    log.debug("No recently made servers!")

    # Function-scope import: only needed to build a unique server name.
    import uuid

    client = config["client"]

    # The API requires a name for every server; generate a unique one.
    server_name = "rffmpeg-worker-%s" % uuid.uuid4().hex[:8]

    # The configuration stores the server type as a plain name, while the
    # client expects a ServerType object.
    server_type = config["server_type"]
    if isinstance(server_type, str):
        server_type = ServerType(name=server_type)

    response = client.servers.create(
        name=server_name,
        server_type=server_type,
        image=config["image"],
        ssh_keys=config["ssh_keys"],
        networks=config["networks"],
        firewalls=config["firewalls"],
        placement_group=config["placement_group"],
        location=config["location"],
        user_data=config["cloud_config"],
    )

    # Give the server time to boot before it is handed work.
    await asyncio.sleep(120)

    # NOTE(review): the action object is not refreshed after the sleep, so
    # this presumably reflects the status reported at creation time - confirm.
    if response.action.status == "error":
        log.error("Error occured while creating the server in HCloud!")
        # Do not register a host that failed to come up.
        return
    log.debug("Successfully created a server in HCloud!")

    server = client.servers.get_by_name(name=server_name)
    if server is None or not server.private_net:
        log.error(
            "Server %s has no private network address, not adding it to database.",
            server_name,
        )
        return
    server_ip = server.private_net[0].ip

    with dbconn(config) as cur:
        cur.execute(
            "INSERT INTO hosts (hostname, weight, servername) VALUES (?, ?, ?)",
            (server_ip, 1, server_name),
        )

    log.info("Added %s with IP %s to database!" % (server_name, server_ip))

    # Keep a strong reference so the watcher task is not garbage-collected
    # while it sleeps.
    watcher = asyncio.create_task(check_unused_worker(config, server_name))
    pending = config.setdefault("background_tasks", set())
    pending.add(watcher)
    watcher.add_done_callback(pending.discard)
|
|
|
|
async def remove_server(config, server_name):
    """Remove a worker from the hosts table and delete its HCloud server.

    The database row is deleted first. Deleting the cloud server is best
    effort: a failure is logged and does not propagate.

    Args:
        config: Configuration mapping providing ``"client"`` and the database
            path used by ``dbconn``.
        server_name: Name of the server, as stored in ``hosts.servername``.
    """
    client = config["client"]
    server = client.servers.get_by_name(name=server_name)

    with dbconn(config) as cur:
        # Column name matches the one used when hosts are inserted.
        cur.execute("DELETE FROM hosts WHERE servername = ?", (server_name,))

    if server is None:
        log.debug("No server found to remove.")
        return

    try:
        client.servers.delete(server)
    except Exception:
        # Exception (not a bare except) so task cancellation still propagates.
        log.warning(
            "Failed to remove server %s from HCloud.", server_name, exc_info=True
        )
    else:
        log.debug("Found server and removed it.")
|
|
|
|
async def check_unused_worker(config, server_name):
    """Watch one worker and remove it once it is no longer active.

    Schedule: sleep 50 minutes, then check the worker's state up to twice,
    sleeping 4 minutes after each check that finds it active. If it is still
    active after both checks, sleep another 50 minutes and repeat. The worker
    is removed through ``remove_server`` at the first check where its state is
    not ``"active"`` (a missing state row counts as idle).

    Args:
        config: Configuration mapping used for database access.
        server_name: Name of the server, as stored in ``hosts.servername``.
    """
    log.debug("Started checking if %s is unused, firstly sleeping for 50 minutes"%(server_name))

    long_delay = 3000  # seconds (50 minutes)
    recheck_delay = 240  # seconds (4 minutes)
    checks_per_round = 2

    await asyncio.sleep(long_delay)

    with dbconn(config) as cur:
        # Column name matches the one used when hosts are inserted.
        host = cur.execute(
            "SELECT * FROM hosts WHERE servername = ?", (server_name,)
        ).fetchone()

    if host is None:
        # Without a host row there is no id to look states up by.
        log.warning("Worker %s not found in database, stopping checks.", server_name)
        return
    host_id = host[0]

    while True:
        for _ in range(checks_per_round):
            log.debug("Checking if worker %s is unused"%(server_name))

            with dbconn(config) as cur:
                state_row = cur.execute(
                    "SELECT * FROM states WHERE host_id = ?", (host_id,)
                ).fetchone()

            # The fourth column of a states row holds the state value.
            current_state = state_row[3] if state_row else "idle"

            if current_state != "active":
                log.info("Worker %s marked as inactive and is being removed."%(server_name))
                await remove_server(config, server_name)
                return

            log.debug("Worker %s marked as active, sleeping."%(server_name))
            await asyncio.sleep(recheck_delay)

        # Still busy after this round; wait before starting the next one.
        await asyncio.sleep(long_delay)
|
|
|
|
|
|
async def remove_all_processes(config):
    """Delete every row currently present in the ``processes`` table.

    The rows are read and deleted by id inside a single connection and
    transaction instead of opening a new connection per row.

    Args:
        config: Configuration mapping used for database access.
    """
    log.info("Removing all processes from database.")

    with dbconn(config) as cur:
        processes = cur.execute("SELECT * FROM processes").fetchall()

        if not processes:
            log.debug("No processes found.")
        else:
            log.debug("Removing all processes.")
            # The first column of each row is the process row id.
            cur.executemany(
                "DELETE FROM processes WHERE id = ?",
                [(process[0],) for process in processes],
            )
|
|
|
|
async def remove_all_workers(config):
    """Remove every worker listed in the ``hosts`` table.

    Each host row is handed to ``remove_server`` by its server name.

    Args:
        config: Configuration mapping used for database and HCloud access.
    """
    log.info("Removing all workers from database and HCloud.")

    with dbconn(config) as cursor:
        worker_rows = cursor.execute("SELECT * FROM hosts").fetchall()

    if not worker_rows:
        log.debug("No workers found.")
        return

    log.debug("Removing all workers.")
    # Rows are (id, hostname, weight, server name); only the name is needed.
    for _hid, _hostname, _weight, server_name in worker_rows:
        log.debug("Removing worker %s."%(server_name))
        await remove_server(config, server_name)
|
|
|
|
|
|
async def check_processes_and_rescale(config):
    """Poll the database every 5 minutes and request a new worker when needed.

    A worker is requested when either
      * there are no hosts and at least one process command contains
        ``"transcode"``, or
      * hosts exist but each one already runs at least
        ``config["jobs_per_worker"]`` transcode processes.

    The request is made by scheduling ``create_server`` as a background task.
    This coroutine loops forever.

    Args:
        config: Configuration mapping produced by ``setup()``.
    """
    while True:
        log.debug("Checking processes and rescaling.")

        with dbconn(config) as cur:
            hosts = cur.execute("SELECT * FROM hosts").fetchall()
            processes = cur.execute("SELECT * FROM processes").fetchall()

        if not hosts:
            log.debug("No workers found. Checking if there are any transcodes on fallback.")
            need_worker = any(
                "transcode" in cmd for _pid, _host_id, _process_id, cmd in processes
            )
            if need_worker:
                log.info("Found transcodes on fallback!")
        else:
            log.debug("Workers found. Checking if there are any workers with room.")
            # Count transcodes per host in one pass over the processes.
            transcodes_by_host = {}
            for _pid, host_id, _process_id, cmd in processes:
                if "transcode" in cmd:
                    transcodes_by_host[host_id] = transcodes_by_host.get(host_id, 0) + 1

            need_worker = all(
                transcodes_by_host.get(hid, 0) >= config["jobs_per_worker"]
                for hid, _hostname, _weight, _server_name in hosts
            )
            if need_worker:
                log.debug("No workers with room found.")
            else:
                log.debug("Workers with room found.")

        if need_worker:
            # Keep a strong reference so the task is not garbage-collected
            # before it finishes.
            creation = asyncio.create_task(create_server(config))
            pending = config.setdefault("background_tasks", set())
            pending.add(creation)
            creation.add_done_callback(pending.discard)

        log.debug("Sleeping for 5 minutes until next check.")
        await asyncio.sleep(300)
|
|
|
|
|
|
async def main():
    """Entry coroutine: configure, clean up old state, then rescale forever.

    Builds the configuration, sets up logging, exits through ``fail()`` when
    the database file is missing, removes all recorded processes and workers,
    and finally hands control to ``check_processes_and_rescale``.
    """
    print("Starting HCloud!")

    config = setup()
    setup_logger(config)

    database_file = Path(config["db_path"])
    if not database_file.is_file():
        fail("Failed to find database %s - did you forget to run 'rffmpeg init'?"%(config["db_path"]))

    # Start from a clean slate: drop recorded processes first, then workers.
    for cleanup in (remove_all_processes, remove_all_workers):
        await cleanup(config)

    await check_processes_and_rescale(config)

    # Only reached if the rescale loop ever returns.
    print("Stopping HCloud!")
|
|
|
|
# Startup: run the main coroutine only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|