ArchiveBox/archivebox/pkgs/abx/abx.py

518 lines
20 KiB
Python

__package__ = 'abx'
__id__ = 'abx'
__label__ = 'ABX'
__author__ = 'Nick Sweeting'
__homepage__ = 'https://github.com/ArchiveBox/ArchiveBox'
__order__ = 0
import sys
import inspect
import importlib
import itertools
from pathlib import Path
from typing import Dict, Callable, List, Set, Tuple, Iterable, Any, TypeVar, TypedDict, Type, cast, Generic, Mapping, overload, Final, ParamSpec, Literal, Protocol
from types import ModuleType
from typing_extensions import Annotated
from functools import cache
from benedict import benedict
from pydantic import AfterValidator
from pluggy import HookimplMarker, PluginManager, HookimplOpts, HookspecOpts, HookCaller
ParamsT = ParamSpec("ParamsT")
ReturnT = TypeVar('ReturnT')
class HookSpecDecoratorThatReturnsFirstResult(Protocol):
"""Type of a plugin method decorated with @hookspec(firstresult=True), which returns a single result (from the first plugin that implements the hook)"""
def __call__(self, func: Callable[ParamsT, ReturnT]) -> Callable[ParamsT, ReturnT]: ...
class HookSpecDecoratorThatReturnsListResults(Protocol):
"""Type of a plugin method decorated with @hookspec(firstresult=False), which returns a list of results (one for each plugin that implements the hook)"""
def __call__(self, func: Callable[ParamsT, ReturnT]) -> Callable[ParamsT, List[ReturnT]]: ...
class TypedHookspecMarker:
"""
Improved version of pluggy.HookspecMarker that supports type inference of hookspecs with firstresult=True|False correctly
https://github.com/pytest-dev/pluggy/issues/191
"""
__slots__ = ('project_name',)
def __init__(self, project_name: str) -> None:
self.project_name: Final[str] = project_name
# handle @hookspec(firstresult=False) -> List[ReturnT] (test_firstresult_False_hookspec)
@overload
def __call__(
self,
function: None = ...,
firstresult: Literal[False] = ...,
historic: bool = ...,
warn_on_impl: Warning | None = ...,
warn_on_impl_args: Mapping[str, Warning] | None = ...,
) -> HookSpecDecoratorThatReturnsListResults: ...
# handle @hookspec(firstresult=True) -> ReturnT (test_firstresult_True_hookspec)
@overload
def __call__(
self,
function: None = ...,
firstresult: Literal[True] = ...,
historic: bool = ...,
warn_on_impl: Warning | None = ...,
warn_on_impl_args: Mapping[str, Warning] | None = ...,
) -> HookSpecDecoratorThatReturnsFirstResult: ...
# handle @hookspec -> List[ReturnT] (test_normal_hookspec)
# order matters!!! this one has to come last
@overload
def __call__(
self,
function: Callable[ParamsT, ReturnT] = ...,
firstresult: Literal[False] = ...,
historic: bool = ...,
warn_on_impl: None = ...,
warn_on_impl_args: None = ...,
) -> Callable[ParamsT, List[ReturnT]]: ...
def __call__(
self,
function: Callable[ParamsT, ReturnT] | None = None,
firstresult: bool = False,
historic: bool = False,
warn_on_impl: Warning | None = None,
warn_on_impl_args: Mapping[str, Warning] | None = None,
) -> Callable[ParamsT, List[ReturnT]] | HookSpecDecoratorThatReturnsListResults | HookSpecDecoratorThatReturnsFirstResult:
def setattr_hookspec_opts(func) -> Callable:
if historic and firstresult:
raise ValueError("cannot have a historic firstresult hook")
opts: HookspecOpts = {
"firstresult": firstresult,
"historic": historic,
"warn_on_impl": warn_on_impl,
"warn_on_impl_args": warn_on_impl_args,
}
setattr(func, self.project_name + "_spec", opts)
return func
if function is not None:
return setattr_hookspec_opts(function)
else:
return setattr_hookspec_opts
spec = hookspec = TypedHookspecMarker("abx")
impl = hookimpl = HookimplMarker("abx")
def is_valid_attr_name(x: str) -> str:
"""Check if a string is a valid attribute name (used to validate hook method names on a plugin)"""
assert x.isidentifier() and not x.startswith('_')
return x
def is_valid_module_name(x: str) -> str:
"""Check if a string e.g. "some_pkg.some_plugin_name" is a valid module name (used to validate plugin IDs)"""
assert x.isidentifier() and not x.startswith('_') and x.islower()
return x
AttrName = Annotated[str, AfterValidator(is_valid_attr_name)]
PluginId = Annotated[str, AfterValidator(is_valid_module_name)]
class PluginInfo(TypedDict, total=True):
"""Full Metadata Dictionary containing all info about a plugin, returned by abx.get_plugin()"""
id: PluginId
package: AttrName
label: str
version: str
author: str
homepage: str
dependencies: List[str]
source_code: str
hooks: Dict[AttrName, Callable]
module: ModuleType
PluginSpec = TypeVar("PluginSpec")
class ABXPluginManager(PluginManager, Generic[PluginSpec]):
"""
Patch to fix pluggy's PluginManager to work with pydantic models.
See: https://github.com/pytest-dev/pluggy/pull/536
"""
# enable static type checking of pm.hook.call() calls
# https://stackoverflow.com/a/62871889/2156113
# https://github.com/pytest-dev/pluggy/issues/191
hook: PluginSpec
def create_typed_hookcaller(self, name: str, module_or_class: Type[PluginSpec], spec_opts: HookspecOpts) -> HookCaller:
"""
create a new HookCaller subclass with a modified __signature__
so that the return type is correct and args are converted to kwargs
"""
TypedHookCaller = type('TypedHookCaller', (HookCaller,), {})
hookspec_signature = inspect.signature(getattr(module_or_class, name))
hookspec_return_type = hookspec_signature.return_annotation
# replace return type with list if firstresult=False
hookcall_return_type = hookspec_return_type if spec_opts['firstresult'] else List[hookspec_return_type]
# replace each arg with kwarg equivalent (pm.hook.call() only accepts kwargs)
args_as_kwargs = [
param.replace(kind=inspect.Parameter.KEYWORD_ONLY) if param.name != 'self' else param
for param in hookspec_signature.parameters.values()
]
TypedHookCaller.__signature__ = hookspec_signature.replace(parameters=args_as_kwargs, return_annotation=hookcall_return_type)
TypedHookCaller.__name__ = f'{name}_HookCaller'
return TypedHookCaller(name, self._hookexec, module_or_class, spec_opts)
def add_hookspecs(self, module_or_class: Type[PluginSpec]) -> None:
"""Add HookSpecs from the given class, (generic type allows us to enforce types of pm.hook.call() statically)"""
names = []
for name in dir(module_or_class):
spec_opts = self.parse_hookspec_opts(module_or_class, name)
if spec_opts is not None:
hc: HookCaller | None = getattr(self.hook, name, None)
if hc is None:
hc = self.create_typed_hookcaller(name, module_or_class, spec_opts)
setattr(self.hook, name, hc)
else:
# Plugins registered this hook without knowing the spec.
hc.set_specification(module_or_class, spec_opts)
for hookfunction in hc.get_hookimpls():
self._verify_hook(hc, hookfunction)
names.append(name)
if not names:
raise ValueError(
f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
)
def parse_hookimpl_opts(self, plugin, name: str) -> HookimplOpts | None:
# IMPORTANT: @property methods can have side effects, and are never hookimpl
# if attr is a property, skip it in advance
# plugin_class = plugin if inspect.isclass(plugin) else type(plugin)
if isinstance(getattr(plugin, name, None), property):
return None
try:
return super().parse_hookimpl_opts(plugin, name)
except AttributeError:
return None
pm = ABXPluginManager("abx")
def get_plugin_order(plugin: PluginId | Path | ModuleType | Type) -> Tuple[int, Path]:
"""Get the order a plugin should be loaded in by reading its ./.plugin_order file or .__order__ attr"""
assert plugin
plugin_module = None
plugin_dir = None
if isinstance(plugin, str) or isinstance(plugin, Path):
if str(plugin).endswith('.py'):
plugin_dir = Path(plugin).parent
elif '/' in str(plugin):
# assume it's a path to a plugin directory
plugin_dir = Path(plugin)
elif str(plugin).isidentifier():
pass
elif inspect.ismodule(plugin):
plugin_module = plugin
plugin_dir = Path(str(plugin_module.__file__)).parent
elif inspect.isclass(plugin):
plugin_module = plugin
plugin_dir = Path(inspect.getfile(plugin)).parent
else:
raise ValueError(f'Invalid plugin, cannot get order: {plugin}')
if plugin_dir:
try:
# if .plugin_order file exists, use it to set the load priority
order = int((plugin_dir / '.plugin_order').read_text())
assert -1000000 < order < 100000000
return (order, plugin_dir)
except FileNotFoundError:
pass
default_order = 10 if '_spec_' in str(plugin_dir).lower() else 999
if plugin_module:
order = getattr(plugin_module, '__order__', default_order)
else:
order = default_order
assert order is not None
assert plugin_dir
return (order, plugin_dir)
# @cache
def get_plugin(plugin: PluginId | ModuleType | Type) -> PluginInfo:
"""Get the full PluginInfo metadata for a plugin, given its plugin ID, module, or class"""
assert plugin
# import the plugin module by its name
if isinstance(plugin, str):
module = importlib.import_module(plugin)
# print('IMPORTED PLUGIN:', plugin)
plugin = getattr(module, 'PLUGIN_SPEC', getattr(module, 'PLUGIN', module))
elif inspect.ismodule(plugin):
module = plugin
plugin = getattr(module, 'PLUGIN_SPEC', getattr(module, 'PLUGIN', module))
elif inspect.isclass(plugin):
module = inspect.getmodule(plugin)
else:
plugin = type(plugin)
module = inspect.getmodule(plugin)
# raise ValueError(f'Invalid plugin, must be a module, class, or plugin ID (package name): {plugin}')
assert module and hasattr(module, '__package__')
plugin_file = Path(inspect.getfile(module))
plugin_package = module.__package__ or module.__name__
plugin_id = plugin_package.replace('.', '_')
# load the plugin info from the plugin/__init__.py __attr__s if they exist
plugin_module_attrs = {
'label': getattr(module, '__label__', plugin_id),
'version': getattr(module, '__version__', '0.0.1'),
'author': getattr(module, '__author__', 'ArchiveBox'),
'homepage': getattr(module, '__homepage__', 'https://github.com/ArchiveBox'),
'dependencies': getattr(module, '__dependencies__', []),
}
# load the plugin info from the plugin/pyproject.toml file if it has one
plugin_toml_info = {}
try:
# try loading ./pyproject.toml first in case the plugin is a bare python file not inside a package dir
plugin_toml_info = benedict.from_toml((plugin_file.parent / 'pyproject.toml').read_text()).project
except Exception:
try:
# try loading ../pyproject.toml next in case the plugin is in a packge dir
plugin_toml_info = benedict.from_toml((plugin_file.parent.parent / 'pyproject.toml').read_text()).project
except Exception:
# print('WARNING: could not detect pyproject.toml for PLUGIN:', plugin_id, plugin_file.parent, 'ERROR:', e)
pass
assert plugin_id
assert plugin_package
assert module.__file__
# merge the plugin info from all sources + add dyanmically calculated info
return cast(PluginInfo, benedict(PluginInfo(**{
'id': plugin_id,
**plugin_module_attrs,
**plugin_toml_info,
'package': plugin_package,
'source_code': module.__file__,
'order': get_plugin_order(plugin),
'hooks': get_plugin_hooks(plugin),
'module': module,
'plugin': plugin,
})))
def get_all_plugins() -> Dict[PluginId, PluginInfo]:
"""Get the PluginInfo metadata for all the loaded plugins"""
plugins = {}
for plugin_module in pm.get_plugins():
plugin_info = get_plugin(plugin=plugin_module)
assert 'id' in plugin_info
plugins[plugin_info['id']] = plugin_info
return benedict(plugins)
def get_all_hook_names() -> Set[str]:
"""Get the names of all hookspec/hookimpl methods available across all loaded plugins"""
return {
hook_name
for plugin_module in pm.get_plugins()
for hook_name in get_plugin_hooks(plugin_module)
}
def get_all_hook_specs() -> Dict[str, Dict[str, Any]]:
"""Get a set of all hookspec methods defined in all plugins (useful for type checking if a pm.hook.call() is valid)"""
hook_specs = {}
for hook_name in get_all_hook_names():
for plugin_module in pm.get_plugins():
if inspect.ismodule(plugin_module):
plugin = plugin_module
plugin_module = plugin_module
elif inspect.isclass(plugin_module):
plugin = plugin_module
plugin_module = inspect.getmodule(plugin)
else:
plugin = type(plugin_module)
plugin_module = inspect.getmodule(plugin)
assert plugin and plugin_module and hasattr(plugin_module, '__package__')
if hasattr(plugin, hook_name):
hookspecopts = pm.parse_hookspec_opts(plugin, hook_name)
if hookspecopts:
method = getattr(plugin, hook_name)
signature = inspect.signature(method)
return_type = signature.return_annotation if signature.return_annotation != inspect._empty else None
if hookspecopts.get('firstresult'):
return_type = return_type
else:
# if not firstresult, return_type is a sequence
return_type = List[return_type]
call_signature = signature.replace(return_annotation=return_type)
method = lambda *args, **kwargs: getattr(pm.hook, hook_name)(*args, **kwargs)
method.__signature__ = call_signature
method.__name__ = hook_name
method.__package__ = plugin_module.__package__
hook_specs[hook_name] = {
'name': hook_name,
'method': method,
'signature': call_signature,
'hookspec_opts': hookspecopts,
'hookspec_signature': signature,
'hookspec_plugin': method.__package__,
}
return benedict(hook_specs)
###### PLUGIN DISCOVERY AND LOADING ########################################################
def find_plugins_in_dir(plugins_dir: Path) -> Dict[PluginId, Path]:
"""
Find all the plugins in a given directory. Just looks for an __init__.py file.
"""
python_dirs = plugins_dir.glob("*/__init__.py")
sorted_python_dirs = sorted(python_dirs, key=lambda p: get_plugin_order(plugin=p) or 500)
return {
plugin_entrypoint.parent.name: plugin_entrypoint.parent
for plugin_entrypoint in sorted_python_dirs
if plugin_entrypoint.parent.name not in ('abx', 'core')
}
def get_pip_installed_plugins(group: PluginId='abx') -> Dict[PluginId, Path]:
"""replaces pm.load_setuptools_entrypoints("abx"), finds plugins that registered entrypoints via pip"""
import importlib.metadata
DETECTED_PLUGINS = {} # module_name: module_dir_path
for dist in list(importlib.metadata.distributions()):
for entrypoint in dist.entry_points:
if entrypoint.group != group or pm.is_blocked(entrypoint.name):
continue
DETECTED_PLUGINS[entrypoint.name] = Path(entrypoint.load().__file__).parent
# pm.register(plugin, name=ep.name)
# pm._plugin_distinfo.append((plugin, DistFacade(dist)))
return DETECTED_PLUGINS
# Load all plugins from pip packages, archivebox built-ins, and user plugins
def load_plugins(plugins: Iterable[PluginId | ModuleType | Type] | Dict[PluginId, Path]):
"""
Load all the plugins from a dictionary of module names and directory paths.
"""
PLUGINS_TO_LOAD = []
LOADED_PLUGINS = {}
plugin_infos = sorted([
get_plugin(plugin)
for plugin in plugins
], key=lambda plugin: plugin.get('order', 999))
for plugin_info in plugin_infos:
assert plugin_info, 'No plugin metadata found for plugin'
assert 'id' in plugin_info and 'module' in plugin_info
if plugin_info['module'] in pm.get_plugins():
LOADED_PLUGINS[plugin_info['id']] = plugin_info
continue
else:
PLUGINS_TO_LOAD.append(plugin_info)
PLUGINS_TO_LOAD = sorted(PLUGINS_TO_LOAD, key=lambda x: x['order'])
for plugin_info in PLUGINS_TO_LOAD:
# if '--version' not in sys.argv and '--help' not in sys.argv:
# print(f'🧩 Loading plugin: {plugin_info["id"]}...', end='\r', flush=True, file=sys.stderr)
pm.register(plugin_info['module'])
LOADED_PLUGINS[plugin_info['id']] = plugin_info
# print('\x1b[2K', end='\r', flush=True, file=sys.stderr)
return benedict(LOADED_PLUGINS)
@cache
def get_plugin_hooks(plugin: PluginId | ModuleType | Type | None) -> Dict[AttrName, Callable]:
"""Get all the functions marked with @hookimpl on a plugin module or class."""
if not plugin:
return {}
hooks = {}
if isinstance(plugin, str):
plugin_module = importlib.import_module(plugin)
elif inspect.ismodule(plugin) or inspect.isclass(plugin):
plugin_module = plugin
else:
plugin_module = type(plugin)
# raise ValueError(f'Invalid plugin, cannot get hooks: {plugin}')
for attr_name in dir(plugin_module):
if attr_name.startswith('_'):
continue
try:
attr = getattr(plugin_module, attr_name)
if isinstance(attr, Callable):
if pm.parse_hookimpl_opts(plugin_module, attr_name):
hooks[attr_name] = attr
except Exception as e:
print(f'Error getting hookimpls for {plugin}: {e}')
return hooks
ReturnT = TypeVar('ReturnT')
def as_list(results: List[List[ReturnT]]) -> List[ReturnT]:
"""Flatten a list of lists returned by a pm.hook.call() into a single list of [result1, result2, ...]"""
return list(itertools.chain(*results))
def as_dict(results: List[Dict[PluginId, ReturnT]]) -> Dict[PluginId, ReturnT]:
"""Flatten a list of dicts returned by a pm.hook.call() into a single dict of {plugin_id1: result1, plugin_id2: result2, ...}"""
if isinstance(results, (dict, benedict)):
results_list = results.values()
else:
results_list = results
return benedict({
result_id: result
for plugin_results in results_list
for result_id, result in plugin_results.items()
})