145 lines
4.3 KiB
Python
145 lines
4.3 KiB
Python
import json
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from urllib.parse import quote, urljoin
|
|
|
|
import aiofiles
|
|
import humanize
|
|
from aiohttp.web import HTTPBadRequest, HTTPNotFound
|
|
from loguru import logger
|
|
from zipstream import AioZipStream
|
|
|
|
from archives.ducache import DuCache
|
|
|
|
|
|
class FileBrowser:
    """Browse, list and stream files located below a configured root directory.

    Every ``rel_path`` argument is interpreted relative to ``self.path``.
    """

    def __init__(self, archives_config):
        """Store the configuration and create the directory-size cache.

        Args:
            archives_config: mapping that provides the keys ``"path"`` (the
                root directory) and ``"du_cache_file"`` (handed to
                ``DuCache`` together with the root path).
        """
        self.config = archives_config
        self.path = self.config["path"]
        self.du_file = self.config["du_cache_file"]
        self.du_cache = DuCache(self.du_file, self.path)

    def read_file_json(self, rel_path):
        """Return the decoded content of the JSON file at *rel_path*.

        The file is read in one go, so this is meant for small files.
        JSON text is decoded as UTF-8 instead of the locale default.
        """
        with open(self.full_path(rel_path), encoding="utf-8") as f:
            return json.load(f)

    def zip_name(self, rel_path):
        """Return the download name for *rel_path*: its stem plus ``.zip``."""
        return f"{Path(rel_path).stem}.zip"

    def full_path(self, rel_path):
        """Join *rel_path* onto the root directory.

        Note that ``os.path.join`` discards the root when *rel_path* is
        absolute, so callers handling untrusted input must validate it first.
        """
        return os.path.join(self.path, rel_path)

    def is_relative(self, rel_path):
        """Return True if *rel_path* stays inside the root directory.

        Both sides are normalised first, so ``..`` segments cannot be used to
        step out of the root. The comparison is lexical: symlinks are not
        resolved.
        """
        root = os.path.abspath(self.path)
        candidate = os.path.abspath(self.full_path(rel_path))
        return Path(candidate).is_relative_to(root)

    def isdir(self, rel_path):
        """Return True if *rel_path* refers to an existing directory."""
        return os.path.isdir(self.full_path(rel_path))

    def isfile(self, rel_path):
        """Return True if *rel_path* refers to an existing regular file."""
        return os.path.isfile(self.full_path(rel_path))

    def exists(self, rel_path):
        """Return True if *rel_path* exists; log an error when it does not."""
        local_path = self.full_path(rel_path)
        found = os.path.exists(local_path)
        if not found:
            logger.error(f"file not found: '{local_path}'")
        return found

    def human_size(self, size, binary):
        """Format *size* (bytes) for display, or return None for unknown sizes.

        Args:
            size: number of bytes, or None when the size is not known.
            binary: forwarded to ``humanize.naturalsize`` as ``binary``.
        """
        if size is None:
            return None
        return humanize.naturalsize(size, binary=binary)

    def _reject_unsafe(self, rel_path):
        """Raise ``HTTPBadRequest`` for absolute paths or paths leaving the root."""
        if rel_path.startswith("/") or not self.is_relative(rel_path):
            raise HTTPBadRequest

    async def file_sender(self, rel_path):
        """Asynchronously yield the file at *rel_path* in 64 KiB chunks.

        Raises:
            HTTPBadRequest: if *rel_path* is absolute or escapes the root.
        """
        # Callers are expected to have validated the path already; this is a
        # second line of defence before touching the filesystem.
        self._reject_unsafe(rel_path)

        local_path = self.full_path(rel_path)
        async with aiofiles.open(local_path, "rb") as f:
            while chunk := await f.read(2**16):
                yield chunk

    def files_to_zip(self, local_path):
        """Yield one ``AioZipStream`` entry per regular file below *local_path*.

        Args:
            local_path: directory to walk recursively. This is a filesystem
                path (as returned by ``full_path``), not a root-relative one.
        """
        root = Path(local_path)
        for f in root.glob("**/*"):
            # Test the globbed path directly: it already contains the root,
            # so it must not be joined onto the root a second time.
            if not f.is_file():
                continue
            yield {
                'file': str(f),
                'name': str(f.relative_to(root)),
                'compression': 'deflate',
                # only regular files are emitted
                'is_dir': False,
            }

    async def zip_sender(self, rel_path):
        """Asynchronously yield a zip archive of the directory at *rel_path*.

        Raises:
            HTTPBadRequest: if *rel_path* is absolute or escapes the root.
        """
        self._reject_unsafe(rel_path)

        local_path = self.full_path(rel_path)
        files = self.files_to_zip(local_path)
        aiozip = AioZipStream(files, chunksize=2**16)
        async for chunk in aiozip.stream():
            yield chunk

    def list_dir(self, rel_path, sort_by="name", filter_list=None):
        """Return the entries of *rel_path* as a sorted list of dicts.

        Args:
            rel_path: directory to list.
            sort_by: entry key to order by. ``"name"`` sorts ascending, every
                other key sorts descending.
            filter_list: optional collection of names; when given, only
                entries whose name is contained in it are returned.

        Returns:
            The dicts produced by ``browse_dir``, directories first, each
            group ordered by *sort_by*.
        """
        files = list(self.browse_dir(rel_path))

        if filter_list is not None:
            files = [a for a in files if a["name"] in filter_list]

        def sort_key(entry):
            value = entry[sort_by]
            # A directory size missing from the cache is None, which cannot
            # be compared with numbers; rank such entries below real values.
            return (value is not None, 0 if value is None else value)

        by_key = sorted(files, key=sort_key, reverse=sort_by != "name")
        # The sort is stable, so this only moves directories to the front.
        return sorted(by_key, key=lambda a: a["is_dir"], reverse=True)

    def browse_dir(self, rel_path, items=None):
        """Yield a description dict for each visible entry of a directory.

        Args:
            rel_path: directory to read.
            items: optional list of entry names to describe; when empty or
                None the directory is listed with ``os.listdir``.

        Yields:
            dicts with the keys ``is_dir``, ``name``, ``size``,
            ``size_human``, ``href`` and ``date``.

        Raises:
            HTTPBadRequest: if *rel_path* is absolute, escapes the root, or
                is not a directory.
        """
        self._reject_unsafe(rel_path)

        p = self.full_path(rel_path)
        if not os.path.isdir(p):
            logger.error(f"not a dir: '{p}'")
            raise HTTPBadRequest

        if not items:
            items = os.listdir(p)

        # urljoin() replaces the last segment of a base that lacks a trailing
        # slash, so make sure the directory itself is kept in the link.
        if rel_path and not rel_path.endswith("/"):
            base = rel_path + "/"
        else:
            base = rel_path

        for item in items:
            # hidden entries are never listed
            if item.startswith("."):
                continue

            fpath = os.path.join(p, item)
            try:
                st = os.stat(fpath)
            except FileNotFoundError:
                # for example symlinks pointing nowhere
                continue

            is_dir = os.path.isdir(fpath)
            # NOTE: st_ctime is the metadata-change time on Unix, not the
            # creation time.
            date = datetime.fromtimestamp(st.st_ctime).date()

            joined_path = urljoin("/", urljoin(base, quote(item)))
            if is_dir:
                # returns None if its not in the cache
                size = self.du_cache.get_size(fpath)
                href = joined_path + "/"
            else:
                size = st.st_size
                href = joined_path

            yield {
                "is_dir": is_dir,
                "name": item,
                "size": size,
                "size_human": self.human_size(size, binary=False),
                "href": href,
                "date": date.isoformat(),
            }
|