archives/archives/filebrowser.py

import json
import os
from datetime import datetime
from pathlib import Path
from urllib.parse import quote, urljoin

import aiofiles
import humanize
from aiohttp.web import HTTPBadRequest, HTTPNotFound
from loguru import logger
from zipstream import AioZipStream

from archives.ducache import DuCache


class FileBrowser:
    def __init__(self, archives_config):
        self.config = archives_config
        self.path = self.config["path"]
        self.du_file = self.config["du_cache_file"]
        self.du_cache = DuCache(self.du_file, self.path)

    def read_file_json(self, rel_path):
        # intended for small files: reads the whole file into memory
        with open(self.full_path(rel_path), "r") as f:
            return json.load(f)

    def zip_name(self, rel_path):
        p = Path(rel_path)
        return f"{p.stem}.zip"

    def full_path(self, rel_path):
        return os.path.join(self.path, rel_path)

    def is_relative(self, rel_path):
        # resolve() collapses ".." segments so they cannot escape self.path;
        # a purely lexical is_relative_to() check alone would miss them
        local_path = Path(self.full_path(rel_path)).resolve()
        return local_path.is_relative_to(Path(self.path).resolve())

    def isdir(self, rel_path):
        return os.path.isdir(self.full_path(rel_path))

    def isfile(self, rel_path):
        return os.path.isfile(self.full_path(rel_path))

    def exists(self, rel_path):
        local_path = self.full_path(rel_path)
        e = os.path.exists(local_path)
        if not e:
            logger.error(f"file not found: '{local_path}'")
        return e

    def human_size(self, size, binary):
        if size is None:
            return None
        return humanize.naturalsize(size, binary=binary)
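
    # e.g. humanize.naturalsize(4300, binary=False) -> "4.3 kB"
    #      humanize.naturalsize(4300, binary=True)  -> "4.2 KiB"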

    async def file_sender(self, rel_path):
        # rel_path comes straight from the request object, so reject
        # absolute paths here
        if rel_path.startswith("/"):
            raise HTTPBadRequest
        local_path = self.full_path(rel_path)
        async with aiofiles.open(local_path, "rb") as f:
            while chunk := await f.read(2**16):
                yield chunk
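
    # Consumption sketch (assumed aiohttp handler, not part of this class;
    # aiohttp can stream an async generator passed as a response body):
    #
    #   async def download(request):
    #       rel_path = request.match_info["path"]  # hypothetical route param
    #       if not browser.is_relative(rel_path):
    #           raise HTTPBadRequest
    #       return web.Response(body=browser.file_sender(rel_path))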

    def files_to_zip(self, local_path):
        for f in Path(local_path).glob("**/*"):
            # f is already a full path; the isfile()/isdir() helpers expect
            # a path relative to self.path, so test the Path object directly
            if f.is_file():
                yield {
                    "file": str(f),
                    "name": str(f.relative_to(local_path)),
                    "compression": "deflate",
                    "is_dir": f.is_dir(),  # always False inside this branch
                }

    async def zip_sender(self, rel_path):
        local_path = self.full_path(rel_path)
        files = self.files_to_zip(local_path)
        aiozip = AioZipStream(files, chunksize=2**16)
        async for chunk in aiozip.stream():
            yield chunk
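
    # Sketch of serving a directory as a zip download (assumed handler;
    # `browser` is a FileBrowser instance wired up elsewhere):
    #
    #   async def download_zip(request):
    #       rel_path = request.match_info["path"]  # hypothetical route param
    #       name = browser.zip_name(rel_path)
    #       return web.Response(
    #           body=browser.zip_sender(rel_path),
    #           content_type="application/zip",
    #           headers={"Content-Disposition": f'attachment; filename="{name}"'},
    #       )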

    def list_dir(self, rel_path, sort_by="name", filter_list=None):
        files = list(self.browse_dir(rel_path))
        if filter_list is not None:
            files = [a for a in files if a["name"] in filter_list]
        by_key = sorted(
            files,
            # uncached directory sizes are None; treat them as 0 for sorting
            key=lambda a: a[sort_by] if a[sort_by] is not None else 0,
            # names sort ascending, sizes and dates descending
            reverse=sort_by != "name",
        )
        # stable sort puts directories first without reordering each group
        return sorted(by_key, key=lambda a: a["is_dir"], reverse=True)

    def browse_dir(self, rel_path, items=None):
        # rel_path comes from the request: reject absolute paths outright
        if rel_path.startswith("/"):
            raise HTTPBadRequest
        local_path = self.full_path(rel_path)
        if not self.exists(rel_path):
            raise HTTPNotFound
        if not os.path.isdir(local_path):
            logger.error(f"not a dir: '{local_path}'")
            raise HTTPBadRequest
        if not items:
            items = os.listdir(local_path)
        for item in items:
            # skip hidden files before paying for the stat() call
            if item.startswith("."):
                continue
            fpath = os.path.join(local_path, item)
            try:
                st = os.stat(fpath)
            except FileNotFoundError:
                # for example symlinks pointing nowhere
                continue
            is_dir = os.path.isdir(fpath)
            # st_ctime is the inode change time on POSIX; used here as a
            # rough "last changed" date
            date = datetime.fromtimestamp(st.st_ctime).date()
            # urljoin drops the base's last segment unless it ends in "/",
            # so normalize rel_path before joining
            base = rel_path if rel_path.endswith("/") else rel_path + "/"
            joined_path = urljoin("/", urljoin(base, quote(item)))
            if is_dir:
                # DuCache.get_size() returns None if the path is not cached
                size = self.du_cache.get_size(fpath)
                href = joined_path + "/"
            else:
                size = st.st_size
                href = joined_path
            yield {
                "is_dir": is_dir,
                "name": item,
                "size": size,
                "size_human": self.human_size(size, binary=False),
                "href": href,
                "date": date.isoformat(),
            }
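
# Minimal wiring sketch (assumed; config values and route are illustrative):
#
#   from aiohttp import web
#
#   config = {"path": "/srv/archives", "du_cache_file": "/var/cache/du.json"}
#   browser = FileBrowser(config)
#
#   async def listing(request):
#       rel_path = request.match_info.get("path", "")  # hypothetical param
#       return web.json_response(browser.list_dir(rel_path, sort_by="name"))
#
#   app = web.Application()
#   app.add_routes([web.get("/browse/{path:.*}", listing)])
#   web.run_app(app)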