211 lines
6.9 KiB
Python
211 lines
6.9 KiB
Python
import os
|
|
|
|
import aiohttp_jinja2
|
|
from aiohttp.web import Application, AppRunner, GracefulExit, HTTPBadRequest
|
|
from aiohttp.web import HTTPException, HTTPNotFound, Response, TCPSite, get
|
|
from aiohttp.web import json_response, middleware
|
|
from aiohttp_jinja2 import render_template, template
|
|
from jinja2 import ChoiceLoader, FileSystemLoader, PackageLoader
|
|
from loguru import logger
|
|
|
|
from archives import _version
|
|
from archives.filebrowser import FileBrowser
|
|
|
|
# big files:
|
|
# https://gist.github.com/kgantsov/619ddb532ae0da3730007b84587257c5
|
|
|
|
|
|
class AsyncServer:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
|
|
self.host = config["http"]["host"]
|
|
self.port = config["http"]["port"]
|
|
|
|
self.env = config["archives"]["env"]
|
|
self.my_username = config["archives"]["my_username"]
|
|
self.path = config["archives"]["path"]
|
|
self.template_shared_path = config["jinja"]["template_shared_path"]
|
|
|
|
self.browser = FileBrowser(config["archives"])
|
|
self.app = Application(
|
|
middlewares=[self.middleware_errors, self.middleware_auth]
|
|
)
|
|
self.app.add_routes([
|
|
get("/", self.index),
|
|
get("/api/ruok", self.ruok),
|
|
get("/{path:.*}", self.archives),
|
|
])
|
|
if self.env.lower() == "dev":
|
|
# 404's for some reason
|
|
self.app.router.add_static("/static/", "./static", show_index=True)
|
|
|
|
self.app.on_shutdown.append(self._on_shutdown)
|
|
aiohttp_jinja2.setup(
|
|
self.app,
|
|
context_processors=[self.jinja_variable_processor],
|
|
loader=ChoiceLoader(
|
|
[
|
|
FileSystemLoader(self.template_shared_path),
|
|
PackageLoader("archives", "templates"),
|
|
]
|
|
),
|
|
)
|
|
|
|
async def _on_shutdown(self, app):
|
|
logger.info("http server shutdown")
|
|
|
|
@middleware
|
|
async def middleware_auth(self, request, handler):
|
|
remote_user = request.headers.get("Remote-User")
|
|
request["auth"] = {
|
|
"user": remote_user,
|
|
"groups": request.headers.get("Remote-Groups", "").split(","),
|
|
"name": request.headers.get("Remote-Name"),
|
|
"email": request.headers.get("Remote-Email"),
|
|
"is_me": remote_user == self.my_username,
|
|
}
|
|
|
|
_path = request.match_info.get("path", "")
|
|
logger.debug(f"path: '{_path}'")
|
|
if _path.startswith("/") or not self.browser.is_relative(_path):
|
|
logger.error(f"weird path: '{_path}'")
|
|
raise HTTPBadRequest
|
|
|
|
# archives_path has been checked for leading / if used
|
|
# from the request object
|
|
request["archives_path"] = _path
|
|
|
|
response = await handler(request)
|
|
return response
|
|
|
|
@middleware
|
|
async def middleware_errors(self, request, handler):
|
|
try:
|
|
response = await handler(request)
|
|
return response
|
|
except HTTPException as ex:
|
|
error = {"reason": ex.reason, "status": ex.status}
|
|
|
|
if ex.status in range(300, 400):
|
|
raise
|
|
else:
|
|
return await self.error_response(request, error)
|
|
|
|
except Exception as ex:
|
|
logger.exception(ex)
|
|
_error = {"reason": "internal server error", "status": 500}
|
|
return await self.error_response(request, _error)
|
|
|
|
async def error_response(self, request, error):
|
|
is_json = "application/json" in request.headers.getall("ACCEPT", [])
|
|
|
|
if is_json:
|
|
return json_response(error, status=error["status"])
|
|
else:
|
|
error["base_url"] = "https://www.sudo.is/"
|
|
error["static_url"] = "https://static.sudo.is/"
|
|
return render_template(
|
|
"error.html.j2",
|
|
request=request,
|
|
context=error,
|
|
status=error["status"],
|
|
)
|
|
|
|
async def download(self, request):
|
|
file_path = request["archives_path"]
|
|
file_name = os.path.basename(file_path)
|
|
|
|
if os.path.splitext(file_name)[1] == ".json":
|
|
j = self.browser.read_file_json(file_path)
|
|
return json_response(j)
|
|
|
|
sender = self.browser.file_sender(file_path)
|
|
return Response(
|
|
body=sender,
|
|
headers={
|
|
"Content-disposition": f"attachment; filename={file_name}"
|
|
})
|
|
|
|
async def download_zip(self, request):
|
|
dir_path = request['archives_path']
|
|
zip_name = self.browser.zip_name(dir_path)
|
|
|
|
return Response(
|
|
body=self.browser.zip_sender(dir_path),
|
|
headers={
|
|
"Content-disposition": f"attachment; filename={zip_name}"
|
|
})
|
|
|
|
@template("files.html.j2")
|
|
async def archives(self, request):
|
|
# make a chooser
|
|
if not self.browser.exists(request["archives_path"]):
|
|
raise HTTPNotFound
|
|
|
|
if self.browser.isfile(request["archives_path"]):
|
|
return await self.download(request)
|
|
|
|
if 'zip' in request.query:
|
|
return await self.download_zip(request)
|
|
|
|
try:
|
|
sort_by = request.query.get("sort_by", "name")
|
|
items = self.browser.list_dir(
|
|
request["archives_path"], sort_by=sort_by
|
|
)
|
|
d = {"items": items}
|
|
except NotADirectoryError:
|
|
raise HTTPNotFound
|
|
|
|
if "json" in request.query:
|
|
return json_response(d)
|
|
else:
|
|
return d
|
|
|
|
async def ruok(self, request):
|
|
return json_response({"iamok": True})
|
|
|
|
@template("index.html.j2")
|
|
async def index(self, request):
|
|
index_list = ["pub", "youtube-dl", "mirrors"]
|
|
if request["auth"]["is_me"]:
|
|
index_list.append(self.my_username)
|
|
|
|
items = self.browser.list_dir(
|
|
request["archives_path"], filter_list=index_list
|
|
)
|
|
return {"items": items}
|
|
|
|
async def jinja_variable_processor(self, request):
|
|
archives_path = request["archives_path"]
|
|
domain = request.headers.get("host", "")
|
|
if archives_path == "":
|
|
ps1_path = "~"
|
|
else:
|
|
ps1_path = f"/{archives_path}"
|
|
return {
|
|
"version": _version,
|
|
"domain": domain,
|
|
"static_url": "https://static.sudo.is/",
|
|
"base_url": "https://www.sudo.is/", # for jinja base.html.j2
|
|
"username": request["auth"]["user"],
|
|
"ps1_path": ps1_path,
|
|
"archives_path": archives_path,
|
|
}
|
|
|
|
async def serve(self):
|
|
runner = AppRunner(self.app)
|
|
await runner.setup()
|
|
site = TCPSite(runner, self.host, self.port)
|
|
await site.start()
|
|
logger.info(f"listening on http://{self.host}:{self.port}")
|
|
|
|
root_dir_names = ", ".join(
|
|
[a["name"] for a in self.browser.list_dir(".")]
|
|
)
|
|
logger.debug(f"root-level in '{self.path}': {root_dir_names}")
|
|
|
|
def stop_graceful(self):
|
|
raise GracefulExit
|