355 lines
12 KiB
Python
355 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import io
|
|
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
from poetry.core.masonry.metadata import Metadata
|
|
from poetry.core.masonry.utils.helpers import distribution_name
|
|
from requests.exceptions import ConnectionError
|
|
from requests.exceptions import HTTPError
|
|
from requests_toolbelt import user_agent
|
|
from requests_toolbelt.multipart import MultipartEncoder
|
|
from requests_toolbelt.multipart import MultipartEncoderMonitor
|
|
|
|
from poetry.__version__ import __version__
|
|
from poetry.utils.constants import REQUESTS_TIMEOUT
|
|
from poetry.utils.patterns import wheel_file_re
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from cleo.io.io import IO
|
|
|
|
from poetry.poetry import Poetry
|
|
|
|
|
|
class UploadError(Exception):
|
|
def __init__(self, error: ConnectionError | HTTPError | str) -> None:
|
|
if isinstance(error, HTTPError):
|
|
if error.response is None:
|
|
message = "HTTP Error connecting to the repository"
|
|
else:
|
|
message = (
|
|
f"HTTP Error {error.response.status_code}: "
|
|
f"{error.response.reason} | {error.response.content!r}"
|
|
)
|
|
elif isinstance(error, ConnectionError):
|
|
message = (
|
|
"Connection Error: We were unable to connect to the repository, "
|
|
"ensure the url is correct and can be reached."
|
|
)
|
|
else:
|
|
message = error
|
|
super().__init__(message)
|
|
|
|
|
|
class Uploader:
|
|
def __init__(self, poetry: Poetry, io: IO, dist_dir: Path | None = None) -> None:
|
|
self._poetry = poetry
|
|
self._package = poetry.package
|
|
self._io = io
|
|
self._dist_dir = dist_dir or self.default_dist_dir
|
|
self._username: str | None = None
|
|
self._password: str | None = None
|
|
|
|
@property
|
|
def user_agent(self) -> str:
|
|
agent: str = user_agent("poetry", __version__)
|
|
return agent
|
|
|
|
@property
|
|
def default_dist_dir(self) -> Path:
|
|
return self._poetry.file.path.parent / "dist"
|
|
|
|
@property
|
|
def dist_dir(self) -> Path:
|
|
if not self._dist_dir.is_absolute():
|
|
return self._poetry.file.path.parent / self._dist_dir
|
|
|
|
return self._dist_dir
|
|
|
|
@property
|
|
def files(self) -> list[Path]:
|
|
dist = self.dist_dir
|
|
version = self._package.version.to_string()
|
|
escaped_name = distribution_name(self._package.name)
|
|
|
|
wheels = list(dist.glob(f"{escaped_name}-{version}-*.whl"))
|
|
tars = list(dist.glob(f"{escaped_name}-{version}.tar.gz"))
|
|
|
|
return sorted(wheels + tars)
|
|
|
|
def auth(self, username: str | None, password: str | None) -> None:
|
|
self._username = username
|
|
self._password = password
|
|
|
|
def make_session(self) -> requests.Session:
|
|
session = requests.Session()
|
|
auth = self.get_auth()
|
|
if auth is not None:
|
|
session.auth = auth
|
|
|
|
session.headers["User-Agent"] = self.user_agent
|
|
return session
|
|
|
|
def get_auth(self) -> tuple[str, str] | None:
|
|
if self._username is None or self._password is None:
|
|
return None
|
|
|
|
return (self._username, self._password)
|
|
|
|
def upload(
|
|
self,
|
|
url: str,
|
|
cert: Path | bool = True,
|
|
client_cert: Path | None = None,
|
|
dry_run: bool = False,
|
|
skip_existing: bool = False,
|
|
) -> None:
|
|
session = self.make_session()
|
|
|
|
session.verify = str(cert) if isinstance(cert, Path) else cert
|
|
|
|
if client_cert:
|
|
session.cert = str(client_cert)
|
|
|
|
with session:
|
|
self._upload(session, url, dry_run, skip_existing)
|
|
|
|
def post_data(self, file: Path) -> dict[str, Any]:
|
|
meta = Metadata.from_package(self._package)
|
|
|
|
file_type = self._get_type(file)
|
|
|
|
blake2_256_hash = hashlib.blake2b(digest_size=256 // 8)
|
|
|
|
md5_hash = hashlib.md5()
|
|
sha256_hash = hashlib.sha256()
|
|
with file.open("rb") as fp:
|
|
for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""):
|
|
md5_hash.update(content)
|
|
sha256_hash.update(content)
|
|
blake2_256_hash.update(content)
|
|
|
|
md5_digest = md5_hash.hexdigest()
|
|
sha2_digest = sha256_hash.hexdigest()
|
|
blake2_256_digest = blake2_256_hash.hexdigest()
|
|
|
|
py_version: str | None = None
|
|
if file_type == "bdist_wheel":
|
|
wheel_info = wheel_file_re.match(file.name)
|
|
if wheel_info is not None:
|
|
py_version = wheel_info.group("pyver")
|
|
|
|
data = {
|
|
# identify release
|
|
"name": meta.name,
|
|
"version": meta.version,
|
|
# file content
|
|
"filetype": file_type,
|
|
"pyversion": py_version,
|
|
# additional meta-data
|
|
"metadata_version": meta.metadata_version,
|
|
"summary": meta.summary,
|
|
"home_page": meta.home_page,
|
|
"author": meta.author,
|
|
"author_email": meta.author_email,
|
|
"maintainer": meta.maintainer,
|
|
"maintainer_email": meta.maintainer_email,
|
|
"license": meta.license,
|
|
"description": meta.description,
|
|
"keywords": meta.keywords,
|
|
"platform": meta.platforms,
|
|
"classifiers": meta.classifiers,
|
|
"download_url": meta.download_url,
|
|
"supported_platform": meta.supported_platforms,
|
|
"comment": None,
|
|
"md5_digest": md5_digest,
|
|
"sha256_digest": sha2_digest,
|
|
"blake2_256_digest": blake2_256_digest,
|
|
# PEP 314
|
|
"provides": meta.provides,
|
|
"requires": meta.requires,
|
|
"obsoletes": meta.obsoletes,
|
|
# Metadata 1.2
|
|
"project_urls": meta.project_urls,
|
|
"provides_dist": meta.provides_dist,
|
|
"obsoletes_dist": meta.obsoletes_dist,
|
|
"requires_dist": meta.requires_dist,
|
|
"requires_external": meta.requires_external,
|
|
"requires_python": meta.requires_python,
|
|
}
|
|
|
|
# Metadata 2.1
|
|
if meta.description_content_type:
|
|
data["description_content_type"] = meta.description_content_type
|
|
|
|
# TODO: Provides extra
|
|
|
|
return data
|
|
|
|
def _upload(
|
|
self,
|
|
session: requests.Session,
|
|
url: str,
|
|
dry_run: bool = False,
|
|
skip_existing: bool = False,
|
|
) -> None:
|
|
for file in self.files:
|
|
self._upload_file(session, url, file, dry_run, skip_existing)
|
|
|
|
def _upload_file(
|
|
self,
|
|
session: requests.Session,
|
|
url: str,
|
|
file: Path,
|
|
dry_run: bool = False,
|
|
skip_existing: bool = False,
|
|
) -> None:
|
|
from cleo.ui.progress_bar import ProgressBar
|
|
|
|
if not file.is_file():
|
|
raise UploadError(f"Archive ({file}) does not exist")
|
|
|
|
data = self.post_data(file)
|
|
data.update(
|
|
{
|
|
# action
|
|
":action": "file_upload",
|
|
"protocol_version": "1",
|
|
}
|
|
)
|
|
|
|
data_to_send: list[tuple[str, Any]] = self._prepare_data(data)
|
|
|
|
with file.open("rb") as fp:
|
|
data_to_send.append(
|
|
("content", (file.name, fp, "application/octet-stream"))
|
|
)
|
|
encoder = MultipartEncoder(data_to_send)
|
|
bar = ProgressBar(self._io, max=encoder.len)
|
|
bar.set_format(f" - Uploading <c1>{file.name}</c1> <b>%percent%%</b>")
|
|
monitor = MultipartEncoderMonitor(
|
|
encoder, lambda monitor: bar.set_progress(monitor.bytes_read)
|
|
)
|
|
|
|
bar.start()
|
|
|
|
resp = None
|
|
|
|
try:
|
|
if not dry_run:
|
|
resp = session.post(
|
|
url,
|
|
data=monitor,
|
|
allow_redirects=False,
|
|
headers={"Content-Type": monitor.content_type},
|
|
timeout=REQUESTS_TIMEOUT,
|
|
)
|
|
if resp is None or 200 <= resp.status_code < 300:
|
|
bar.set_format(
|
|
f" - Uploading <c1>{file.name}</c1> <fg=green>%percent%%</>"
|
|
)
|
|
bar.finish()
|
|
elif 300 <= resp.status_code < 400:
|
|
if self._io.output.is_decorated():
|
|
self._io.overwrite(
|
|
f" - Uploading <c1>{file.name}</c1> <error>FAILED</>"
|
|
)
|
|
raise UploadError(
|
|
"Redirects are not supported. "
|
|
"Is the URL missing a trailing slash?"
|
|
)
|
|
elif resp.status_code == 400 and "was ever registered" in resp.text:
|
|
self._register(session, url)
|
|
resp.raise_for_status()
|
|
elif skip_existing and self._is_file_exists_error(resp):
|
|
bar.set_format(
|
|
f" - Uploading <c1>{file.name}</c1> <warning>File exists."
|
|
" Skipping</>"
|
|
)
|
|
bar.display()
|
|
else:
|
|
resp.raise_for_status()
|
|
except (requests.ConnectionError, requests.HTTPError) as e:
|
|
if self._io.output.is_decorated():
|
|
self._io.overwrite(
|
|
f" - Uploading <c1>{file.name}</c1> <error>FAILED</>"
|
|
)
|
|
raise UploadError(e)
|
|
finally:
|
|
self._io.write_line("")
|
|
|
|
def _register(self, session: requests.Session, url: str) -> requests.Response:
|
|
"""
|
|
Register a package to a repository.
|
|
"""
|
|
dist = self.dist_dir
|
|
escaped_name = distribution_name(self._package.name)
|
|
file = dist / f"{escaped_name}-{self._package.version.to_string()}.tar.gz"
|
|
|
|
if not file.exists():
|
|
raise RuntimeError(f'"{file.name}" does not exist.')
|
|
|
|
data = self.post_data(file)
|
|
data.update({":action": "submit", "protocol_version": "1"})
|
|
|
|
data_to_send = self._prepare_data(data)
|
|
encoder = MultipartEncoder(data_to_send)
|
|
resp = session.post(
|
|
url,
|
|
data=encoder,
|
|
allow_redirects=False,
|
|
headers={"Content-Type": encoder.content_type},
|
|
timeout=REQUESTS_TIMEOUT,
|
|
)
|
|
|
|
resp.raise_for_status()
|
|
|
|
return resp
|
|
|
|
def _prepare_data(self, data: dict[str, Any]) -> list[tuple[str, str]]:
|
|
data_to_send = []
|
|
for key, value in data.items():
|
|
if not isinstance(value, (list, tuple)):
|
|
data_to_send.append((key, value))
|
|
else:
|
|
for item in value:
|
|
data_to_send.append((key, item))
|
|
|
|
return data_to_send
|
|
|
|
def _get_type(self, file: Path) -> str:
|
|
exts = file.suffixes
|
|
if exts[-1] == ".whl":
|
|
return "bdist_wheel"
|
|
elif len(exts) >= 2 and "".join(exts[-2:]) == ".tar.gz":
|
|
return "sdist"
|
|
|
|
raise ValueError("Unknown distribution format " + "".join(exts))
|
|
|
|
def _is_file_exists_error(self, response: requests.Response) -> bool:
|
|
# based on https://github.com/pypa/twine/blob/a6dd69c79f7b5abfb79022092a5d3776a499e31b/twine/commands/upload.py#L32
|
|
status = response.status_code
|
|
reason = response.reason.lower()
|
|
text = response.text.lower()
|
|
reason_and_text = reason + text
|
|
|
|
return (
|
|
# pypiserver (https://pypi.org/project/pypiserver)
|
|
status == 409
|
|
# PyPI / TestPyPI / GCP Artifact Registry
|
|
or (status == 400 and "already exist" in reason_and_text)
|
|
# Nexus Repository OSS (https://www.sonatype.com/nexus-repository-oss)
|
|
or (status == 400 and "updating asset" in reason_and_text)
|
|
# Artifactory (https://jfrog.com/artifactory/)
|
|
or (status == 403 and "overwrite artifact" in reason_and_text)
|
|
# Gitlab Enterprise Edition (https://about.gitlab.com)
|
|
or (status == 400 and "already been taken" in reason_and_text)
|
|
)
|