459 lines
16 KiB
Python
459 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import dataclasses
|
|
import functools
|
|
import logging
|
|
import time
|
|
import urllib.parse
|
|
|
|
from os.path import commonprefix
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from typing import Any
|
|
|
|
import requests
|
|
import requests.adapters
|
|
import requests.auth
|
|
import requests.exceptions
|
|
|
|
from cachecontrol import CacheControlAdapter
|
|
from cachecontrol.caches import FileCache
|
|
from requests_toolbelt import user_agent
|
|
|
|
from poetry.__version__ import __version__
|
|
from poetry.config.config import Config
|
|
from poetry.exceptions import PoetryException
|
|
from poetry.utils.constants import REQUESTS_TIMEOUT
|
|
from poetry.utils.constants import RETRY_AFTER_HEADER
|
|
from poetry.utils.constants import STATUS_FORCELIST
|
|
from poetry.utils.password_manager import HTTPAuthCredential
|
|
from poetry.utils.password_manager import PasswordManager
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from cleo.io.io import IO
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class RepositoryCertificateConfig:
|
|
cert: Path | None = dataclasses.field(default=None)
|
|
client_cert: Path | None = dataclasses.field(default=None)
|
|
verify: bool = dataclasses.field(default=True)
|
|
|
|
@classmethod
|
|
def create(
|
|
cls, repository: str, config: Config | None
|
|
) -> RepositoryCertificateConfig:
|
|
config = config if config else Config.create()
|
|
|
|
verify: str | bool = config.get(
|
|
f"certificates.{repository}.verify",
|
|
config.get(f"certificates.{repository}.cert", True),
|
|
)
|
|
client_cert: str = config.get(f"certificates.{repository}.client-cert")
|
|
|
|
return cls(
|
|
cert=Path(verify) if isinstance(verify, str) else None,
|
|
client_cert=Path(client_cert) if client_cert else None,
|
|
verify=verify if isinstance(verify, bool) else True,
|
|
)
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class AuthenticatorRepositoryConfig:
|
|
name: str
|
|
url: str
|
|
netloc: str = dataclasses.field(init=False)
|
|
path: str = dataclasses.field(init=False)
|
|
|
|
def __post_init__(self) -> None:
|
|
parsed_url = urllib.parse.urlsplit(self.url)
|
|
self.netloc = parsed_url.netloc
|
|
self.path = parsed_url.path
|
|
|
|
def certs(self, config: Config) -> RepositoryCertificateConfig:
|
|
return RepositoryCertificateConfig.create(self.name, config)
|
|
|
|
@property
|
|
def http_credential_keys(self) -> list[str]:
|
|
return [self.url, self.netloc, self.name]
|
|
|
|
def get_http_credentials(
|
|
self, password_manager: PasswordManager, username: str | None = None
|
|
) -> HTTPAuthCredential:
|
|
# try with the repository name via the password manager
|
|
credential = HTTPAuthCredential(
|
|
**(password_manager.get_http_auth(self.name) or {})
|
|
)
|
|
|
|
if credential.password is not None:
|
|
return credential
|
|
|
|
if password_manager.use_keyring:
|
|
# fallback to url and netloc based keyring entries
|
|
credential = password_manager.get_credential(
|
|
self.url, self.netloc, username=credential.username
|
|
)
|
|
|
|
return credential
|
|
|
|
|
|
class Authenticator:
|
|
def __init__(
|
|
self,
|
|
config: Config | None = None,
|
|
io: IO | None = None,
|
|
cache_id: str | None = None,
|
|
disable_cache: bool = False,
|
|
pool_size: int = requests.adapters.DEFAULT_POOLSIZE,
|
|
) -> None:
|
|
self._config = config or Config.create()
|
|
self._io = io
|
|
self._sessions_for_netloc: dict[str, requests.Session] = {}
|
|
self._credentials: dict[str, HTTPAuthCredential] = {}
|
|
self._certs: dict[str, RepositoryCertificateConfig] = {}
|
|
self._configured_repositories: (
|
|
dict[str, AuthenticatorRepositoryConfig] | None
|
|
) = None
|
|
self._password_manager = PasswordManager(self._config)
|
|
self._cache_control = (
|
|
FileCache(
|
|
self._config.repository_cache_directory
|
|
/ (cache_id or "_default_cache")
|
|
/ "_http"
|
|
)
|
|
if not disable_cache
|
|
else None
|
|
)
|
|
self.get_repository_config_for_url = functools.lru_cache(maxsize=None)(
|
|
self._get_repository_config_for_url
|
|
)
|
|
self._pool_size = pool_size
|
|
self._user_agent = user_agent("poetry", __version__)
|
|
|
|
def create_session(self) -> requests.Session:
|
|
session = requests.Session()
|
|
session.headers["User-Agent"] = self._user_agent
|
|
|
|
if self._cache_control is None:
|
|
return session
|
|
|
|
adapter = CacheControlAdapter(
|
|
cache=self._cache_control,
|
|
pool_maxsize=self._pool_size,
|
|
)
|
|
session.mount("http://", adapter)
|
|
session.mount("https://", adapter)
|
|
|
|
return session
|
|
|
|
def get_session(self, url: str | None = None) -> requests.Session:
|
|
if not url:
|
|
return self.create_session()
|
|
|
|
parsed_url = urllib.parse.urlsplit(url)
|
|
netloc = parsed_url.netloc
|
|
|
|
if netloc not in self._sessions_for_netloc:
|
|
logger.debug("Creating new session for %s", netloc)
|
|
self._sessions_for_netloc[netloc] = self.create_session()
|
|
|
|
return self._sessions_for_netloc[netloc]
|
|
|
|
def close(self) -> None:
|
|
for session in self._sessions_for_netloc.values():
|
|
if session is not None:
|
|
with contextlib.suppress(AttributeError):
|
|
session.close()
|
|
|
|
def __del__(self) -> None:
|
|
self.close()
|
|
|
|
def delete_cache(self, url: str) -> None:
|
|
if self._cache_control is not None:
|
|
self._cache_control.delete(key=url)
|
|
|
|
def authenticated_url(self, url: str) -> str:
|
|
parsed = urllib.parse.urlparse(url)
|
|
credential = self.get_credentials_for_url(url)
|
|
|
|
if credential.username is not None and credential.password is not None:
|
|
username = urllib.parse.quote(credential.username, safe="")
|
|
password = urllib.parse.quote(credential.password, safe="")
|
|
|
|
return (
|
|
f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}"
|
|
)
|
|
|
|
return url
|
|
|
|
def request(
|
|
self, method: str, url: str, raise_for_status: bool = True, **kwargs: Any
|
|
) -> requests.Response:
|
|
headers = kwargs.get("headers")
|
|
request = requests.Request(method, url, headers=headers)
|
|
credential = self.get_credentials_for_url(url)
|
|
|
|
if credential.username is not None or credential.password is not None:
|
|
request = requests.auth.HTTPBasicAuth(
|
|
credential.username or "", credential.password or ""
|
|
)(request)
|
|
|
|
session = self.get_session(url=url)
|
|
prepared_request = session.prepare_request(request)
|
|
|
|
proxies: dict[str, str] = kwargs.get("proxies", {})
|
|
stream: bool | None = kwargs.get("stream")
|
|
|
|
certs = self.get_certs_for_url(url)
|
|
verify: bool | str | Path = kwargs.get("verify") or certs.cert or certs.verify
|
|
cert: str | Path | None = kwargs.get("cert") or certs.client_cert
|
|
|
|
if cert is not None:
|
|
cert = str(cert)
|
|
|
|
verify = str(verify) if isinstance(verify, Path) else verify
|
|
|
|
settings = session.merge_environment_settings(
|
|
prepared_request.url, proxies, stream, verify, cert
|
|
)
|
|
|
|
# Send the request.
|
|
send_kwargs = {
|
|
"timeout": kwargs.get("timeout", REQUESTS_TIMEOUT),
|
|
"allow_redirects": kwargs.get("allow_redirects", True),
|
|
}
|
|
send_kwargs.update(settings)
|
|
|
|
attempt = 0
|
|
resp = None
|
|
|
|
while True:
|
|
is_last_attempt = attempt >= 5
|
|
try:
|
|
resp = session.send(prepared_request, **send_kwargs)
|
|
except (requests.exceptions.ConnectionError, OSError) as e:
|
|
if is_last_attempt:
|
|
raise e
|
|
else:
|
|
if resp.status_code not in STATUS_FORCELIST or is_last_attempt:
|
|
if raise_for_status:
|
|
resp.raise_for_status()
|
|
return resp
|
|
|
|
if not is_last_attempt:
|
|
attempt += 1
|
|
delay = self._get_backoff(resp, attempt)
|
|
logger.debug("Retrying HTTP request in %s seconds.", delay)
|
|
time.sleep(delay)
|
|
continue
|
|
|
|
# this should never really be hit under any sane circumstance
|
|
raise PoetryException("Failed HTTP {} request", method.upper())
|
|
|
|
def _get_backoff(self, response: requests.Response | None, attempt: int) -> float:
|
|
if response is not None:
|
|
retry_after = response.headers.get(RETRY_AFTER_HEADER, "")
|
|
if retry_after:
|
|
return float(retry_after)
|
|
|
|
return 0.5 * attempt
|
|
|
|
def get(self, url: str, **kwargs: Any) -> requests.Response:
|
|
return self.request("get", url, **kwargs)
|
|
|
|
def head(self, url: str, **kwargs: Any) -> requests.Response:
|
|
kwargs.setdefault("allow_redirects", False)
|
|
return self.request("head", url, **kwargs)
|
|
|
|
def post(self, url: str, **kwargs: Any) -> requests.Response:
|
|
return self.request("post", url, **kwargs)
|
|
|
|
def _get_credentials_for_repository(
|
|
self, repository: AuthenticatorRepositoryConfig, username: str | None = None
|
|
) -> HTTPAuthCredential:
|
|
# cache repository credentials by repository url to avoid multiple keyring
|
|
# backend queries when packages are being downloaded from the same source
|
|
key = f"{repository.url}#username={username or ''}"
|
|
|
|
if key not in self._credentials:
|
|
self._credentials[key] = repository.get_http_credentials(
|
|
password_manager=self._password_manager, username=username
|
|
)
|
|
|
|
return self._credentials[key]
|
|
|
|
def _get_credentials_for_url(
|
|
self, url: str, exact_match: bool = False
|
|
) -> HTTPAuthCredential:
|
|
repository = self.get_repository_config_for_url(url, exact_match)
|
|
|
|
credential = (
|
|
self._get_credentials_for_repository(repository=repository)
|
|
if repository is not None
|
|
else HTTPAuthCredential()
|
|
)
|
|
|
|
if credential.password is None:
|
|
parsed_url = urllib.parse.urlsplit(url)
|
|
netloc = parsed_url.netloc
|
|
credential = self._password_manager.get_credential(
|
|
url, netloc, username=credential.username
|
|
)
|
|
|
|
return HTTPAuthCredential(
|
|
username=credential.username, password=credential.password
|
|
)
|
|
|
|
return credential
|
|
|
|
def get_credentials_for_git_url(self, url: str) -> HTTPAuthCredential:
|
|
parsed_url = urllib.parse.urlsplit(url)
|
|
|
|
if parsed_url.scheme not in {"http", "https"}:
|
|
return HTTPAuthCredential()
|
|
|
|
key = f"git+{url}"
|
|
|
|
if key not in self._credentials:
|
|
self._credentials[key] = self._get_credentials_for_url(url, True)
|
|
|
|
return self._credentials[key]
|
|
|
|
def get_credentials_for_url(self, url: str) -> HTTPAuthCredential:
|
|
parsed_url = urllib.parse.urlsplit(url)
|
|
netloc = parsed_url.netloc
|
|
|
|
if url not in self._credentials:
|
|
if "@" not in netloc:
|
|
# no credentials were provided in the url, try finding the
|
|
# best repository configuration
|
|
self._credentials[url] = self._get_credentials_for_url(url)
|
|
else:
|
|
# Split from the right because that's how urllib.parse.urlsplit()
|
|
# behaves if more than one @ is present (which can be checked using
|
|
# the password attribute of urlsplit()'s return value).
|
|
auth, netloc = netloc.rsplit("@", 1)
|
|
# Split from the left because that's how urllib.parse.urlsplit()
|
|
# behaves if more than one : is present (which again can be checked
|
|
# using the password attribute of the return value)
|
|
user, password = auth.split(":", 1) if ":" in auth else (auth, "")
|
|
self._credentials[url] = HTTPAuthCredential(
|
|
urllib.parse.unquote(user),
|
|
urllib.parse.unquote(password),
|
|
)
|
|
|
|
return self._credentials[url]
|
|
|
|
def get_pypi_token(self, name: str) -> str | None:
|
|
return self._password_manager.get_pypi_token(name)
|
|
|
|
def get_http_auth(
|
|
self, name: str, username: str | None = None
|
|
) -> HTTPAuthCredential | None:
|
|
if name == "pypi":
|
|
repository = AuthenticatorRepositoryConfig(
|
|
name, "https://upload.pypi.org/legacy/"
|
|
)
|
|
else:
|
|
if name not in self.configured_repositories:
|
|
return None
|
|
repository = self.configured_repositories[name]
|
|
|
|
return self._get_credentials_for_repository(
|
|
repository=repository, username=username
|
|
)
|
|
|
|
def get_certs_for_repository(self, name: str) -> RepositoryCertificateConfig:
|
|
if name.lower() == "pypi" or name not in self.configured_repositories:
|
|
return RepositoryCertificateConfig()
|
|
return self.configured_repositories[name].certs(self._config)
|
|
|
|
@property
|
|
def configured_repositories(self) -> dict[str, AuthenticatorRepositoryConfig]:
|
|
if self._configured_repositories is None:
|
|
self._configured_repositories = {}
|
|
for repository_name in self._config.get("repositories", []):
|
|
url = self._config.get(f"repositories.{repository_name}.url")
|
|
self._configured_repositories[repository_name] = (
|
|
AuthenticatorRepositoryConfig(repository_name, url)
|
|
)
|
|
|
|
return self._configured_repositories
|
|
|
|
def reset_credentials_cache(self) -> None:
|
|
self.get_repository_config_for_url.cache_clear()
|
|
self._credentials = {}
|
|
|
|
def add_repository(self, name: str, url: str) -> None:
|
|
self.configured_repositories[name] = AuthenticatorRepositoryConfig(name, url)
|
|
self.reset_credentials_cache()
|
|
|
|
def get_certs_for_url(self, url: str) -> RepositoryCertificateConfig:
|
|
if url not in self._certs:
|
|
self._certs[url] = self._get_certs_for_url(url)
|
|
return self._certs[url]
|
|
|
|
def _get_repository_config_for_url(
|
|
self, url: str, exact_match: bool = False
|
|
) -> AuthenticatorRepositoryConfig | None:
|
|
parsed_url = urllib.parse.urlsplit(url)
|
|
candidates_netloc_only = []
|
|
candidates_path_match = []
|
|
|
|
for repository in self.configured_repositories.values():
|
|
if exact_match:
|
|
if parsed_url.path == repository.path:
|
|
return repository
|
|
continue
|
|
|
|
if repository.netloc == parsed_url.netloc:
|
|
if parsed_url.path.startswith(repository.path) or commonprefix(
|
|
(parsed_url.path, repository.path)
|
|
):
|
|
candidates_path_match.append(repository)
|
|
continue
|
|
candidates_netloc_only.append(repository)
|
|
|
|
if candidates_path_match:
|
|
candidates = candidates_path_match
|
|
elif candidates_netloc_only:
|
|
candidates = candidates_netloc_only
|
|
else:
|
|
return None
|
|
|
|
if len(candidates) > 1:
|
|
logger.debug(
|
|
"Multiple source configurations found for %s - %s",
|
|
parsed_url.netloc,
|
|
", ".join(c.name for c in candidates),
|
|
)
|
|
# prefer the more specific path
|
|
candidates.sort(
|
|
key=lambda c: len(commonprefix([parsed_url.path, c.path])), reverse=True
|
|
)
|
|
|
|
return candidates[0]
|
|
|
|
def _get_certs_for_url(self, url: str) -> RepositoryCertificateConfig:
|
|
selected = self.get_repository_config_for_url(url)
|
|
if selected:
|
|
return selected.certs(config=self._config)
|
|
return RepositoryCertificateConfig()
|
|
|
|
|
|
_authenticator: Authenticator | None = None
|
|
|
|
|
|
def get_default_authenticator() -> Authenticator:
|
|
global _authenticator
|
|
|
|
if _authenticator is None:
|
|
_authenticator = Authenticator()
|
|
|
|
return _authenticator
|