# poetry/src/poetry/packages/locker.py

from __future__ import annotations

import json
import logging
import os
import re

from hashlib import sha256
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from typing import cast

from packaging.utils import canonicalize_name
from poetry.core.constraints.version import Version
from poetry.core.constraints.version import parse_constraint
from poetry.core.packages.dependency import Dependency
from poetry.core.packages.package import Package
from poetry.core.version.markers import parse_marker
from poetry.core.version.requirements import InvalidRequirement
from tomlkit import array
from tomlkit import comment
from tomlkit import document
from tomlkit import inline_table
from tomlkit import table

from poetry.__version__ import __version__
from poetry.toml.file import TOMLFile
from poetry.utils._compat import tomllib

if TYPE_CHECKING:
    from packaging.utils import NormalizedName
    from poetry.core.packages.directory_dependency import DirectoryDependency
    from poetry.core.packages.file_dependency import FileDependency
    from poetry.core.packages.url_dependency import URLDependency
    from poetry.core.packages.vcs_dependency import VCSDependency
    from tomlkit.toml_document import TOMLDocument

    from poetry.repositories.lockfile_repository import LockfileRepository

logger = logging.getLogger(__name__)

_GENERATED_IDENTIFIER = "@" + "generated"
GENERATED_COMMENT = (
    f"This file is automatically {_GENERATED_IDENTIFIER} by Poetry"
    f" {__version__} and should not be changed by hand."
)
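
# When written to poetry.lock, this renders as a single header line, e.g.
# (version number is illustrative):
#   # This file is automatically @generated by Poetry 1.8.0 and should not be changed by hand.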


class Locker:
    _VERSION = "2.0"
    _READ_VERSION_RANGE = ">=1,<3"

    _legacy_keys: ClassVar[list[str]] = [
        "dependencies",
        "source",
        "extras",
        "dev-dependencies",
    ]
    _relevant_keys: ClassVar[list[str]] = [*_legacy_keys, "group"]

    def __init__(self, lock: Path, local_config: dict[str, Any]) -> None:
        self._lock = lock
        self._local_config = local_config
        self._lock_data: dict[str, Any] | None = None
        self._content_hash = self._get_content_hash()

    @property
    def lock(self) -> Path:
        return self._lock

    @property
    def lock_data(self) -> dict[str, Any]:
        if self._lock_data is None:
            self._lock_data = self._get_lock_data()

        return self._lock_data

    def is_locked(self) -> bool:
        """
        Checks whether the locker has been locked (lockfile found).
        """
        return self._lock.exists()

    def is_fresh(self) -> bool:
        """
        Checks whether the lock file is still up to date with the current hash.
        """
        with self.lock.open("rb") as f:
            lock = tomllib.load(f)
        metadata = lock.get("metadata", {})

        if "content-hash" in metadata:
            fresh: bool = self._content_hash == metadata["content-hash"]
            return fresh

        return False
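
    # The hash being compared lives in the lock file's metadata table, e.g.
    # (hash value is illustrative):
    #   [metadata]
    #   lock-version = "2.0"
    #   content-hash = "8b0..."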

    def set_local_config(self, local_config: dict[str, Any]) -> None:
        self._local_config = local_config
        self._content_hash = self._get_content_hash()

    def locked_repository(self) -> LockfileRepository:
        """
        Searches and returns a repository of locked packages.
        """
        from poetry.factory import Factory
        from poetry.repositories.lockfile_repository import LockfileRepository

        repository = LockfileRepository()

        if not self.is_locked():
            return repository

        lock_data = self.lock_data
        locked_packages = cast("list[dict[str, Any]]", lock_data["package"])

        if not locked_packages:
            return repository

        for info in locked_packages:
            source = info.get("source", {})
            source_type = source.get("type")
            url = source.get("url")
            if source_type in ["directory", "file"]:
                url = self.lock.parent.joinpath(url).resolve().as_posix()

            name = info["name"]
            package = Package(
                name,
                info["version"],
                source_type=source_type,
                source_url=url,
                source_reference=source.get("reference"),
                source_resolved_reference=source.get("resolved_reference"),
                source_subdirectory=source.get("subdirectory"),
            )
            package.description = info.get("description", "")
            package.optional = info["optional"]
            metadata = cast("dict[str, Any]", lock_data["metadata"])

            # Storing of package files and hashes has been through a few
            # generations in the lockfile; we can read them all:
            #
            # - latest and preferred is that this is read per package, from
            #   package.files
            # - oldest is that hashes were stored in metadata.hashes without
            #   filenames
            # - in between those two, hashes were stored alongside filenames
            #   in metadata.files
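            #
            # For illustration, the three shapes look roughly like this
            # (names and hashes are hypothetical):
            #
            #   files = [{file = "foo-1.0.tar.gz", hash = "sha256:..."}]
            #
            #   [metadata.files]
            #   foo = [{file = "foo-1.0.tar.gz", hash = "sha256:..."}]
            #
            #   [metadata.hashes]
            #   foo = ["sha256:..."]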
            package_files = info.get("files")
            if package_files is not None:
                package.files = package_files
            elif "hashes" in metadata:
                hashes = cast("dict[str, Any]", metadata["hashes"])
                package.files = [{"name": h, "hash": h} for h in hashes[name]]
            elif source_type in {"git", "directory", "url"}:
                package.files = []
            else:
                files = metadata["files"][name]
                if source_type == "file":
                    filename = Path(url).name
                    package.files = [item for item in files if item["file"] == filename]
                else:
                    # Strictly speaking, this is not correct, but we cannot
                    # always determine the correct files because the lockfile
                    # doesn't keep track of which files belong to which
                    # package.
                    package.files = files

            package.python_versions = info["python-versions"]

            package_extras: dict[NormalizedName, list[Dependency]] = {}
            extras = info.get("extras", {})
            if extras:
                for name, deps in extras.items():
                    name = canonicalize_name(name)
                    package_extras[name] = []

                    for dep in deps:
                        try:
                            dependency = Dependency.create_from_pep_508(dep)
                        except InvalidRequirement:
                            # handle lock files with invalid PEP 508
                            m = re.match(r"^(.+?)(?:\[(.+?)])?(?:\s+\((.+)\))?$", dep)
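                            # e.g. "requests[security] (>=2.0,<3.0)" yields
                            # group(1) = "requests", group(2) = "security",
                            # group(3) = ">=2.0,<3.0" (example values, not
                            # from any real lock file)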
                            if not m:
                                raise

                            dep_name = m.group(1)
                            extras = m.group(2) or ""
                            constraint = m.group(3) or "*"
                            dependency = Dependency(
                                dep_name, constraint, extras=extras.split(",")
                            )

                        package_extras[name].append(dependency)

            package.extras = package_extras
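
            # In the lock file itself, extras are written as lists of
            # requirement strings, e.g. (hypothetical):
            #   [package.extras]
            #   docs = ["sphinx (>=5.0)"]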
if "marker" in info:
package.marker = parse_marker(info["marker"])
else:
# Compatibility for old locks
if "requirements" in info:
dep = Dependency("foo", "0.0.0")
for name, value in info["requirements"].items():
if name == "python":
dep.python_versions = value
elif name == "platform":
dep.platform = value
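
                    # e.g. requirements = {python = ">=3.6"} makes the dummy
                    # dependency render roughly as
                    # 'foo (==0.0.0) ; python_version >= "3.6"'; the marker
                    # part after ";" is recovered below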

                    split_dep = dep.to_pep_508(False).split(";")
                    if len(split_dep) > 1:
                        package.marker = parse_marker(split_dep[1].strip())

            for dep_name, constraint in info.get("dependencies", {}).items():
                root_dir = self.lock.parent
                if package.source_type == "directory":
                    # root dir should be the source of the package relative to the lock
                    # path
                    assert package.source_url is not None
                    root_dir = Path(package.source_url)

                if isinstance(constraint, list):
                    for c in constraint:
                        package.add_dependency(
                            Factory.create_dependency(dep_name, c, root_dir=root_dir)
                        )

                    continue

                package.add_dependency(
                    Factory.create_dependency(dep_name, constraint, root_dir=root_dir)
                )

            if "develop" in info:
                package.develop = info["develop"]

            repository.add_package(package)

        return repository

    def set_lock_data(self, root: Package, packages: list[Package]) -> bool:
        """Store lock data and persist it to the lock file if it has changed."""
        lock = self._compute_lock_data(root, packages)

        if self._should_write(lock):
            self._write_lock_data(lock)
            return True

        return False

    def _compute_lock_data(
        self, root: Package, packages: list[Package]
    ) -> TOMLDocument:
        package_specs = self._lock_packages(packages)

        # Format the files of each package as a multiline array of inline
        # tables, one file entry per line
        for package in package_specs:
            files = array()

            for f in package["files"]:
                file_metadata = inline_table()
                for k, v in sorted(f.items()):
                    file_metadata[k] = v

                files.append(file_metadata)

            package["files"] = files.multiline(True)

        lock = document()
        lock.add(comment(GENERATED_COMMENT))
        lock["package"] = package_specs

        if root.extras:
            lock["extras"] = {
                extra: sorted(dep.pretty_name for dep in deps)
                for extra, deps in sorted(root.extras.items())
            }

        lock["metadata"] = {
            "lock-version": self._VERSION,
            "python-versions": root.python_versions,
            "content-hash": self._content_hash,
        }

        return lock
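
    # The generated document renders roughly like this (all values
    # illustrative):
    #
    #   # This file is automatically @generated by Poetry ... and should not be changed by hand.
    #
    #   [[package]]
    #   name = "foo"
    #   version = "1.0"
    #   ...
    #
    #   [metadata]
    #   lock-version = "2.0"
    #   python-versions = "^3.8"
    #   content-hash = "..."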

    def _should_write(self, lock: TOMLDocument) -> bool:
        # if lock file exists: compare with existing lock data
        do_write = True
        if self.is_locked():
            try:
                lock_data = self.lock_data
            except RuntimeError:
                # incompatible, invalid or no lock file
                pass
            else:
                do_write = lock != lock_data
        return do_write

    def _write_lock_data(self, data: TOMLDocument) -> None:
        lockfile = TOMLFile(self.lock)
        lockfile.write(data)

        self._lock_data = None

    def _get_content_hash(self) -> str:
        """
        Returns the sha256 hash of the sorted content of the pyproject file.
        """
        content = self._local_config

        relevant_content = {}
        for key in self._relevant_keys:
            data = content.get(key)
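
            # Legacy keys are kept in the hash input even when None,
            # presumably so hashes stay comparable with older lock files;
            # newer keys such as "group" are skipped when absent.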
            if data is None and key not in self._legacy_keys:
                continue

            relevant_content[key] = data

        return sha256(json.dumps(relevant_content, sort_keys=True).encode()).hexdigest()

    def _get_lock_data(self) -> dict[str, Any]:
        if not self.lock.exists():
            raise RuntimeError("No lockfile found. Unable to read locked packages")

        with self.lock.open("rb") as f:
            try:
                lock_data = tomllib.load(f)
            except tomllib.TOMLDecodeError as e:
                raise RuntimeError(f"Unable to read the lock file ({e}).") from e

        # if the lockfile doesn't contain a metadata section at all,
        # it probably needs to be rebuilt completely
        if "metadata" not in lock_data:
            raise RuntimeError(
                "The lock file does not have a metadata entry.\n"
                "Regenerate the lock file with the `poetry lock` command."
            )

        metadata = lock_data["metadata"]
        lock_version = Version.parse(metadata.get("lock-version", "1.0"))
        current_version = Version.parse(self._VERSION)
        accepted_versions = parse_constraint(self._READ_VERSION_RANGE)
        lock_version_allowed = accepted_versions.allows(lock_version)
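
        # With _READ_VERSION_RANGE ">=1,<3" and _VERSION "2.0", a
        # lock-version of e.g. "2.1" is still read but triggers the warning
        # below, while e.g. "3.0" is rejected outright.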
        if lock_version_allowed and current_version < lock_version:
            logger.warning(
                "The lock file might not be compatible with the current version of"
                " Poetry.\nUpgrade Poetry to ensure the lock file is read properly or,"
                " alternatively, regenerate the lock file with the `poetry lock`"
                " command."
            )
        elif not lock_version_allowed:
            raise RuntimeError(
                "The lock file is not compatible with the current version of Poetry.\n"
                "Upgrade Poetry to be able to read the lock file or, alternatively, "
                "regenerate the lock file with the `poetry lock` command."
            )

        return lock_data

    def _lock_packages(self, packages: list[Package]) -> list[dict[str, Any]]:
        locked = []

        for package in sorted(
            packages,
            key=lambda x: (
                x.name,
                x.version,
                x.source_type or "",
                x.source_url or "",
                x.source_subdirectory or "",
                x.source_reference or "",
                x.source_resolved_reference or "",
            ),
        ):
            spec = self._dump_package(package)

            locked.append(spec)

        return locked

    def _dump_package(self, package: Package) -> dict[str, Any]:
        dependencies: dict[str, list[Any]] = {}
        for dependency in sorted(
            package.requires,
            key=lambda d: d.name,
        ):
            dependencies.setdefault(dependency.pretty_name, [])

            constraint = inline_table()

            if dependency.is_directory():
                dependency = cast("DirectoryDependency", dependency)
                constraint["path"] = dependency.path.as_posix()

                if dependency.develop:
                    constraint["develop"] = True
            elif dependency.is_file():
                dependency = cast("FileDependency", dependency)
                constraint["path"] = dependency.path.as_posix()
            elif dependency.is_url():
                dependency = cast("URLDependency", dependency)
                constraint["url"] = dependency.url
            elif dependency.is_vcs():
                dependency = cast("VCSDependency", dependency)
                constraint[dependency.vcs] = dependency.source

                if dependency.branch:
                    constraint["branch"] = dependency.branch
                elif dependency.tag:
                    constraint["tag"] = dependency.tag
                elif dependency.rev:
                    constraint["rev"] = dependency.rev

                if dependency.directory:
                    constraint["subdirectory"] = dependency.directory
            else:
                constraint["version"] = str(dependency.pretty_constraint)

            if dependency.extras:
                constraint["extras"] = sorted(dependency.extras)

            if dependency.is_optional():
                constraint["optional"] = True

            if not dependency.marker.is_any():
                constraint["markers"] = str(dependency.marker)

            dependencies[dependency.pretty_name].append(constraint)

        # All the constraints should have the same type,
        # but we want to simplify them where possible
        for dependency_name, constraints in dependencies.items():
            if all(
                len(constraint) == 1 and "version" in constraint
                for constraint in constraints
            ):
                dependencies[dependency_name] = [
                    constraint["version"] for constraint in constraints
                ]
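
        # e.g. [{"version": ">=1.0"}, {"version": ">=2.0"}] collapses to
        # [">=1.0", ">=2.0"]; constraints that also carry markers, extras,
        # or source information are left as inline tables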

        data: dict[str, Any] = {
            "name": package.pretty_name,
            "version": package.pretty_version,
            "description": package.description or "",
            "optional": package.optional,
            "python-versions": package.python_versions,
            "files": sorted(package.files, key=lambda x: x["file"]),
        }

        if dependencies:
            data["dependencies"] = table()
            for k, constraints in dependencies.items():
                if len(constraints) == 1:
                    data["dependencies"][k] = constraints[0]
                else:
                    data["dependencies"][k] = array().multiline(True)

                    for constraint in constraints:
                        data["dependencies"][k].append(constraint)

        if package.extras:
            extras = {}
            for name, deps in sorted(package.extras.items()):
                extras[name] = sorted(dep.base_pep_508_name for dep in deps)

            data["extras"] = extras

        if package.source_url:
            url = package.source_url
            if package.source_type in ["file", "directory"]:
                # The lock file should only store paths relative to the root project
                url = Path(
                    os.path.relpath(
                        Path(url).resolve(),
                        Path(self.lock.parent).resolve(),
                    )
                ).as_posix()
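
                # e.g. a directory dependency at /project/libs/foo with the
                # lock file at /project/poetry.lock is stored as "libs/foo"
                # (paths are illustrative)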
data["source"] = {}
if package.source_type:
data["source"]["type"] = package.source_type
data["source"]["url"] = url
if package.source_reference:
data["source"]["reference"] = package.source_reference
if package.source_resolved_reference:
data["source"]["resolved_reference"] = package.source_resolved_reference
if package.source_subdirectory:
data["source"]["subdirectory"] = package.source_subdirectory
if package.source_type in ["directory", "git"]:
data["develop"] = package.develop
return data
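

# Minimal usage sketch (hypothetical paths and config; in practice Poetry
# builds the Locker itself from pyproject.toml):
#
#   locker = Locker(Path("poetry.lock"), local_config={"dependencies": {}})
#   if locker.is_locked() and locker.is_fresh():
#       repo = locker.locked_repository()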