icloud_photos_downloader/tests/test_download_photos.py

2296 lines
93 KiB
Python

from unittest import TestCase
from vcr import VCR
import os
import sys
import shutil
import pytest
import mock
import datetime
from mock import call, ANY
from click.testing import CliRunner
import piexif
from piexif._exceptions import InvalidImageDataError
from icloudpd import constants
from pyicloud_ipd.services.photos import PhotoAsset, PhotoAlbum, PhotoLibrary
from pyicloud_ipd.base import PyiCloudService
from pyicloud_ipd.exceptions import PyiCloudAPIResponseException
from requests.exceptions import ConnectionError
from icloudpd.base import main
from tests.helpers import path_from_project_root, print_result_exception, recreate_path
import inspect
import glob
vcr = VCR(decode_compressed_response=True)
class DownloadPhotoTestCase(TestCase):
@pytest.fixture(autouse=True)
def inject_fixtures(self, caplog):
self._caplog = caplog
self.root_path = path_from_project_root(__file__)
self.fixtures_path = os.path.join(self.root_path, "fixtures")
self.vcr_path = os.path.join(self.root_path, "vcr_cassettes")
def test_download_and_skip_existing_photos(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_create = [
("2018/07/30/IMG_7408.JPG", 1151066),
("2018/07/30/IMG_7407.JPG", 656257),
]
files_to_download = [
'2018/07/31/IMG_7409.JPG'
]
os.makedirs(os.path.join(data_dir, "2018/07/30/"))
for (file_name, file_size) in files_to_create:
with open(os.path.join(data_dir, file_name), "a") as f:
f.truncate(file_size)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading 5 original photos to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertNotIn(
"IMG_7409.MOV",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/30/IMG_7408.JPG'))} already exists",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/30/IMG_7407.JPG'))} already exists",
self._caplog.text,
)
self.assertIn(
"DEBUG Skipping IMG_7405.MOV, only downloading photos.",
self._caplog.text,
)
self.assertIn(
"DEBUG Skipping IMG_7404.MOV, only downloading photos.",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(
files_to_create) + len(files_to_download)
for file_name in files_to_download + ([file_name for (file_name, _) in files_to_create]):
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
# Check that file was downloaded
# Check that mtime was updated to the photo creation date
photo_mtime = os.path.getmtime(os.path.join(
data_dir, os.path.normpath("2018/07/31/IMG_7409.JPG")))
photo_modified_time = datetime.datetime.utcfromtimestamp(photo_mtime)
self.assertEqual(
"2018-07-31 07:22:24",
photo_modified_time.strftime('%Y-%m-%d %H:%M:%S'))
def test_download_photos_and_set_exif(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_create = [
("2018/07/30/IMG_7408.JPG", 1151066),
("2018/07/30/IMG_7407.JPG", 656257),
]
files_to_download = [
'2018/07/30/IMG_7405.MOV',
'2018/07/30/IMG_7407.MOV',
'2018/07/30/IMG_7408.MOV',
'2018/07/31/IMG_7409.JPG',
'2018/07/31/IMG_7409.MOV',
]
os.makedirs(os.path.join(data_dir, "2018/07/30/"))
for (file_name, file_size) in files_to_create:
with open(os.path.join(data_dir, file_name), "a") as f:
f.truncate(file_size)
# Download the first photo, but mock the video download
orig_download = PhotoAsset.download
def mocked_download(self, size):
if not hasattr(PhotoAsset, "already_downloaded"):
response = orig_download(self, size)
setattr(PhotoAsset, "already_downloaded", True)
return response
return mock.MagicMock()
with mock.patch.object(PhotoAsset, "download", new=mocked_download):
with mock.patch(
"icloudpd.exif_datetime.get_photo_exif"
) as get_exif_patched:
get_exif_patched.return_value = False
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"4",
"--set-exif-datetime",
# '--skip-videos',
# "--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...",
self._caplog.text,
)
self.assertIn(
f"INFO Downloading 4 original photos and videos to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
# 2018:07:31 07:22:24 utc
expectedDatetime = datetime.datetime(
2018, 7, 31, 7, 22, 24, tzinfo=datetime.timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S%z")
self.assertIn(
f"DEBUG Setting EXIF timestamp for {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}: {expectedDatetime}",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(
files_to_create) + len(files_to_download)
for file_name in files_to_download + ([file_name for (file_name, _) in files_to_create]):
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_download_photos_and_get_exif_exceptions(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_download = [
'2018/07/31/IMG_7409.JPG'
]
with mock.patch.object(piexif, "load") as piexif_patched:
piexif_patched.side_effect = InvalidImageDataError
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading the first original photo to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
f"DEBUG Error fetching EXIF data for {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
f"DEBUG Error setting EXIF data for {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(files_to_download)
for file_name in files_to_download:
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_skip_existing_downloads(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_create = [
("2018/07/31/IMG_7409.JPG", 1884695),
("2018/07/31/IMG_7409.MOV", 3294075),
]
files_to_download = [
]
os.makedirs(os.path.join(data_dir, "2018/07/31/"))
for (file_name, file_size) in files_to_create:
with open(os.path.join(data_dir, file_name), "a") as f:
f.truncate(file_size)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
# '--skip-videos',
# "--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...", self._caplog.text
)
self.assertIn(
f"INFO Downloading the first original photo or video to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))} already exists",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.MOV'))} already exists",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(
files_to_download) + len(files_to_create)
for file_name in files_to_download + ([file_name for (file_name, _) in files_to_create]):
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_until_found(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
os.makedirs(os.path.join(data_dir, "2018/07/30/"))
os.makedirs(os.path.join(data_dir, "2018/07/31/"))
files_to_download = []
files_to_skip = []
files_to_download.append(("2018/07/31/IMG_7409.JPG", "photo"))
files_to_download.append(("2018/07/31/IMG_7409-medium.MOV", "photo"))
files_to_skip.append(("2018/07/30/IMG_7408.JPG", "photo", 1151066))
files_to_skip.append(
("2018/07/30/IMG_7408-medium.MOV", "photo", 894467))
files_to_download.append(("2018/07/30/IMG_7407.JPG", "photo"))
files_to_download.append(("2018/07/30/IMG_7407-medium.MOV", "photo"))
files_to_skip.append(("2018/07/30/IMG_7405.MOV", "video", 36491351))
files_to_skip.append(("2018/07/30/IMG_7404.MOV", "video", 225935003))
files_to_download.append(("2018/07/30/IMG_7403.MOV", "video"))
files_to_download.append(("2018/07/30/IMG_7402.MOV", "video"))
# TODO large files on Windows times out
files_to_skip.append(("2018/07/30/IMG_7401.MOV", "photo", 565699696))
files_to_skip.append(("2018/07/30/IMG_7400.JPG", "photo", 2308885))
files_to_skip.append(
("2018/07/30/IMG_7400-medium.MOV", "photo", 1238639))
files_to_skip.append(("2018/07/30/IMG_7399.JPG", "photo", 2251047))
files_to_download.append(("2018/07/30/IMG_7399-medium.MOV", "photo"))
for f in files_to_skip:
with open(os.path.join(data_dir, f[0]), "a") as fi:
fi.truncate(f[2])
with mock.patch("icloudpd.download.download_media") as dp_patched:
dp_patched.return_value = True
with mock.patch("icloudpd.download.os.utime") as ut_patched:
ut_patched.return_value = None
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--live-photo-size",
"medium",
"--until-found",
"3",
"--recent",
"20",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
expected_calls = list(
map(
lambda f: call(
ANY, False, ANY, ANY, os.path.join(
data_dir, os.path.normpath(f[0])),
"mediumVideo" if (
f[1] == 'photo' and f[0].endswith('.MOV')
) else "original"),
files_to_download,
)
)
dp_patched.assert_has_calls(expected_calls)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...", self._caplog.text
)
self.assertIn(
f"INFO Downloading ??? original photos and videos to {data_dir} ...",
self._caplog.text,
)
for f in files_to_skip:
expected_message = f"DEBUG {os.path.join(data_dir, os.path.normpath(f[0]))} already exists"
self.assertIn(expected_message, self._caplog.text)
self.assertIn(
"INFO Found 3 consecutive previously downloaded photos. Exiting",
self._caplog.text,
)
self.assertNotIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/30/IMG_7399-medium.MOV'))} already exists",
self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(
files_to_skip) # we faked downloading
for file_name in ([file_name for (file_name, _, _) in files_to_skip]):
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_handle_io_error(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
with mock.patch("icloudpd.download.open", create=True) as m:
# Raise IOError when we try to write to the destination file
m.side_effect = IOError
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading the first original photo to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
"ERROR IOError while writing file to "
f"{os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}. "
"You might have run out of disk space, or the file might "
"be too large for your OS. Skipping this file...",
self._caplog.text,
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_handle_session_error_during_download(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
def mock_raise_response_error(arg):
raise PyiCloudAPIResponseException("Invalid global session", 100)
with mock.patch("time.sleep") as sleep_mock:
with mock.patch.object(PhotoAsset, "download") as pa_download:
pa_download.side_effect = mock_raise_response_error
# Let the initial authenticate() call succeed,
# but do nothing on the second try.
orig_authenticate = PyiCloudService.authenticate
def mocked_authenticate(self):
if not hasattr(self, "already_authenticated"):
orig_authenticate(self)
setattr(self, "already_authenticated", True)
with mock.patch.object(
PyiCloudService, "authenticate", new=mocked_authenticate
):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
# Error msg should be repeated 5 times
assert (
self._caplog.text.count(
"Session error, re-authenticating..."
)
== 5
)
self.assertIn(
"ERROR Could not download IMG_7409.JPG. Please try again later.",
self._caplog.text,
)
# Make sure we only call sleep 4 times (skip the first retry)
self.assertEqual(sleep_mock.call_count, 4)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_handle_session_error_during_photo_iteration(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
def mock_raise_response_error(offset):
raise PyiCloudAPIResponseException("Invalid global session", 100)
with mock.patch("time.sleep") as sleep_mock:
with mock.patch.object(PhotoAlbum, "photos_request") as pa_photos_request:
pa_photos_request.side_effect = mock_raise_response_error
# Let the initial authenticate() call succeed,
# but do nothing on the second try.
orig_authenticate = PyiCloudService.authenticate
def mocked_authenticate(self):
if not hasattr(self, "already_authenticated"):
orig_authenticate(self)
setattr(self, "already_authenticated", True)
with mock.patch.object(
PyiCloudService, "authenticate", new=mocked_authenticate
):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
# Error msg should be repeated 5 times
assert (
self._caplog.text.count(
"Session error, re-authenticating..."
)
== 5
)
self.assertIn(
"ERROR iCloud re-authentication failed. Please try again later.",
self._caplog.text,
)
# Make sure we only call sleep 4 times (skip the first retry)
self.assertEqual(sleep_mock.call_count, 4)
assert result.exit_code == 1
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_handle_connection_error(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
def mock_raise_response_error(arg):
raise ConnectionError("Connection Error")
with mock.patch.object(PhotoAsset, "download") as pa_download:
pa_download.side_effect = mock_raise_response_error
# Let the initial authenticate() call succeed,
# but do nothing on the second try.
orig_authenticate = PyiCloudService.authenticate
def mocked_authenticate(self):
if not hasattr(self, "already_authenticated"):
orig_authenticate(self)
setattr(self, "already_authenticated", True)
with mock.patch("icloudpd.constants.WAIT_SECONDS", 0):
with mock.patch.object(
PyiCloudService, "authenticate", new=mocked_authenticate
):
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
# Error msg should be repeated 5 times
assert (
self._caplog.text.count(
"Error downloading IMG_7409.JPG, retrying after 0 seconds..."
)
== 5
)
self.assertIn(
"ERROR Could not download IMG_7409.JPG. Please try again later.",
self._caplog.text,
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_handle_albums_error(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
def mock_raise_response_error():
raise PyiCloudAPIResponseException("Api Error", 100)
with mock.patch.object(PhotoLibrary, "_fetch_folders") as pa_photos_request:
pa_photos_request.side_effect = mock_raise_response_error
# Let the initial authenticate() call succeed,
# but do nothing on the second try.
orig_authenticate = PyiCloudService.authenticate
def mocked_authenticate(self):
if not hasattr(self, "already_authenticated"):
orig_authenticate(self)
setattr(self, "already_authenticated", True)
with mock.patch("icloudpd.constants.WAIT_SECONDS", 0):
with mock.patch.object(
PyiCloudService, "authenticate", new=mocked_authenticate
):
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
assert result.exit_code == 1
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_missing_size(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with mock.patch.object(PhotoAsset, "download") as pa_download:
pa_download.return_value = False
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"3",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...", self._caplog.text
)
self.assertIn(
f"INFO Downloading 3 original photos and videos to {data_dir} ...",
self._caplog.text,
)
# These error messages should not be repeated more than once for each size
for filename in ["IMG_7409.JPG", "IMG_7408.JPG", "IMG_7407.JPG"]:
for size in ["original", "originalVideo"]:
self.assertEqual(
sum(1 for line in self._caplog.text.splitlines() if line ==
f"ERROR Could not find URL to download {filename} for size {size}"
),
1,
f"Errors for {filename} size {size}"
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
self.assertEqual(result.exit_code, 0, "Exit code")
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
self.assertEqual(sum(1 for _ in files_in_result), 0, "Files in result")
def test_size_fallback_to_original(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with mock.patch("icloudpd.download.download_media") as dp_patched:
dp_patched.return_value = True
with mock.patch("icloudpd.download.os.utime") as ut_patched:
ut_patched.return_value = None
with mock.patch.object(PhotoAsset, "versions") as pa:
pa.return_value = ["original", "medium"]
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--size",
"thumb",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...",
self._caplog.text,
)
self.assertIn(
f"INFO Downloading the first thumb photo or video to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
dp_patched.assert_called_once_with(
ANY,
False,
ANY,
ANY,
f"{os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
"original",
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_force_size(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with mock.patch("icloudpd.download.download_media") as dp_patched:
dp_patched.return_value = True
with mock.patch.object(PhotoAsset, "versions") as pa:
pa.return_value = ["original", "medium"]
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--size",
"thumb",
"--force-size",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...",
self._caplog.text,
)
self.assertIn(
f"INFO Downloading the first thumb photo or video to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
"ERROR thumb size does not exist for IMG_7409.JPG. Skipping...",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
dp_patched.assert_not_called
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_invalid_creation_date(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_download = [
'2018/01/01/IMG_7409.JPG'
]
with mock.patch.object(PhotoAsset, "created", new_callable=mock.PropertyMock) as dt_mock:
# Can't mock `astimezone` because it's a readonly property, so have to
# create a new class that inherits from datetime.datetime
class NewDateTime(datetime.datetime):
def astimezone(self, tz=None):
raise ValueError('Invalid date')
dt_mock.return_value = NewDateTime(2018, 1, 1, 0, 0, 0)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...",
self._caplog.text,
)
self.assertIn(
f"INFO Downloading the first original photo or video to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
"ERROR Could not convert photo created date to local timezone (2018-01-01 00:00:00)",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/01/01/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(files_to_download)
for file_name in files_to_download:
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
@pytest.mark.skipif(sys.platform == 'win32',
reason="does not run on windows")
@pytest.mark.skipif(sys.platform == 'darwin',
reason="does not run on mac")
def test_invalid_creation_year(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_download = [
'5/01/01/IMG_7409.JPG'
]
with mock.patch.object(PhotoAsset, "created", new_callable=mock.PropertyMock) as dt_mock:
# Can't mock `astimezone` because it's a readonly property, so have to
# create a new class that inherits from datetime.datetime
class NewDateTime(datetime.datetime):
def astimezone(self, tz=None):
raise ValueError('Invalid date')
dt_mock.return_value = NewDateTime(5, 1, 1, 0, 0, 0)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...",
self._caplog.text,
)
self.assertIn(
f"INFO Downloading the first original photo or video to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
"ERROR Could not convert photo created date to local timezone (0005-01-01 00:00:00)",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('5/01/01/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(files_to_download)
for file_name in files_to_download:
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_unknown_item_type(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with mock.patch("icloudpd.download.download_media") as dp_patched:
dp_patched.return_value = True
with mock.patch.object(PhotoAsset, "item_type", new_callable=mock.PropertyMock) as it_mock:
it_mock.return_value = 'unknown'
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos and videos from album All Photos...",
self._caplog.text,
)
self.assertIn(
f"INFO Downloading the first original photo or video to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
"DEBUG Skipping IMG_7409.JPG, only downloading photos and videos. (Item type was: unknown)",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
dp_patched.assert_not_called
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_download_and_dedupe_existing_photos(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
os.makedirs(os.path.join(data_dir, os.path.normpath("2018/07/31/")))
with open(os.path.join(data_dir, os.path.normpath("2018/07/31/IMG_7409.JPG")), "a") as f:
f.truncate(1)
with open(os.path.join(data_dir, os.path.normpath("2018/07/31/IMG_7409.MOV")), "a") as f:
f.truncate(1)
os.makedirs(os.path.join(data_dir, os.path.normpath("2018/07/30/")))
with open(os.path.join(data_dir, os.path.normpath("2018/07/30/IMG_7408.JPG")), "a") as f:
f.truncate(1151066)
with open(os.path.join(data_dir, os.path.normpath("2018/07/30/IMG_7408.MOV")), "a") as f:
f.truncate(1606512)
# Download the first photo, but mock the video download
orig_download = PhotoAsset.download
def mocked_download(self, size):
if not hasattr(PhotoAsset, "already_downloaded"):
response = orig_download(self, size)
setattr(PhotoAsset, "already_downloaded", True)
return response
return mock.MagicMock()
with mock.patch.object(PhotoAsset, "download", new=mocked_download):
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
# "--set-exif-datetime",
"--no-progress-bar",
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
"--threads-num",
"1",
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading 5 original photos to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409-1884695.JPG'))} deduplicated",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409-1884695.JPG'))}",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409-3294075.MOV'))} deduplicated",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409-3294075.MOV'))}",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/30/IMG_7408.JPG'))} already exists",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/30/IMG_7408.MOV'))} already exists",
self._caplog.text,
)
self.assertIn(
"DEBUG Skipping IMG_7405.MOV, only downloading photos.", self._caplog.text
)
self.assertIn(
"DEBUG Skipping IMG_7404.MOV, only downloading photos.", self._caplog.text
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
# Check that file was downloaded
self.assertTrue(
os.path.exists(os.path.join(data_dir, os.path.normpath("2018/07/31/IMG_7409-1884695.JPG"))))
# Check that mtime was updated to the photo creation date
photo_mtime = os.path.getmtime(os.path.join(
data_dir, os.path.normpath("2018/07/31/IMG_7409-1884695.JPG")))
photo_modified_time = datetime.datetime.utcfromtimestamp(
photo_mtime)
self.assertEqual(
"2018-07-31 07:22:24",
photo_modified_time.strftime('%Y-%m-%d %H:%M:%S'))
self.assertTrue(
os.path.exists(os.path.join(data_dir, os.path.normpath("2018/07/31/IMG_7409-3294075.MOV"))))
photo_mtime = os.path.getmtime(os.path.join(
data_dir, os.path.normpath("2018/07/31/IMG_7409-3294075.MOV")))
photo_modified_time = datetime.datetime.utcfromtimestamp(
photo_mtime)
self.assertEqual(
"2018-07-31 07:22:24",
photo_modified_time.strftime('%Y-%m-%d %H:%M:%S'))
assert result.exit_code == 0
def test_download_photos_and_set_exif_exceptions(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_download = [
'2018/07/31/IMG_7409.JPG'
]
with mock.patch.object(piexif, "insert") as piexif_patched:
piexif_patched.side_effect = InvalidImageDataError
with mock.patch(
"icloudpd.exif_datetime.get_photo_exif"
) as get_exif_patched:
get_exif_patched.return_value = False
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading the first original photo to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
# 2018:07:31 07:22:24 utc
expectedDatetime = datetime.datetime(
2018, 7, 31, 7, 22, 24, tzinfo=datetime.timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S%z")
self.assertIn(
f"DEBUG Setting EXIF timestamp for {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}: {expectedDatetime}",
self._caplog.text,
)
self.assertIn(
f"DEBUG Error setting EXIF data for {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(files_to_download)
for file_name in files_to_download:
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_download_chinese(self):
base_dir = os.path.join(
self.fixtures_path, inspect.stack()[0][3], "中文")
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_download = [
'2018/07/31/IMG_7409.JPG'
]
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f'INFO Downloading the first original photo to {data_dir} ...',
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertNotIn(
"IMG_7409.MOV",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
# Check that file was downloaded
self.assertTrue(
os.path.exists(os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))))
# Check that mtime was updated to the photo creation date
photo_mtime = os.path.getmtime(os.path.join(
data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG')))
photo_modified_time = datetime.datetime.utcfromtimestamp(
photo_mtime)
self.assertEqual(
"2018-07-31 07:22:24",
photo_modified_time.strftime('%Y-%m-%d %H:%M:%S'))
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(files_to_download)
for file_name in files_to_download:
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_download_after_delete(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_download = [
'2018/07/31/IMG_7409.JPG'
]
with mock.patch.object(piexif, "insert") as piexif_patched:
piexif_patched.side_effect = InvalidImageDataError
with mock.patch(
"icloudpd.exif_datetime.get_photo_exif"
) as get_exif_patched:
get_exif_patched.return_value = False
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")) as cass:
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"--delete-after-download",
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading the first original photo to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
"INFO Deleted IMG_7409.JPG in iCloud", self._caplog.text
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert cass.all_played
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(files_to_download)
for file_name in files_to_download:
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_download_after_delete_fail(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos_no_delete.yml")) as cass:
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"--delete-after-download",
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading the first original photo to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertNotIn(
"INFO Deleted IMG_7409.JPG in iCloud", self._caplog.text
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert cass.all_played
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_download_over_old_original_photos(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_create = [
("2018/07/30/IMG_7408-original.JPG", 1151066),
("2018/07/30/IMG_7407.JPG", 656257)
]
files_to_download = [
'2018/07/31/IMG_7409.JPG'
]
os.makedirs(os.path.join(data_dir, "2018/07/30/"))
for (file_name, file_size) in files_to_create:
with open(os.path.join(data_dir, file_name), "a") as f:
f.truncate(file_size)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading 5 original photos to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertNotIn(
"IMG_7409.MOV",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/30/IMG_7408.JPG'))} already exists",
self._caplog.text,
)
self.assertIn(
f"DEBUG {os.path.join(data_dir, os.path.normpath('2018/07/30/IMG_7407.JPG'))} already exists",
self._caplog.text,
)
self.assertIn(
"DEBUG Skipping IMG_7405.MOV, only downloading photos.",
self._caplog.text,
)
self.assertIn(
"DEBUG Skipping IMG_7404.MOV, only downloading photos.",
self._caplog.text,
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
# Check that file was downloaded
self.assertTrue(
os.path.exists(os.path.join(data_dir, os.path.normpath("2018/07/31/IMG_7409.JPG"))))
# Check that mtime was updated to the photo creation date
photo_mtime = os.path.getmtime(os.path.join(
data_dir, os.path.normpath("2018/07/31/IMG_7409.JPG")))
photo_modified_time = datetime.datetime.utcfromtimestamp(
photo_mtime)
self.assertEqual(
"2018-07-31 07:22:24",
photo_modified_time.strftime('%Y-%m-%d %H:%M:%S'))
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(
files_to_download) + len(files_to_create)
for file_name in files_to_download + ([file_name for (file_name, _) in files_to_create]):
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_download_normalized_names(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_create = [
("2018/07/30/IMG_7408.JPG", 1151066),
("2018/07/30/IMG_7407.JPG", 656257),
]
files_to_download = [
# <>:"/\|?* -- windows
# / & \0x00 -- linux
# SU1HXzc0MDkuSlBH -> i/n v:a\0l*i?d\p<a>t"h|.JPG -> aS9uIHY6YQBsKmk/ZFxwPGE+dCJofC5KUEc=
'2018/07/31/i_n v_a_l_i_d_p_a_t_h_.JPG'
]
os.makedirs(os.path.join(data_dir, "2018/07/30/"))
for (file_name, file_size) in files_to_create:
with open(os.path.join(data_dir, file_name), "a") as f:
f.truncate(file_size)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos_bad_filename.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(
files_to_create) + len(files_to_download)
for file_name in files_to_download + ([file_name for (file_name, _) in files_to_create]):
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
@pytest.mark.skip("not ready yet. may be not needed")
def test_download_watch(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_create = [
("2018/07/30/IMG_7408.JPG", 1151066),
("2018/07/30/IMG_7407.JPG", 656257),
]
files_to_download = [
'2018/07/31/IMG_7409.JPG'
]
os.makedirs(os.path.join(data_dir, "2018/07/30/"))
for (file_name, file_size) in files_to_create:
with open(os.path.join(data_dir, file_name), "a") as f:
f.truncate(file_size)
def my_sleep(target_duration):
counter = 0
def sleep_(duration):
if counter > duration:
raise ValueError("SLEEP MOCK")
counter = counter + 1
return sleep_
with mock.patch("time.sleep") as sleep_patched:
# import random
target_duration = 1
sleep_patched.side_effect = my_sleep(target_duration)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--watch-with-interval",
target_duration,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == len(
files_to_create) + len(files_to_download)
for file_name in files_to_download + ([file_name for (file_name, _) in files_to_create]):
assert os.path.exists(os.path.join(data_dir, os.path.normpath(
file_name))), f"File {file_name} expected, but does not exist"
def test_handle_internal_error_during_download(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
def mock_raise_response_error(arg):
raise PyiCloudAPIResponseException(
"INTERNAL_ERROR", "INTERNAL_ERROR")
with mock.patch("time.sleep") as sleep_mock:
with mock.patch.object(PhotoAsset, "download") as pa_download:
pa_download.side_effect = mock_raise_response_error
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
# Error msg should be repeated 5 times
# self.assertEqual(
# self._caplog.text.count(
# "Error downloading"
# ), constants.MAX_RETRIES, "Retry count"
# )
self.assertIn(
"ERROR Could not download IMG_7409.JPG. Please try again later.",
self._caplog.text,
)
# Make sure we only call sleep 4 times (skip the first retry)
self.assertEqual(sleep_mock.call_count, 5)
self.assertEqual(result.exit_code, 0, "Exit Code")
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_handle_internal_error_during_photo_iteration(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
def mock_raise_response_error(offset):
raise PyiCloudAPIResponseException(
"INTERNAL_ERROR", "INTERNAL_ERROR")
with mock.patch("time.sleep") as sleep_mock:
with mock.patch.object(PhotoAlbum, "photos_request") as pa_photos_request:
pa_photos_request.side_effect = mock_raise_response_error
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
# Error msg should be repeated 5 times
self.assertEqual(
self._caplog.text.count(
"Internal Error at Apple, retrying..."
), constants.MAX_RETRIES, "Retry count"
)
self.assertIn(
"ERROR Internal Error at Apple.",
self._caplog.text,
)
# Make sure we only call sleep 4 times (skip the first retry)
self.assertEqual(sleep_mock.call_count, 5)
self.assertEqual(result.exit_code, 1, "Exit Code")
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
assert sum(1 for _ in files_in_result) == 0
def test_handle_io_error_mkdir(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
with mock.patch("os.makedirs", create=True) as m:
# Raise IOError when we try to write to the destination file
m.side_effect = IOError
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading the first original photo to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"ERROR Could not create folder {data_dir}",
self._caplog.text,
)
self.assertEqual(result.exit_code, 0, "Exit code")
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
self.assertEqual(sum(1 for _ in files_in_result),
0, "Files at the end")
def test_dry_run(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
files_to_download = [
'2018/07/31/IMG_7409.JPG',
# "2018/07/30/IMG_7408.JPG",
# "2018/07/30/IMG_7407.JPG",
]
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")):
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--set-exif-datetime",
"--no-progress-bar",
"--dry-run",
"--threads-num",
1,
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
# self.assertIn(
# f"INFO Downloading 2 original photos to {data_dir} ...",
# self._caplog.text,
# )
for f in files_to_download:
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath(f))}",
self._caplog.text,
)
self.assertNotIn(
"IMG_7409.MOV",
self._caplog.text,
)
self.assertNotIn(
"ERROR",
self._caplog.text,
)
# self.assertIn(
# "DEBUG Skipping IMG_7405.MOV, only downloading photos.",
# self._caplog.text,
# )
# self.assertIn(
# "DEBUG Skipping IMG_7404.MOV, only downloading photos.",
# self._caplog.text,
# )
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
assert result.exit_code == 0
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
self.assertEqual(sum(1 for _ in files_in_result),
0, "Files in the result")
def test_download_after_delete_dry_run(self):
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
data_dir = os.path.join(base_dir, "data")
for dir in [base_dir, cookie_dir, data_dir]:
recreate_path(dir)
def raise_response_error(a0_, a1_, a2_):
raise Exception("Unexpected call to delete_photo")
with mock.patch.object(piexif, "insert") as piexif_patched:
piexif_patched.side_effect = InvalidImageDataError
with mock.patch(
"icloudpd.exif_datetime.get_photo_exif"
) as get_exif_patched:
get_exif_patched.return_value = False
with mock.patch(
"icloudpd.base.delete_photo"
) as df_patched:
df_patched.side_effect = raise_response_error
with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")) as cass:
# Pass fixed client ID via environment variable
runner = CliRunner(env={
"CLIENT_ID": "DE309E26-942E-11E8-92F5-14109FE0B321"
})
result = runner.invoke(
main,
[
"--username",
"jdoe@gmail.com",
"--password",
"password1",
"--recent",
"1",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--dry-run",
"--threads-num",
1,
"--delete-after-download",
"-d",
data_dir,
"--cookie-directory",
cookie_dir,
],
)
print_result_exception(result)
self.assertIn(
"DEBUG Looking up all photos from album All Photos...", self._caplog.text)
self.assertIn(
f"INFO Downloading the first original photo to {data_dir} ...",
self._caplog.text,
)
self.assertIn(
f"DEBUG Downloading {os.path.join(data_dir, os.path.normpath('2018/07/31/IMG_7409.JPG'))}",
self._caplog.text,
)
self.assertIn(
"INFO [DRY RUN] Would delete IMG_7409.JPG in iCloud", self._caplog.text
)
self.assertIn(
"INFO All photos have been downloaded", self._caplog.text
)
self.assertEqual(
cass.all_played, False, "All mocks played")
self.assertEqual(result.exit_code, 0, "Exit code")
files_in_result = glob.glob(os.path.join(
data_dir, "**/*.*"), recursive=True)
self.assertEqual(sum(1 for _ in files_in_result),
0, "Files in the result")