infra/roles/jellyfin/templates/jellyfin_auth.py.j2

121 lines
2.8 KiB
Django/Jinja

#!/usr/bin/python3 -u
import time
import subprocess
import select
import json
import sys
from dataclasses import dataclass, asdict
from loguru import logger
# Remove the handlers loguru has registered so far, then log to a dedicated
# file instead.
logger.remove()
logger.add('/var/log/jellyfin_auth.log')
# nginx access log that gets followed. The jellyfin_url placeholder is
# presumably filled in when this template is rendered -- confirm against the
# role that deploys it.
LOGFILE="/var/log/nginx/access_{{ jellyfin_url }}.log"
# JSON file in which the set of allow-listed IPs is persisted.
STATE="/etc/nginx/jellyfin/ipset.json"
# File the generated per-IP rules are written to.
ACL="/etc/nginx/jellyfin/acl.conf"
# Rule emitted once per allow-listed IP; __ALLOW_IP__ is replaced with the
# address before the rule is written.
ACL_TEMPLATE="""
if ($remote_addr = "__ALLOW_IP__") {
set $auth_type off;
}
"""
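# An alternative to emitting one `if ($remote_addr = ...)` rule per IP (see
# ACL_TEMPLATE) is to match the client address against CIDR ranges in nginx:
# https://serverfault.com/questions/743414/how-can-i-check-if-remote-addr-ip-is-not-in-cidr-range-in-nginx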
def logmsg(msg):
    """Echo *msg* to stdout, then record it at INFO level via the loguru logger."""
    # Order matters for interleaved output: stdout first, log sink second.
    for emit in (print, logger.info):
        emit(msg)
@dataclass
class IpState:
    """Set of allow-listed client IPs persisted as JSON.

    The backing file holds a single object of the form
    ``{"ipset": [<ip>, ...]}``.
    """

    # Allow-listed addresses. A list is also accepted and converted to a set.
    ipset: set
    # Path of the JSON file the set is loaded from and saved to.
    filename: str

    def __post_init__(self):
        # Normalise a list (e.g. freshly decoded from JSON) into a set.
        if isinstance(self.ipset, list):
            self.ipset = set(self.ipset)

    @classmethod
    def read(cls, filename):
        """Load the state stored in *filename*.

        Returns an empty state bound to *filename* when the file does not
        exist yet. Any other error (unreadable file, invalid JSON, missing
        ``ipset`` key) propagates to the caller.
        """
        try:
            with open(filename, 'r') as f:
                stored = json.load(f)
        except FileNotFoundError:
            return cls(ipset=set(), filename=filename)
        return cls(ipset=set(stored['ipset']), filename=filename)

    def add(self, new_ip):
        """Add *new_ip* to the set and persist the updated state."""
        self.ipset.add(new_ip)
        self.save()

    def save(self):
        """Write the current set to ``self.filename`` as indented JSON.

        The addresses are written in sorted order so the file content is
        stable from run to run instead of following set iteration order.
        """
        # Serialise before opening: opening in 'w' mode truncates the file,
        # so a serialisation failure must not wipe the previous state.
        text = json.dumps({'ipset': sorted(self.ipset)}, indent=2)
        with open(self.filename, 'w') as f:
            f.write(text)
def write_acl(aclfile, ipset):
    """Rewrite *aclfile* with one ACL_TEMPLATE rule per address in *ipset*.

    The file is overwritten from scratch. Each rule is ACL_TEMPLATE with the
    ``__ALLOW_IP__`` marker replaced by the address, followed by a newline.
    """
    with open(aclfile, 'w') as out:
        out.writelines(
            ACL_TEMPLATE.replace("__ALLOW_IP__", ip) + '\n'
            for ip in ipset
        )
def reload_nginx():
    """Run ``service nginx reload`` and log when it succeeded.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero. This
            is left unhandled on purpose so that a failed reload crashes
            the script.
    """
    subprocess.run(["service", "nginx", "reload"], check=True)
    logmsg('nginx reloaded')
def tail(filename):
    """Follow *filename* with ``tail -F`` and yield each line decoded as JSON.

    Args:
        filename: path of the log file to follow; every line is expected to
            be a single JSON document.

    Yields:
        The object decoded from each line that ``tail`` emits.

    Raises:
        json.JSONDecodeError: if a line is not valid JSON.
        RuntimeError: if the ``tail`` process exits, since there is nothing
            left to follow.
        SystemExit: if interrupted (Ctrl-C) while waiting for the next line.
    """
    logmsg(f"tailing {filename}")
    # stderr is discarded instead of piped: nothing ever reads it, and an
    # undrained pipe would eventually fill up and block tail.
    proc = subprocess.Popen(
        ['tail', '-F', filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    try:
        # Iterate the buffered stream directly rather than polling the raw
        # fd: lines already pulled into Python's read buffer are invisible
        # to poll() and would be held back until more data arrived.
        for raw_line in proc.stdout:
            yield json.loads(raw_line)
        # Reaching EOF means tail itself went away.
        raise RuntimeError(f"tail exited with status {proc.wait()}")
    except KeyboardInterrupt:
        raise SystemExit
    finally:
        # Do not leave an orphaned tail process or an open pipe behind.
        proc.terminate()
        proc.wait()
        proc.stdout.close()
@logger.catch
def main():
    """Rebuild the ACL from saved state, then allow-list IPs seen in the log.

    On start-up the persisted IP set is written to the ACL file and nginx is
    reloaded. After that every access-log entry is inspected: when its
    ``remote_addr`` is not yet allow-listed and its ``status`` is the string
    '204' or '200', the address is added to the state, the ACL file is
    rewritten and nginx is reloaded again.
    """
    state = IpState.read(STATE)
    write_acl(ACL, state.ipset)
    # Per-IP logging of the already-known addresses is disabled:
    #for ip in state.ipset:
    #    logmsg(f"allow listed known ip {ip}")
    logmsg(f"wrote {len(state.ipset)} rules to acl")
    reload_nginx()

    for line in tail(LOGFILE):
        ip, status = line['remote_addr'], line['status']
        # Skip addresses that are already listed and any other status.
        if ip in state.ipset or status not in ['204', '200']:
            continue

        ua, req, user = (
            line['http_user_agent'],
            line['request'],
            line['remote_user'],
        )
        logmsg(f"allow listing {ip} - {req} - {ua} - {user}")
        state.add(ip)
        write_acl(ACL, state.ipset)
        reload_nginx()


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()