from __future__ import annotations
import dataclasses
import logging
import os
import re
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
from typing import ClassVar
from packaging.utils import canonicalize_name
from poetry.config.dict_config_source import DictConfigSource
from poetry.config.file_config_source import FileConfigSource
from poetry.locations import CONFIG_DIR
from poetry.locations import DEFAULT_CACHE_DIR
from poetry.toml import TOMLFile
from collections.abc import Callable
from poetry.config.config_source import ConfigSource
def boolean_validator(val: str) -> bool:
return val in {"true", "false", "1", "0"}
def boolean_normalizer(val: str) -> bool:
return val in ["true", "1"]
def int_normalizer(val: str) -> int:
return int(val)
class PackageFilterPolicy:
policy: dataclasses.InitVar[str | list[str] | None]
packages: list[str] = dataclasses.field(init=False)
def __post_init__(self, policy: str | list[str] | None) -> None:
if not policy:
policy = []
elif isinstance(policy, str):
policy = self.normalize(policy)
self.packages = policy
def allows(self, package_name: str) -> bool:
if ":all:" in self.packages:
return False
return (
not self.packages
or ":none:" in self.packages
or canonicalize_name(package_name) not in self.packages
def is_reserved(cls, name: str) -> bool:
return bool(re.match(r":(all|none):", name))
def normalize(cls, policy: str) -> list[str]:
if boolean_validator(policy):
if boolean_normalizer(policy):
return [":all:"]
return [":none:"]
return list(
name.strip() if cls.is_reserved(name) else canonicalize_name(name)
for name in policy.strip().split(",")
if name
def validator(cls, policy: str) -> bool:
if boolean_validator(policy):
return True
names = policy.strip().split(",")
for name in names:
if (
not name
or (cls.is_reserved(name) and len(names) == 1)
or re.match(r"^[a-zA-Z\d_-]+$", name)
return False
return True
logger = logging.getLogger(__name__)
_default_config: Config | None = None
class Config:
default_config: ClassVar[dict[str, Any]] = {
"cache-dir": str(DEFAULT_CACHE_DIR),
"virtualenvs": {
"create": True,
"in-project": None,
"path": os.path.join("{cache-dir}", "virtualenvs"),
"options": {
"always-copy": False,
"system-site-packages": False,
# we default to False here in order to prevent development environment
# breakages for IDEs etc. as when working in these environments
# assumptions are often made about virtual environments having pip and
# setuptools.
"no-pip": False,
"no-setuptools": False,
"prefer-active-python": False,
"prompt": "{project_name}-py{python_version}",
"experimental": {
"system-git-client": False,
"installer": {
"modern-installation": True,
"parallel": True,
"max-workers": None,
"no-binary": None,
"solver": {
"lazy-wheel": True,
"warnings": {
"export": True,
"keyring": {
"enabled": True,
def __init__(
self, use_environment: bool = True, base_dir: Path | None = None
) -> None:
self._config = deepcopy(self.default_config)
self._use_environment = use_environment
self._base_dir = base_dir
self._config_source: ConfigSource = DictConfigSource()
self._auth_config_source: ConfigSource = DictConfigSource()
def config(self) -> dict[str, Any]:
return self._config
def config_source(self) -> ConfigSource:
return self._config_source
def auth_config_source(self) -> ConfigSource:
return self._auth_config_source
def set_config_source(self, config_source: ConfigSource) -> Config:
self._config_source = config_source
return self
def set_auth_config_source(self, config_source: ConfigSource) -> Config:
self._auth_config_source = config_source
return self
def merge(self, config: dict[str, Any]) -> None:
from poetry.utils.helpers import merge_dicts
merge_dicts(self._config, config)
def all(self) -> dict[str, Any]:
def _all(config: dict[str, Any], parent_key: str = "") -> dict[str, Any]:
all_ = {}
for key in config:
value = self.get(parent_key + key)
if isinstance(value, dict):
if parent_key != "":
current_parent = parent_key + key + "."
current_parent = key + "."
all_[key] = _all(config[key], parent_key=current_parent)
all_[key] = value
return all_
return _all(self.config)
def raw(self) -> dict[str, Any]:
return self._config
def _get_environment_repositories() -> dict[str, dict[str, str]]:
repositories = {}
pattern = re.compile(r"POETRY_REPOSITORIES_(?P<name>[A-Z_]+)_URL")
for env_key in os.environ:
match = pattern.match(env_key)
if match:
repositories[match.group("name").lower().replace("_", "-")] = {
"url": os.environ[env_key]
return repositories
def repository_cache_directory(self) -> Path:
return Path(self.get("cache-dir")).expanduser() / "cache" / "repositories"
def artifacts_cache_directory(self) -> Path:
return Path(self.get("cache-dir")).expanduser() / "artifacts"
def virtualenvs_path(self) -> Path:
path = self.get("virtualenvs.path")
if path is None:
path = Path(self.get("cache-dir")) / "virtualenvs"
return Path(path).expanduser()
def installer_max_workers(self) -> int:
# This should be directly handled by ThreadPoolExecutor
# however, on some systems the number of CPUs cannot be determined
# (it raises a NotImplementedError), so, in this case, we assume
# that the system only has one CPU.
default_max_workers = (os.cpu_count() or 1) + 4
except NotImplementedError:
default_max_workers = 5
desired_max_workers = self.get("installer.max-workers")
if desired_max_workers is None:
return default_max_workers
return min(default_max_workers, int(desired_max_workers))
def get(self, setting_name: str, default: Any = None) -> Any:
Retrieve a setting value.
keys = setting_name.split(".")
# Looking in the environment if the setting
# is set via a POETRY_* environment variable
if self._use_environment:
if setting_name == "repositories":
# repositories setting is special for now
repositories = self._get_environment_repositories()
if repositories:
return repositories
env = "POETRY_" + "_".join(k.upper().replace("-", "_") for k in keys)
env_value = os.getenv(env)
if env_value is not None:
return self.process(self._get_normalizer(setting_name)(env_value))
value = self._config
for key in keys:
if key not in value:
return self.process(default)
value = value[key]
if self._use_environment and isinstance(value, dict):
# this is a configuration table, it is likely that we missed env vars
# in order to capture them recurse, eg: virtualenvs.options
return {k: self.get(f"{setting_name}.{k}") for k in value}
return self.process(value)
def process(self, value: Any) -> Any:
if not isinstance(value, str):
return value
def resolve_from_config(match: re.Match[str]) -> Any:
key = match.group(1)
config_value = self.get(key)
if config_value:
return config_value
# The key doesn't exist in the config but might be resolved later,
# so we keep it as a format variable.
return f"{{{key}}}"
return re.sub(r"{(.+?)}", resolve_from_config, value)
def _get_normalizer(name: str) -> Callable[[str], Any]:
if name in {
return boolean_normalizer
if name == "virtualenvs.path":
return lambda val: str(Path(val))
if name == "installer.max-workers":
return int_normalizer
if name == "installer.no-binary":
return PackageFilterPolicy.normalize
return lambda val: val
def create(cls, reload: bool = False) -> Config:
global _default_config
if _default_config is None or reload:
_default_config = cls()
# Load global config
config_file = TOMLFile(CONFIG_DIR / "config.toml")
if config_file.exists():
logger.debug("Loading configuration file %s", config_file.path)
# Load global auth config
auth_config_file = TOMLFile(CONFIG_DIR / "auth.toml")
if auth_config_file.exists():
logger.debug("Loading configuration file %s", auth_config_file.path)
return _default_config