132 lines
3.9 KiB
Python
132 lines
3.9 KiB
Python
import glob
|
|
import os
|
|
import shutil
|
|
import traceback
|
|
from typing import List, Protocol, Sequence, Tuple, TypeVar
|
|
|
|
import vcr
|
|
from click.testing import CliRunner, Result
|
|
from icloudpd.base import main
|
|
|
|
|
|
def print_result_exception(result: Result) -> None:
    """Print the exception stored on ``result``, if there is one.

    Reads ``result.exception``. When it is ``None`` nothing is printed.
    When the object carries a ``__traceback__`` attribute it is rendered
    with ``traceback.print_exception``; otherwise the object itself is
    passed to ``print``.
    """
    ex = result.exception
    # Compare against None explicitly: an exception class may define
    # __bool__/__len__ and evaluate as falsy while still being a real failure.
    if ex is None:
        return

    if hasattr(ex, "__traceback__"):
        traceback.print_exception(type(ex), value=ex, tb=ex.__traceback__)
    else:
        # Fallback for objects that are not regular exception instances.
        print(ex)
|
|
|
|
|
|
def path_from_project_root(file_name: str) -> str:
    """Return the directory of ``file_name`` relative to the current directory.

    The path is computed against the process working directory
    (presumably the project root when the tests are run -- confirm with
    callers). A bare file name without any directory part yields
    ``os.curdir``.
    """
    # dirname() of a bare file name is "", which relpath() rejects with
    # ValueError("no path specified"); treat it as the current directory.
    parent = os.path.dirname(file_name) or os.curdir
    return os.path.relpath(parent, os.curdir)
|
|
|
|
|
|
def recreate_path(path_name: str) -> None:
    """Make ``path_name`` a freshly created, empty directory.

    If something already exists at that location the whole tree is removed
    first; the directory (including missing parents) is then created.
    """
    already_present = os.path.exists(path_name)
    if already_present:
        # Drop leftovers so the new directory starts out empty.
        shutil.rmtree(path_name)
    os.makedirs(path_name)
|
|
|
|
|
|
def create_files(data_dir: str, files_to_create: Sequence[Tuple[str, str, int]]) -> None:
    """Create placeholder files of the requested sizes below ``data_dir``.

    Each entry is a ``(dir_name, file_name, file_size)`` tuple. The directory
    is normalized and created when missing, and the file is resized to
    exactly ``file_size`` bytes.
    """
    for relative_dir, name, size in files_to_create:
        target_dir = os.path.join(data_dir, os.path.normpath(relative_dir))
        os.makedirs(target_dir, exist_ok=True)
        target_file = os.path.join(target_dir, name)
        # Append mode creates the file when missing without writing anything;
        # truncate() then forces the exact size.
        with open(target_file, "a") as handle:
            handle.truncate(size)
|
|
|
|
|
|
# TypeVar to parameterize for specific types
|
|
# _SA = TypeVar('_SA', bound='SupportsAdd')
|
|
|
|
# class SupportsAdd(Protocol):
|
|
# """Any type T where +(:T, :T) -> T"""
|
|
# def __add__(self: _SA, other: _SA) -> _SA: ...
|
|
|
|
# class IterableAdd(SupportsAdd, Iterable, Protocol): ...
|
|
|
|
|
|
def combine_file_lists(
    files_to_create: Sequence[Tuple[str, str, int]], files_to_download: List[Tuple[str, str]]
) -> Sequence[Tuple[str, str]]:
    """Return one list of ``(dir_name, file_name)`` pairs covering both inputs.

    The size component of every ``files_to_create`` entry is dropped; the
    resulting pairs come first, followed by ``files_to_download``.
    """
    pre_existing: List[Tuple[str, str]] = []
    for dir_name, file_name, _size in files_to_create:
        pre_existing.append((dir_name, file_name))
    return pre_existing + files_to_download
|
|
|
|
|
|
# Type variable tying the two compared values of an equality assertion to the
# same type.
_T = TypeVar("_T")


class AssertEquality(Protocol):
    """Structural type for an "assert equal" callback.

    Implementations are called with two values of the same type and a
    message. Presumably satisfied by ``unittest.TestCase.assertEqual`` --
    confirm against the callers.
    """

    # Double-underscore parameter names mark the arguments as positional-only
    # for type checkers.
    def __call__(self, __first: _T, __second: _T, __msg: str) -> None: ...
|
|
|
|
|
|
def assert_files(
    assert_equal: AssertEquality, data_dir: str, files_to_assert: Sequence[Tuple[str, str]]
) -> None:
    """Check that ``data_dir`` holds exactly the expected files.

    First the number of entries matching ``**/*.*`` below ``data_dir`` (only
    names containing a dot are matched) is compared with the number of
    expected files. Then every ``(dir_name, file_name)`` pair is checked for
    existence. All checks are reported through ``assert_equal``.
    """
    pattern = os.path.join(data_dir, "**/*.*")
    found = glob.glob(pattern, recursive=True)
    assert_equal(len(found), len(files_to_assert), "File count does not match")

    for dir_name, file_name in files_to_assert:
        file_path = os.path.join(os.path.normpath(dir_name), file_name)
        is_present = os.path.exists(os.path.join(data_dir, file_path))
        assert_equal(is_present, True, f"File {file_path} expected, but does not exist")
|
|
|
|
|
|
def run_cassette(cassette_path: str, params: Sequence[str]) -> Result:
    """Invoke the icloudpd CLI entry point inside a VCR cassette.

    ``main`` is run through a click ``CliRunner`` with ``params`` as its
    arguments while the cassette at ``cassette_path`` is active. Any captured
    exception is printed before the click ``Result`` is returned.
    """
    with vcr.use_cassette(cassette_path):
        # A fixed client ID is handed over through the environment.
        environment = {"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"}
        cli_runner = CliRunner(env=environment)
        outcome = cli_runner.invoke(main, params)
        print_result_exception(outcome)
        return outcome
|
|
|
|
|
|
def run_icloudpd_test(
    assert_equal: AssertEquality,
    root_path: str,
    base_dir: str,
    cassette_filename: str,
    files_to_create: Sequence[Tuple[str, str, int]],
    files_to_download: List[Tuple[str, str]],
    params: List[str],
) -> Tuple[str, Result]:
    """Run one cassette-backed icloudpd scenario and verify the resulting files.

    Steps:
      1. Recreate ``base_dir`` and its ``data`` sub-directory.
      2. Copy the master ``cookie`` directory from ``root_path`` into
         ``base_dir``.
      3. Create the files listed in ``files_to_create`` in the data directory.
      4. Run the CLI with ``-d``/``--cookie-directory`` plus ``params`` under
         the cassette ``<root_path>/vcr_cassettes/<cassette_filename>``.
      5. Assert that the data directory holds the created files together with
         ``files_to_download``.

    Returns the data directory path and the CLI result.
    """
    cookie_master_path = os.path.join(root_path, "cookie")
    cassette_path = os.path.join(root_path, "vcr_cassettes", cassette_filename)
    cookie_dir = os.path.join(base_dir, "cookie")
    data_dir = os.path.join(base_dir, "data")

    # Start from a clean slate: the base directory first, then its data folder.
    recreate_path(base_dir)
    recreate_path(data_dir)

    shutil.copytree(cookie_master_path, cookie_dir)
    create_files(data_dir, files_to_create)

    fixed_args = ["-d", data_dir, "--cookie-directory", cookie_dir]
    result = run_cassette(cassette_path, fixed_args + params)

    expected_files = combine_file_lists(files_to_create, files_to_download)
    assert_files(assert_equal, data_dir, expected_files)

    return (data_dir, result)
|