966 lines
35 KiB
Python
966 lines
35 KiB
Python
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import csv
|
|
import functools
|
|
import itertools
|
|
import json
|
|
import threading
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from concurrent.futures import wait
|
|
from pathlib import Path
|
|
from subprocess import CalledProcessError
|
|
from typing import TYPE_CHECKING
|
|
from typing import Any
|
|
|
|
from cleo.io.null_io import NullIO
|
|
from poetry.core.packages.utils.link import Link
|
|
|
|
from poetry.installation.chef import Chef
|
|
from poetry.installation.chef import ChefBuildError
|
|
from poetry.installation.chef import ChefInstallError
|
|
from poetry.installation.chooser import Chooser
|
|
from poetry.installation.operations import Install
|
|
from poetry.installation.operations import Uninstall
|
|
from poetry.installation.operations import Update
|
|
from poetry.installation.wheel_installer import WheelInstaller
|
|
from poetry.puzzle.exceptions import SolverProblemError
|
|
from poetry.utils._compat import decode
|
|
from poetry.utils.authenticator import Authenticator
|
|
from poetry.utils.env import EnvCommandError
|
|
from poetry.utils.helpers import Downloader
|
|
from poetry.utils.helpers import get_file_hash
|
|
from poetry.utils.helpers import get_highest_priority_hash_type
|
|
from poetry.utils.helpers import pluralize
|
|
from poetry.utils.helpers import remove_directory
|
|
from poetry.utils.pip import pip_install
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from cleo.io.io import IO
|
|
from cleo.io.outputs.section_output import SectionOutput
|
|
from poetry.core.masonry.builders.builder import Builder
|
|
from poetry.core.packages.package import Package
|
|
|
|
from poetry.config.config import Config
|
|
from poetry.installation.operations.operation import Operation
|
|
from poetry.repositories import RepositoryPool
|
|
from poetry.utils.env import Env
|
|
|
|
|
|
class Executor:
|
|
def __init__(
|
|
self,
|
|
env: Env,
|
|
pool: RepositoryPool,
|
|
config: Config,
|
|
io: IO,
|
|
parallel: bool | None = None,
|
|
disable_cache: bool = False,
|
|
) -> None:
|
|
self._env = env
|
|
self._io = io
|
|
self._dry_run = False
|
|
self._enabled = True
|
|
self._verbose = False
|
|
self._wheel_installer = WheelInstaller(self._env)
|
|
self._use_modern_installation = config.get(
|
|
"installer.modern-installation", True
|
|
)
|
|
if not self._use_modern_installation:
|
|
self._io.write_line(
|
|
"<warning>Warning: Setting `installer.modern-installation` to `false` "
|
|
"is deprecated.</>"
|
|
)
|
|
self._io.write_line(
|
|
"<warning>The pip-based installer will be removed in a future release.</>"
|
|
)
|
|
self._io.write_line(
|
|
"<warning>See https://github.com/python-poetry/poetry/issues/8987.</>"
|
|
)
|
|
|
|
if parallel is None:
|
|
parallel = config.get("installer.parallel", True)
|
|
|
|
if parallel:
|
|
self._max_workers = config.installer_max_workers
|
|
else:
|
|
self._max_workers = 1
|
|
|
|
self._artifact_cache = pool.artifact_cache
|
|
self._authenticator = Authenticator(
|
|
config, self._io, disable_cache=disable_cache, pool_size=self._max_workers
|
|
)
|
|
self._chef = Chef(self._artifact_cache, self._env, pool)
|
|
self._chooser = Chooser(pool, self._env, config)
|
|
|
|
self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
|
|
self._total_operations = 0
|
|
self._executed_operations = 0
|
|
self._executed = {"install": 0, "update": 0, "uninstall": 0}
|
|
self._skipped = {"install": 0, "update": 0, "uninstall": 0}
|
|
self._sections: dict[int, SectionOutput] = {}
|
|
self._yanked_warnings: list[str] = []
|
|
self._lock = threading.Lock()
|
|
self._shutdown = False
|
|
self._hashes: dict[str, str] = {}
|
|
|
|
@property
|
|
def installations_count(self) -> int:
|
|
return self._executed["install"]
|
|
|
|
@property
|
|
def updates_count(self) -> int:
|
|
return self._executed["update"]
|
|
|
|
@property
|
|
def removals_count(self) -> int:
|
|
return self._executed["uninstall"]
|
|
|
|
@property
|
|
def enabled(self) -> bool:
|
|
return self._enabled
|
|
|
|
def supports_fancy_output(self) -> bool:
|
|
return self._io.output.is_decorated() and not self._dry_run
|
|
|
|
def disable(self) -> Executor:
|
|
self._enabled = False
|
|
|
|
return self
|
|
|
|
def dry_run(self, dry_run: bool = True) -> Executor:
|
|
self._dry_run = dry_run
|
|
|
|
return self
|
|
|
|
def verbose(self, verbose: bool = True) -> Executor:
|
|
self._verbose = verbose
|
|
|
|
return self
|
|
|
|
def enable_bytecode_compilation(self, enable: bool = True) -> None:
|
|
self._wheel_installer.enable_bytecode_compilation(enable)
|
|
|
|
def pip_install(
|
|
self, req: Path, upgrade: bool = False, editable: bool = False
|
|
) -> int:
|
|
try:
|
|
pip_install(req, self._env, upgrade=upgrade, editable=editable)
|
|
except EnvCommandError as e:
|
|
output = decode(e.e.output)
|
|
if (
|
|
"KeyboardInterrupt" in output
|
|
or "ERROR: Operation cancelled by user" in output
|
|
):
|
|
return -2
|
|
raise
|
|
|
|
return 0
|
|
|
|
def execute(self, operations: list[Operation]) -> int:
|
|
self._total_operations = len(operations)
|
|
for job_type in self._executed:
|
|
self._executed[job_type] = 0
|
|
self._skipped[job_type] = 0
|
|
|
|
if operations and (self._enabled or self._dry_run):
|
|
self._display_summary(operations)
|
|
|
|
self._sections = {}
|
|
self._yanked_warnings = []
|
|
|
|
# pip has to be installed first without parallelism if we install via pip
|
|
for i, op in enumerate(operations):
|
|
if op.package.name == "pip":
|
|
wait([self._executor.submit(self._execute_operation, op)])
|
|
del operations[i]
|
|
break
|
|
|
|
# We group operations by priority
|
|
groups = itertools.groupby(operations, key=lambda o: -o.priority)
|
|
for _, group in groups:
|
|
tasks = []
|
|
serial_operations = []
|
|
for operation in group:
|
|
if self._shutdown:
|
|
break
|
|
|
|
# Some operations are unsafe, we must execute them serially in a group
|
|
# https://github.com/python-poetry/poetry/issues/3086
|
|
# https://github.com/python-poetry/poetry/issues/2658
|
|
#
|
|
# We need to explicitly check source type here, see:
|
|
# https://github.com/python-poetry/poetry-core/pull/98
|
|
is_parallel_unsafe = operation.job_type == "uninstall" or (
|
|
operation.package.develop
|
|
and operation.package.source_type in {"directory", "git"}
|
|
)
|
|
if not operation.skipped and is_parallel_unsafe:
|
|
serial_operations.append(operation)
|
|
continue
|
|
|
|
tasks.append(self._executor.submit(self._execute_operation, operation))
|
|
|
|
try:
|
|
wait(tasks)
|
|
|
|
for operation in serial_operations:
|
|
wait([self._executor.submit(self._execute_operation, operation)])
|
|
|
|
except KeyboardInterrupt:
|
|
self._shutdown = True
|
|
|
|
if self._shutdown:
|
|
# Cancelling further tasks from being executed
|
|
[task.cancel() for task in tasks]
|
|
self._executor.shutdown(wait=True)
|
|
|
|
break
|
|
|
|
for warning in self._yanked_warnings:
|
|
self._io.write_error_line(f"<warning>Warning: {warning}</warning>")
|
|
for path, issues in self._wheel_installer.invalid_wheels.items():
|
|
formatted_issues = "\n".join(issues)
|
|
warning = (
|
|
f"Validation of the RECORD file of {path.name} failed."
|
|
" Please report to the maintainers of that package so they can fix"
|
|
f" their build process. Details:\n{formatted_issues}\n"
|
|
)
|
|
self._io.write_error_line(f"<warning>Warning: {warning}</warning>")
|
|
|
|
return 1 if self._shutdown else 0
|
|
|
|
def _write(self, operation: Operation, line: str) -> None:
|
|
if not self.supports_fancy_output() or not self._should_write_operation(
|
|
operation
|
|
):
|
|
return
|
|
|
|
if self._io.is_debug():
|
|
with self._lock:
|
|
section = self._sections[id(operation)]
|
|
section.write_line(line)
|
|
|
|
return
|
|
|
|
with self._lock:
|
|
section = self._sections[id(operation)]
|
|
section.clear()
|
|
section.write(line)
|
|
|
|
def _execute_operation(self, operation: Operation) -> None:
|
|
try:
|
|
op_message = self.get_operation_message(operation)
|
|
if self.supports_fancy_output():
|
|
if id(operation) not in self._sections and self._should_write_operation(
|
|
operation
|
|
):
|
|
with self._lock:
|
|
self._sections[id(operation)] = self._io.section()
|
|
self._sections[id(operation)].write_line(
|
|
f" <fg=blue;options=bold>-</> {op_message}:"
|
|
" <fg=blue>Pending...</>"
|
|
)
|
|
else:
|
|
if self._should_write_operation(operation):
|
|
if not operation.skipped:
|
|
self._io.write_line(
|
|
f" <fg=blue;options=bold>-</> {op_message}"
|
|
)
|
|
else:
|
|
self._io.write_line(
|
|
f" <fg=default;options=bold,dark>-</> {op_message}: "
|
|
"<fg=default;options=bold,dark>Skipped</> "
|
|
"<fg=default;options=dark>for the following reason:</> "
|
|
f"<fg=default;options=bold,dark>{operation.skip_reason}</>"
|
|
)
|
|
|
|
try:
|
|
result = self._do_execute_operation(operation)
|
|
except EnvCommandError as e:
|
|
if e.e.returncode == -2:
|
|
result = -2
|
|
else:
|
|
raise
|
|
|
|
# If we have a result of -2 it means a KeyboardInterrupt
|
|
# in the any python subprocess, so we raise a KeyboardInterrupt
|
|
# error to be picked up by the error handler.
|
|
if result == -2:
|
|
raise KeyboardInterrupt
|
|
except Exception as e:
|
|
try:
|
|
from cleo.ui.exception_trace import ExceptionTrace
|
|
|
|
io: IO | SectionOutput
|
|
if not self.supports_fancy_output():
|
|
io = self._io
|
|
else:
|
|
message = (
|
|
" <error>-</error>"
|
|
f" {self.get_operation_message(operation, error=True)}:"
|
|
" <error>Failed</error>"
|
|
)
|
|
self._write(operation, message)
|
|
io = self._sections.get(id(operation), self._io)
|
|
|
|
with self._lock:
|
|
trace = ExceptionTrace(e)
|
|
trace.render(io)
|
|
pkg = operation.package
|
|
if isinstance(e, ChefBuildError):
|
|
pip_command = "pip wheel --no-cache-dir --use-pep517"
|
|
if pkg.develop:
|
|
requirement = pkg.source_url
|
|
pip_command += " --editable"
|
|
else:
|
|
requirement = (
|
|
pkg.to_dependency().to_pep_508().split(";")[0].strip()
|
|
)
|
|
message = (
|
|
"<info>"
|
|
"Note: This error originates from the build backend,"
|
|
" and is likely not a problem with poetry"
|
|
f" but with {pkg.pretty_name} ({pkg.full_pretty_version})"
|
|
" not supporting PEP 517 builds. You can verify this by"
|
|
f" running '{pip_command} \"{requirement}\"'."
|
|
"</info>"
|
|
)
|
|
elif isinstance(e, ChefInstallError):
|
|
message = (
|
|
"<error>"
|
|
"Cannot install build-system.requires"
|
|
f" for {pkg.pretty_name}."
|
|
"</error>"
|
|
)
|
|
elif isinstance(e, SolverProblemError):
|
|
message = (
|
|
"<error>"
|
|
"Cannot resolve build-system.requires"
|
|
f" for {pkg.pretty_name}."
|
|
"</error>"
|
|
)
|
|
else:
|
|
message = f"<error>Cannot install {pkg.pretty_name}.</error>"
|
|
|
|
io.write_line("")
|
|
io.write_line(message)
|
|
io.write_line("")
|
|
finally:
|
|
with self._lock:
|
|
self._shutdown = True
|
|
|
|
except KeyboardInterrupt:
|
|
try:
|
|
message = (
|
|
" <warning>-</warning>"
|
|
f" {self.get_operation_message(operation, warning=True)}:"
|
|
" <warning>Cancelled</warning>"
|
|
)
|
|
if not self.supports_fancy_output():
|
|
self._io.write_line(message)
|
|
else:
|
|
self._write(operation, message)
|
|
finally:
|
|
with self._lock:
|
|
self._shutdown = True
|
|
|
|
def _do_execute_operation(self, operation: Operation) -> int:
|
|
method = operation.job_type
|
|
|
|
operation_message = self.get_operation_message(operation)
|
|
if operation.skipped:
|
|
if self.supports_fancy_output():
|
|
self._write(
|
|
operation,
|
|
f" <fg=default;options=bold,dark>-</> {operation_message}: "
|
|
"<fg=default;options=bold,dark>Skipped</> "
|
|
"<fg=default;options=dark>for the following reason:</> "
|
|
f"<fg=default;options=bold,dark>{operation.skip_reason}</>",
|
|
)
|
|
|
|
self._skipped[operation.job_type] += 1
|
|
|
|
return 0
|
|
|
|
if not self._enabled or self._dry_run:
|
|
return 0
|
|
|
|
result: int = getattr(self, f"_execute_{method}")(operation)
|
|
|
|
if result != 0:
|
|
return result
|
|
|
|
operation_message = self.get_operation_message(operation, done=True)
|
|
message = f" <fg=green;options=bold>-</> {operation_message}"
|
|
self._write(operation, message)
|
|
|
|
self._increment_operations_count(operation, True)
|
|
|
|
return result
|
|
|
|
def _increment_operations_count(self, operation: Operation, executed: bool) -> None:
|
|
with self._lock:
|
|
if executed:
|
|
self._executed_operations += 1
|
|
self._executed[operation.job_type] += 1
|
|
else:
|
|
self._skipped[operation.job_type] += 1
|
|
|
|
def run_pip(self, *args: Any, **kwargs: Any) -> int:
|
|
try:
|
|
self._env.run_pip(*args, **kwargs)
|
|
except EnvCommandError as e:
|
|
output = decode(e.e.output)
|
|
if (
|
|
"KeyboardInterrupt" in output
|
|
or "ERROR: Operation cancelled by user" in output
|
|
):
|
|
return -2
|
|
|
|
raise
|
|
|
|
return 0
|
|
|
|
def get_operation_message(
|
|
self,
|
|
operation: Operation,
|
|
done: bool = False,
|
|
error: bool = False,
|
|
warning: bool = False,
|
|
) -> str:
|
|
base_tag = "fg=default"
|
|
operation_color = "c2"
|
|
source_operation_color = "c2"
|
|
package_color = "c1"
|
|
|
|
if error:
|
|
operation_color = "error"
|
|
elif warning:
|
|
operation_color = "warning"
|
|
elif done:
|
|
operation_color = "success"
|
|
|
|
if operation.skipped:
|
|
base_tag = "fg=default;options=dark"
|
|
operation_color += "_dark"
|
|
source_operation_color += "_dark"
|
|
package_color += "_dark"
|
|
|
|
if isinstance(operation, Install):
|
|
return (
|
|
f"<{base_tag}>Installing"
|
|
f" <{package_color}>{operation.package.name}</{package_color}>"
|
|
f" (<{operation_color}>{operation.package.full_pretty_version}</>)</>"
|
|
)
|
|
|
|
if isinstance(operation, Uninstall):
|
|
return (
|
|
f"<{base_tag}>Removing"
|
|
f" <{package_color}>{operation.package.name}</{package_color}>"
|
|
f" (<{operation_color}>{operation.package.full_pretty_version}</>)</>"
|
|
)
|
|
|
|
if isinstance(operation, Update):
|
|
initial_version = (initial_pkg := operation.initial_package).version
|
|
target_version = (target_pkg := operation.target_package).version
|
|
update_kind = (
|
|
"Updating" if target_version >= initial_version else "Downgrading"
|
|
)
|
|
return (
|
|
f"<{base_tag}>{update_kind}"
|
|
f" <{package_color}>{initial_pkg.name}</{package_color}> "
|
|
f"(<{source_operation_color}>"
|
|
f"{initial_pkg.full_pretty_version}"
|
|
f"</{source_operation_color}> -> <{operation_color}>"
|
|
f"{target_pkg.full_pretty_version}</>)</>"
|
|
)
|
|
return ""
|
|
|
|
def _display_summary(self, operations: list[Operation]) -> None:
|
|
installs = 0
|
|
updates = 0
|
|
uninstalls = 0
|
|
skipped = 0
|
|
for op in operations:
|
|
if op.skipped:
|
|
skipped += 1
|
|
continue
|
|
|
|
if op.job_type == "install":
|
|
installs += 1
|
|
elif op.job_type == "update":
|
|
updates += 1
|
|
elif op.job_type == "uninstall":
|
|
uninstalls += 1
|
|
|
|
if not installs and not updates and not uninstalls and not self._verbose:
|
|
self._io.write_line("")
|
|
self._io.write_line("No dependencies to install or update")
|
|
|
|
return
|
|
|
|
self._io.write_line("")
|
|
self._io.write("<b>Package operations</b>: ")
|
|
self._io.write(f"<info>{installs}</> install{pluralize(installs)}, ")
|
|
self._io.write(f"<info>{updates}</> update{pluralize(updates)}, ")
|
|
self._io.write(f"<info>{uninstalls}</> removal{pluralize(uninstalls)}")
|
|
if skipped and self._verbose:
|
|
self._io.write(f", <info>{skipped}</> skipped")
|
|
self._io.write_line("")
|
|
self._io.write_line("")
|
|
|
|
def _execute_install(self, operation: Install | Update) -> int:
|
|
status_code = self._install(operation)
|
|
|
|
self._save_url_reference(operation)
|
|
|
|
return status_code
|
|
|
|
def _execute_update(self, operation: Install | Update) -> int:
|
|
status_code = self._update(operation)
|
|
|
|
self._save_url_reference(operation)
|
|
|
|
return status_code
|
|
|
|
def _execute_uninstall(self, operation: Uninstall) -> int:
|
|
op_msg = self.get_operation_message(operation)
|
|
message = f" <fg=blue;options=bold>-</> {op_msg}: <info>Removing...</info>"
|
|
self._write(operation, message)
|
|
|
|
return self._remove(operation.package)
|
|
|
|
def _install(self, operation: Install | Update) -> int:
|
|
package = operation.package
|
|
if package.source_type == "directory" and not self._use_modern_installation:
|
|
return self._install_directory_without_wheel_installer(operation)
|
|
|
|
cleanup_archive: bool = False
|
|
if package.source_type == "git":
|
|
archive = self._prepare_git_archive(operation)
|
|
cleanup_archive = operation.package.develop
|
|
elif package.source_type == "file":
|
|
archive = self._prepare_archive(operation)
|
|
elif package.source_type == "directory":
|
|
archive = self._prepare_archive(operation)
|
|
cleanup_archive = True
|
|
elif package.source_type == "url":
|
|
assert package.source_url is not None
|
|
archive = self._download_link(operation, Link(package.source_url))
|
|
else:
|
|
archive = self._download(operation)
|
|
|
|
operation_message = self.get_operation_message(operation)
|
|
message = (
|
|
f" <fg=blue;options=bold>-</> {operation_message}:"
|
|
" <info>Installing...</info>"
|
|
)
|
|
self._write(operation, message)
|
|
|
|
if not self._use_modern_installation:
|
|
return self.pip_install(archive, upgrade=operation.job_type == "update")
|
|
|
|
try:
|
|
if operation.job_type == "update":
|
|
# Uninstall first
|
|
# TODO: Make an uninstaller and find a way to rollback in case
|
|
# the new package can't be installed
|
|
assert isinstance(operation, Update)
|
|
self._remove(operation.initial_package)
|
|
|
|
self._wheel_installer.install(archive)
|
|
finally:
|
|
if cleanup_archive:
|
|
archive.unlink()
|
|
|
|
return 0
|
|
|
|
def _update(self, operation: Install | Update) -> int:
|
|
return self._install(operation)
|
|
|
|
def _remove(self, package: Package) -> int:
|
|
# If we have a VCS package, remove its source directory
|
|
if package.source_type == "git":
|
|
src_dir = self._env.path / "src" / package.name
|
|
if src_dir.exists():
|
|
remove_directory(src_dir, force=True)
|
|
|
|
try:
|
|
return self.run_pip("uninstall", package.name, "-y")
|
|
except CalledProcessError as e:
|
|
if "not installed" in str(e):
|
|
return 0
|
|
|
|
raise
|
|
|
|
def _prepare_archive(
|
|
self, operation: Install | Update, *, output_dir: Path | None = None
|
|
) -> Path:
|
|
package = operation.package
|
|
operation_message = self.get_operation_message(operation)
|
|
|
|
message = (
|
|
f" <fg=blue;options=bold>-</> {operation_message}:"
|
|
" <info>Preparing...</info>"
|
|
)
|
|
self._write(operation, message)
|
|
|
|
assert package.source_url is not None
|
|
archive = Path(package.source_url)
|
|
if package.source_subdirectory:
|
|
archive = archive / package.source_subdirectory
|
|
if not Path(package.source_url).is_absolute() and package.root_dir:
|
|
archive = package.root_dir / archive
|
|
|
|
self._populate_hashes_dict(archive, package)
|
|
|
|
return self._chef.prepare(
|
|
archive, editable=package.develop, output_dir=output_dir
|
|
)
|
|
|
|
def _prepare_git_archive(self, operation: Install | Update) -> Path:
|
|
from poetry.vcs.git import Git
|
|
|
|
package = operation.package
|
|
assert package.source_url is not None
|
|
|
|
if package.source_resolved_reference and not package.develop:
|
|
# Only cache git archives when we know precise reference hash,
|
|
# otherwise we might get stale archives
|
|
cached_archive = self._artifact_cache.get_cached_archive_for_git(
|
|
package.source_url,
|
|
package.source_resolved_reference,
|
|
package.source_subdirectory,
|
|
env=self._env,
|
|
)
|
|
if cached_archive is not None:
|
|
return cached_archive
|
|
|
|
operation_message = self.get_operation_message(operation)
|
|
|
|
message = (
|
|
f" <fg=blue;options=bold>-</> {operation_message}: <info>Cloning...</info>"
|
|
)
|
|
self._write(operation, message)
|
|
|
|
source = Git.clone(
|
|
url=package.source_url,
|
|
source_root=self._env.path / "src",
|
|
revision=package.source_resolved_reference or package.source_reference,
|
|
)
|
|
|
|
# Now we just need to install from the source directory
|
|
original_url = package.source_url
|
|
package._source_url = str(source.path)
|
|
|
|
output_dir = None
|
|
if package.source_resolved_reference and not package.develop:
|
|
output_dir = self._artifact_cache.get_cache_directory_for_git(
|
|
original_url,
|
|
package.source_resolved_reference,
|
|
package.source_subdirectory,
|
|
)
|
|
|
|
archive = self._prepare_archive(operation, output_dir=output_dir)
|
|
if not package.develop:
|
|
package._source_url = original_url
|
|
|
|
if output_dir is not None and output_dir.is_dir():
|
|
# Mark directories with cached git packages, to distinguish from
|
|
# "normal" cache
|
|
(output_dir / ".created_from_git_dependency").touch()
|
|
|
|
return archive
|
|
|
|
def _install_directory_without_wheel_installer(
|
|
self, operation: Install | Update
|
|
) -> int:
|
|
from poetry.factory import Factory
|
|
from poetry.pyproject.toml import PyProjectTOML
|
|
|
|
package = operation.package
|
|
operation_message = self.get_operation_message(operation)
|
|
|
|
message = (
|
|
f" <fg=blue;options=bold>-</> {operation_message}:"
|
|
" <info>Building...</info>"
|
|
)
|
|
self._write(operation, message)
|
|
|
|
assert package.source_url is not None
|
|
if package.root_dir:
|
|
req = package.root_dir / package.source_url
|
|
else:
|
|
req = Path(package.source_url).resolve(strict=False)
|
|
|
|
if package.source_subdirectory:
|
|
req /= package.source_subdirectory
|
|
|
|
pyproject = PyProjectTOML(req / "pyproject.toml")
|
|
|
|
package_poetry = None
|
|
if pyproject.is_poetry_project():
|
|
with contextlib.suppress(RuntimeError):
|
|
package_poetry = Factory().create_poetry(pyproject.file.path.parent)
|
|
|
|
if package_poetry is not None:
|
|
builder: Builder
|
|
if package.develop and not package_poetry.package.build_script:
|
|
from poetry.masonry.builders.editable import EditableBuilder
|
|
|
|
# This is a Poetry package in editable mode
|
|
# we can use the EditableBuilder without going through pip
|
|
# to install it, unless it has a build script.
|
|
builder = EditableBuilder(package_poetry, self._env, NullIO())
|
|
builder.build()
|
|
|
|
return 0
|
|
|
|
if package_poetry.package.build_script:
|
|
from poetry.core.masonry.builders.sdist import SdistBuilder
|
|
|
|
builder = SdistBuilder(package_poetry)
|
|
with builder.setup_py():
|
|
return self.pip_install(req, upgrade=True, editable=package.develop)
|
|
|
|
return self.pip_install(req, upgrade=True, editable=package.develop)
|
|
|
|
def _download(self, operation: Install | Update) -> Path:
|
|
link = self._chooser.choose_for(operation.package)
|
|
|
|
if link.yanked:
|
|
# Store yanked warnings in a list and print after installing, so they can't
|
|
# be overlooked. Further, printing them in the concerning section would have
|
|
# the risk of overwriting the warning, so it is only briefly visible.
|
|
message = (
|
|
f"The file chosen for install of {operation.package.pretty_name} "
|
|
f"{operation.package.pretty_version} ({link.show_url}) is yanked."
|
|
)
|
|
if link.yanked_reason:
|
|
message += f" Reason for being yanked: {link.yanked_reason}"
|
|
self._yanked_warnings.append(message)
|
|
|
|
return self._download_link(operation, link)
|
|
|
|
def _download_link(self, operation: Install | Update, link: Link) -> Path:
|
|
package = operation.package
|
|
|
|
# Get original package for the link provided
|
|
download_func = functools.partial(self._download_archive, operation)
|
|
original_archive = self._artifact_cache.get_cached_archive_for_link(
|
|
link, strict=True, download_func=download_func
|
|
)
|
|
|
|
# Get potential higher prioritized cached archive, otherwise it will fall back
|
|
# to the original archive.
|
|
archive = self._artifact_cache.get_cached_archive_for_link(
|
|
link,
|
|
strict=False,
|
|
env=self._env,
|
|
)
|
|
if archive is None:
|
|
# Since we previously downloaded an archive, we now should have
|
|
# something cached that we can use here. The only case in which
|
|
# archive is None is if the original archive is not valid for the
|
|
# current environment.
|
|
raise RuntimeError(
|
|
f"Package {link.url} cannot be installed in the current environment"
|
|
f" {self._env.marker_env}"
|
|
)
|
|
|
|
if archive.suffix != ".whl":
|
|
message = (
|
|
f" <fg=blue;options=bold>-</> {self.get_operation_message(operation)}:"
|
|
" <info>Preparing...</info>"
|
|
)
|
|
self._write(operation, message)
|
|
|
|
archive = self._chef.prepare(archive, output_dir=original_archive.parent)
|
|
|
|
# Use the original archive to provide the correct hash.
|
|
self._populate_hashes_dict(original_archive, package)
|
|
|
|
return archive
|
|
|
|
def _populate_hashes_dict(self, archive: Path, package: Package) -> None:
|
|
if package.files and archive.name in {f["file"] for f in package.files}:
|
|
archive_hash = self._validate_archive_hash(archive, package)
|
|
self._hashes[package.name] = archive_hash
|
|
|
|
@staticmethod
|
|
def _validate_archive_hash(archive: Path, package: Package) -> str:
|
|
known_hashes = {f["hash"] for f in package.files if f["file"] == archive.name}
|
|
hash_types = {t.split(":")[0] for t in known_hashes}
|
|
hash_type = get_highest_priority_hash_type(hash_types, archive.name)
|
|
|
|
if hash_type is None:
|
|
raise RuntimeError(
|
|
f"No usable hash type(s) for {package} from archive"
|
|
f" {archive.name} found (known hashes: {known_hashes!s})"
|
|
)
|
|
|
|
archive_hash = f"{hash_type}:{get_file_hash(archive, hash_type)}"
|
|
|
|
if archive_hash not in known_hashes:
|
|
raise RuntimeError(
|
|
f"Hash for {package} from archive {archive.name} not found in"
|
|
f" known hashes (was: {archive_hash})"
|
|
)
|
|
|
|
return archive_hash
|
|
|
|
def _download_archive(
|
|
self,
|
|
operation: Install | Update,
|
|
url: str,
|
|
dest: Path,
|
|
) -> None:
|
|
downloader = Downloader(url, dest, self._authenticator)
|
|
wheel_size = downloader.total_size
|
|
|
|
operation_message = self.get_operation_message(operation)
|
|
message = (
|
|
f" <fg=blue;options=bold>-</> {operation_message}: <info>Downloading...</>"
|
|
)
|
|
progress = None
|
|
if self.supports_fancy_output():
|
|
if wheel_size is None:
|
|
self._write(operation, message)
|
|
else:
|
|
from cleo.ui.progress_bar import ProgressBar
|
|
|
|
progress = ProgressBar(
|
|
self._sections[id(operation)], max=int(wheel_size)
|
|
)
|
|
progress.set_format(message + " <b>%percent%%</b>")
|
|
|
|
if progress:
|
|
with self._lock:
|
|
self._sections[id(operation)].clear()
|
|
progress.start()
|
|
|
|
for fetched_size in downloader.download_with_progress(chunk_size=4096):
|
|
if progress:
|
|
with self._lock:
|
|
progress.set_progress(fetched_size)
|
|
|
|
if progress:
|
|
with self._lock:
|
|
progress.finish()
|
|
|
|
def _should_write_operation(self, operation: Operation) -> bool:
|
|
return (
|
|
not operation.skipped or self._dry_run or self._verbose or not self._enabled
|
|
)
|
|
|
|
def _save_url_reference(self, operation: Operation) -> None:
|
|
"""
|
|
Create and store a PEP-610 `direct_url.json` file, if needed.
|
|
"""
|
|
if operation.job_type not in {"install", "update"}:
|
|
return
|
|
|
|
package = operation.package
|
|
|
|
if not package.source_url or package.source_type == "legacy":
|
|
if not self._use_modern_installation:
|
|
# Since we are installing from our own distribution cache pip will write
|
|
# a `direct_url.json` file pointing to the cache distribution.
|
|
#
|
|
# That's not what we want, so we remove the direct_url.json file, if it
|
|
# exists.
|
|
for (
|
|
direct_url_json
|
|
) in self._env.site_packages.find_distribution_direct_url_json_files(
|
|
distribution_name=package.name, writable_only=True
|
|
):
|
|
direct_url_json.unlink(missing_ok=True)
|
|
return
|
|
|
|
url_reference: dict[str, Any] | None = None
|
|
|
|
if package.source_type == "git" and not package.develop:
|
|
url_reference = self._create_git_url_reference(package)
|
|
elif package.source_type in ("directory", "git"):
|
|
url_reference = self._create_directory_url_reference(package)
|
|
elif package.source_type == "url":
|
|
url_reference = self._create_url_url_reference(package)
|
|
elif package.source_type == "file":
|
|
url_reference = self._create_file_url_reference(package)
|
|
|
|
if url_reference:
|
|
for dist in self._env.site_packages.distributions(
|
|
name=package.name, writable_only=True
|
|
):
|
|
dist_path = dist._path # type: ignore[attr-defined]
|
|
assert isinstance(dist_path, Path)
|
|
url = dist_path / "direct_url.json"
|
|
url.write_text(json.dumps(url_reference), encoding="utf-8")
|
|
|
|
record = dist_path / "RECORD"
|
|
if record.exists():
|
|
with record.open(mode="a", encoding="utf-8", newline="") as f:
|
|
writer = csv.writer(f)
|
|
path = url.relative_to(record.parent.parent)
|
|
writer.writerow([str(path), "", ""])
|
|
|
|
def _create_git_url_reference(self, package: Package) -> dict[str, Any]:
|
|
reference = {
|
|
"url": package.source_url,
|
|
"vcs_info": {
|
|
"vcs": "git",
|
|
"requested_revision": package.source_reference,
|
|
"commit_id": package.source_resolved_reference,
|
|
},
|
|
}
|
|
if package.source_subdirectory:
|
|
reference["subdirectory"] = package.source_subdirectory
|
|
|
|
return reference
|
|
|
|
def _create_url_url_reference(self, package: Package) -> dict[str, Any]:
|
|
archive_info = self._get_archive_info(package)
|
|
|
|
return {"url": package.source_url, "archive_info": archive_info}
|
|
|
|
def _create_file_url_reference(self, package: Package) -> dict[str, Any]:
|
|
archive_info = self._get_archive_info(package)
|
|
|
|
assert package.source_url is not None
|
|
return {
|
|
"url": Path(package.source_url).as_uri(),
|
|
"archive_info": archive_info,
|
|
}
|
|
|
|
def _create_directory_url_reference(self, package: Package) -> dict[str, Any]:
|
|
dir_info = {}
|
|
|
|
if package.develop:
|
|
dir_info["editable"] = True
|
|
|
|
assert package.source_url is not None
|
|
return {
|
|
"url": Path(package.source_url).as_uri(),
|
|
"dir_info": dir_info,
|
|
}
|
|
|
|
def _get_archive_info(self, package: Package) -> dict[str, Any]:
|
|
"""
|
|
Create dictionary `archive_info` for file `direct_url.json`.
|
|
|
|
Specification: https://packaging.python.org/en/latest/specifications/direct-url
|
|
(it supersedes PEP 610)
|
|
|
|
:param package: This must be a poetry package instance.
|
|
"""
|
|
archive_info = {}
|
|
|
|
if package.name in self._hashes:
|
|
algorithm, value = self._hashes[package.name].split(":")
|
|
archive_info["hashes"] = {algorithm: value}
|
|
|
|
return archive_info
|