Source code for shippinglabel.requirements

#!/usr/bin/env python3
#
#  requirements.py
"""
Utilities for working with :pep:`508` requirements.
"""
#
#  Copyright © 2020-2021 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.
#
#  check_dependencies based on https://stackoverflow.com/a/29044693/3092681
#  Copyright © 2015 TehTechGuy
#  Licensed under CC-BY-SA
#

# stdlib
import copy
import warnings
from abc import ABC
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union, cast, overload

# 3rd party
import deprecation_alias
import dist_meta
import dom_toml
from domdf_python_tools.doctools import prettify_docstrings
from domdf_python_tools.iterative import natmax, natmin
from domdf_python_tools.paths import PathPlus
from domdf_python_tools.stringlist import DelimitedList, StringList
from domdf_python_tools.typing import PathLike
from packaging.markers import default_environment
from packaging.requirements import InvalidRequirement, Requirement
from packaging.specifiers import BaseSpecifier, Specifier, SpecifierSet
from typing_extensions import Literal

# this package
from shippinglabel import __version__, normalize

# Names exported by ``from shippinglabel.requirements import *``.
__all__ = [
		"ComparableRequirement",
		"resolve_specifiers",
		"combine_requirements",
		"read_requirements",
		"RequirementsManager",
		"list_requirements",
		"check_dependencies",
		"parse_requirements",
		"parse_pyproject_dependencies",
		"parse_pyproject_extras",
		]

# The specifier operators accepted as keys by ``_OperatorLookup``
# and collected by ``resolve_specifiers``.
operator_symbols = ("<=", '<', "!=", "==", ">=", '>', "~=", "===")

# A requirement given either as a string or as a ``Requirement`` object.
_Requirement = Union[str, Requirement]


@prettify_docstrings
class ComparableRequirement(Requirement):
	"""
	Represents a :pep:`508` requirement.

	Can be compared to other requirements.
	A list of :class:`~.ComparableRequirement` objects can be sorted alphabetically.

	Equality is lenient: a field (name, url, extras, specifier, marker) which is
	unset or empty on either side is ignored, so ``"foo"`` compares equal to ``"foo>=1.2"``.
	Strings are parsed as requirements before being compared for equality.

	Ordering is by name first; requirements with the same name are then ordered
	by the string form of their specifier, and finally of their marker.
	"""

	@staticmethod
	def _check_equal_not_none(left: Optional[Any], right: Optional[Any]) -> bool:
		"""
		Returns whether ``left`` and ``right`` are equal, treating a falsy value on either side as a wildcard.
		"""

		if not left or not right:
			return True
		else:
			return left == right

	@staticmethod
	def _check_marker_equality(left: Optional[Any], right: Optional[Any]) -> bool:
		"""
		Returns whether two markers are equal, treating :py:obj:`None` on either side as a wildcard.

		The markers are compared element by element on their string representation.
		"""

		if left is None or right is None:
			return True

		left_markers, right_markers = left._markers, right._markers

		# zip() stops at the shorter sequence, so without this check a marker
		# which is merely a prefix of the other would be considered equal to it.
		if len(left_markers) != len(right_markers):
			return False

		return all(
				str(left_mark) == str(right_mark) for left_mark, right_mark in zip(left_markers, right_markers)
				)

	def __eq__(self, other) -> bool:  # noqa: MAN001
		if isinstance(other, str):
			try:
				other = Requirement(other)
			except InvalidRequirement:
				# Not a requirement string; let Python try the reflected comparison.
				return NotImplemented

			return self == other

		elif isinstance(other, Requirement):
			return all((
					self._check_equal_not_none(self.name, other.name),
					self._check_equal_not_none(self.url, other.url),
					self._check_equal_not_none(self.extras, other.extras),
					self._check_equal_not_none(self.specifier, other.specifier),
					self._check_marker_equality(self.marker, other.marker),
					))
		else:  # pragma: no cover
			return NotImplemented

	def __lt__(self, other) -> bool:  # noqa: MAN001
		if isinstance(other, Requirement):
			if self.name != other.name:
				return self.name < other.name

			own_specifier = str(self.specifier or '')
			other_specifier = str(other.specifier or '')

			# Note the reversed comparisons below:
			# the greater specifier (or marker) string sorts first.
			if own_specifier != other_specifier:
				return own_specifier > other_specifier

			return str(self.marker or '') > str(other.marker or '')

		elif isinstance(other, str):
			# Strings are compared against the name only.
			return self.name < other
		else:  # pragma: no cover
			return NotImplemented

	def __le__(self, other) -> bool:  # noqa: MAN001
		if not isinstance(other, (Requirement, str)):  # pragma: no cover
			return NotImplemented

		return bool(self < other or self == other)

	def __ge__(self, other) -> bool:  # noqa: MAN001
		if not isinstance(other, (Requirement, str)):  # pragma: no cover
			return NotImplemented

		return bool(self > other or self == other)

	def __gt__(self, other) -> bool:  # noqa: MAN001
		if isinstance(other, Requirement):
			if self.name != other.name:
				return self.name > other.name

			own_specifier = str(self.specifier or '')
			other_specifier = str(other.specifier or '')

			# Mirror image of __lt__, including the reversed comparisons.
			if own_specifier != other_specifier:
				return own_specifier < other_specifier

			return str(self.marker or '') < str(other.marker or '')

		elif isinstance(other, str):
			return self.name > other
		else:  # pragma: no cover
			return NotImplemented

	def __hash__(self) -> int:
		return hash((
				self.name or '',
				self.url or '',
				str(self.specifier) or '',
				str(self.marker) or '',
				*(self.extras or ()),
				))
class _OperatorLookup(Dict[str, DelimitedList[Specifier]]):
	"""
	Mapping of specifier operators to the specifiers which use them.

	Only the operators listed in ``operator_symbols`` may be used as keys.
	Values are always stored as :class:`~.DelimitedList` objects, and looking up a
	valid operator which has not been set yet creates an empty list for it.
	"""

	def __setitem__(self, key: str, value: Any) -> None:
		if key not in operator_symbols:
			raise KeyError(f"Invalid operator symbol {key!r}")

		# Coerce anything that isn't already a DelimitedList.
		stored = value if isinstance(value, DelimitedList) else DelimitedList(value)
		super().__setitem__(key, stored)

	def __getitem__(self, item: str) -> DelimitedList[Specifier]:
		missing = item not in self

		if missing and item in operator_symbols:
			self[item] = DelimitedList()

		# Unknown operators fall through to the usual KeyError.
		return super().__getitem__(item)
def resolve_specifiers(specifiers: Iterable[BaseSpecifier]) -> SpecifierSet:
	r"""
	Resolve duplicated and overlapping requirement specifiers.

	For each of ``<=``, ``<``, ``>=`` and ``>`` only the most restrictive specifier is kept.
	Bounds made redundant by another bound, or by an ``==`` specifier, are then dropped.
	These merges compare versions per :pep:`440`; if a version cannot be parsed
	(for example ``1.2.*``) the merge is skipped and both specifiers are kept.

	:param specifiers: The specifiers to resolve.
		Specifiers whose operator is not in ``operator_symbols`` are ignored.
	:type specifiers: :class:`~typing.Iterable`\[:class:`~.packaging.specifiers.Specifier`]

	:return: A specifier set combining the remaining specifiers.
	"""

	# 3rd party
	from packaging.version import InvalidVersion, Version

	def parse_version(version: str) -> Optional[Version]:
		# Returns None rather than raising, so callers can skip the merge.
		try:
			return Version(version)
		except InvalidVersion:
			return None

	operator_lookup = _OperatorLookup()

	spec: Specifier
	for spec in cast(Iterable[Specifier], specifiers):
		if spec.operator in operator_symbols:
			operator_lookup[spec.operator].append(spec)

	# Keep only the tightest bound for each of the inequality operators.
	if operator_lookup["<="]:
		operator_lookup["<="] = [Specifier(f"<={natmin(spec.version for spec in operator_lookup['<='])}")]

	if operator_lookup['<']:
		operator_lookup['<'] = [Specifier(f"<{natmin(spec.version for spec in operator_lookup['<'])}")]

	if operator_lookup[">="]:
		operator_lookup[">="] = [Specifier(f">={natmax(spec.version for spec in operator_lookup['>='])}")]

	if operator_lookup['>']:
		operator_lookup['>'] = [Specifier(f">{natmax(spec.version for spec in operator_lookup['>'])}")]

	# The comparisons below must be made on parsed versions:
	# as plain strings "1.10" sorts before "1.9", which would drop the wrong bound.

	# merge e.g. >1.2.3 and >=1.2.2 into >1.2.3
	if operator_lookup[">="] and operator_lookup['>']:
		gt_version = parse_version(operator_lookup['>'][0].version)
		ge_version = parse_version(operator_lookup[">="][0].version)
		if gt_version is not None and ge_version is not None and gt_version > ge_version:
			del operator_lookup[">="]

	# merge e.g. >=1.2.2 and ==1.2.3 into ==1.2.3
	if operator_lookup[">="] and operator_lookup["=="]:
		ge_version = parse_version(operator_lookup[">="][0].version)
		if ge_version is not None:
			eq_versions = [parse_version(eq_spec.version) for eq_spec in operator_lookup["=="]]
			if any(eq_version is not None and eq_version >= ge_version for eq_version in eq_versions):
				del operator_lookup[">="]

	# merge e.g. <=1.2.3 and <1.2.2 into <1.2.2
	if operator_lookup["<="] and operator_lookup['<']:
		lt_version = parse_version(operator_lookup['<'][0].version)
		le_version = parse_version(operator_lookup["<="][0].version)
		if lt_version is not None and le_version is not None and lt_version < le_version:
			del operator_lookup["<="]

	# merge e.g. <=1.2.3 and ==1.2.2 into ==1.2.2
	if operator_lookup["<="] and operator_lookup["=="]:
		le_version = parse_version(operator_lookup["<="][0].version)
		if le_version is not None:
			eq_versions = [parse_version(eq_spec.version) for eq_spec in operator_lookup["=="]]
			if any(eq_version is not None and eq_version <= le_version for eq_version in eq_versions):
				del operator_lookup["<="]

	specifier_set = SpecifierSet()

	if operator_lookup["<="]:
		specifier_set &= SpecifierSet(f"{operator_lookup['<=']:,}")

	if operator_lookup['<']:
		specifier_set &= SpecifierSet(f"{operator_lookup['<']:,}")

	for spec in operator_lookup["!="]:
		specifier_set &= SpecifierSet(f"!={spec.version}")

	for spec in operator_lookup["=="]:
		specifier_set &= SpecifierSet(f"=={spec.version}")

	if operator_lookup[">="]:
		specifier_set &= SpecifierSet(f"{operator_lookup['>=']:,}")

	if operator_lookup['>']:
		specifier_set &= SpecifierSet(f"{operator_lookup['>']:,}")

	for spec in operator_lookup["~="]:
		specifier_set &= SpecifierSet(f"~={spec.version}")

	for spec in operator_lookup["==="]:
		specifier_set &= SpecifierSet(f"==={spec.version}")

	return specifier_set
def combine_requirements(
		requirement: Union[_Requirement, Iterable[_Requirement]],
		*requirements: _Requirement,
		normalize_func: Callable[[str], str] = normalize,
		) -> List[ComparableRequirement]:
	"""
	Combine duplicated requirements in a list.

	Requirements with the same (normalized) name and the same marker are merged
	into a single requirement, and their specifiers resolved with :func:`~.resolve_specifiers`.

	.. versionchanged:: 0.2.1  Added the ``normalize_func`` keyword-only argument.
	.. versionchanged:: 0.3.1  Requirements are no longer combined if their markers differ.

	:param requirement: A single requirement, or an iterable of requirements.
	:param requirements: Additional requirements.
	:param normalize_func: Function to use to normalize the names of requirements.

	:return: The combined requirements, in the order their names were first seen.
	"""

	# A string is itself iterable, but must be treated as one requirement
	# rather than being split into its individual characters.
	if isinstance(requirement, Iterable) and not isinstance(requirement, str):
		all_requirements = [*requirement, *requirements]
	else:
		all_requirements = [requirement, *requirements]

	merged_requirements: List[ComparableRequirement] = []

	for req in all_requirements:
		if not isinstance(req, ComparableRequirement):
			req = ComparableRequirement(str(req))

		req.name = normalize_func(req.name)
		_denormalize_ruamel(req)

		for other_req in merged_requirements:
			if other_req.name != req.name:
				continue

			if str(req.marker) == str(other_req.marker):
				other_req.specifier &= req.specifier
				# NOTE(review): extras are intersected here, not unioned,
				# so "foo[a]" and "foo[b]" combine to "foo" -- confirm this is intended.
				other_req.extras &= req.extras
				other_req.specifier = resolve_specifiers(other_req.specifier)
				break
		else:
			# Either the name is new, or no existing entry shares this marker.
			# Store a copy so that later merges never modify the caller's object.
			merged_requirements.append(copy.deepcopy(req))

	return merged_requirements
# Return type when ``include_invalid`` is true:
# (requirements, comment lines, invalid lines).
_read_requirements_ret_invalid = Tuple[Set[ComparableRequirement], List[str], List[str]]
# Return type when ``include_invalid`` is false:
# (requirements, comment lines).
_read_requirements_ret_valid = Tuple[Set[ComparableRequirement], List[str]]
# Either of the above, for the implementations' signatures.
_read_requirements_ret = Union[_read_requirements_ret_invalid, _read_requirements_ret_valid]


# Typing overload: ``include_invalid=True`` gives the three-element tuple.
@overload
def read_requirements(
		req_file: PathLike,
		include_invalid: Literal[True],
		*,
		normalize_func: Callable[[str], str] = ...
		) -> _read_requirements_ret_invalid: ...


# Typing overload: ``include_invalid=False`` (the default) gives the two-element tuple.
@overload
def read_requirements(
		req_file: PathLike,
		include_invalid: Literal[False] = ...,
		*,
		normalize_func: Callable[[str], str] = ...
		) -> _read_requirements_ret_valid: ...
def read_requirements(
		req_file: PathLike,
		include_invalid: bool = False,
		*,
		normalize_func: Callable[[str], str] = normalize
		) -> _read_requirements_ret:
	"""
	Reads :pep:`508` requirements from the given file.

	.. versionchanged:: 0.2.0  Added the ``include_invalid`` option.
	.. versionchanged:: 0.2.1  Added the ``normalize_func`` keyword-only argument.

	:param req_file: The file to read the requirements from.
	:param include_invalid: If :py:obj:`True`, include invalid lines as the third element of the tuple.
	:param normalize_func: Function to use to normalize the names of requirements.

	:return: The requirements, and a list of commented lines.
	"""

	lines = PathPlus(req_file).read_lines()

	# The literal True/False arguments let type checkers pick the matching overload.
	if not include_invalid:
		return parse_requirements(lines, include_invalid=False, normalize_func=normalize_func)

	return parse_requirements(lines, include_invalid=True, normalize_func=normalize_func)
# Typing overload: ``include_invalid=True`` gives (requirements, comment lines, invalid lines).
@overload
def parse_requirements(
		requirements: Iterable[str],
		*,
		include_invalid: Literal[True],
		normalize_func: Callable[[str], str] = ...
		) -> _read_requirements_ret_invalid: ...


# Typing overload: ``include_invalid=False`` (the default) gives (requirements, comment lines).
@overload
def parse_requirements(
		requirements: Iterable[str],
		*,
		include_invalid: Literal[False] = ...,
		normalize_func: Callable[[str], str] = ...
		) -> _read_requirements_ret_valid: ...
def parse_requirements(
		requirements: Iterable[str],
		*,
		include_invalid: bool = False,
		normalize_func: Callable[[str], str] = normalize,
		) -> _read_requirements_ret:
	"""
	Parse the given strings as :pep:`508` requirements.

	Lines starting with ``#`` (ignoring leading whitespace) are collected as comments.
	Empty and whitespace-only lines are skipped.

	.. versionadded:: 0.10.0

	:param requirements: The lines to parse.
	:param include_invalid: If :py:obj:`True`, include invalid lines as the third element of the tuple.
		Otherwise a warning is emitted for each invalid line.
	:param normalize_func: Function to use to normalize the names of requirements.

	:return: The requirements, and a list of commented lines.

	.. latex:clearpage::
	"""

	comments: List[str] = []
	invalid_lines: List[str] = []
	parsed_requirements: Set[ComparableRequirement] = set()

	for line in requirements:
		stripped = line.strip()

		if not stripped:
			# A line holding only whitespace is blank, not an invalid requirement.
			continue

		if stripped.startswith('#'):
			comments.append(line)
			continue

		try:
			req = ComparableRequirement(line)
		except InvalidRequirement:
			invalid_lines.append(line)
			continue

		req.name = normalize_func(req.name)
		_denormalize_ruamel(req)
		parsed_requirements.add(req)

	if include_invalid:
		return parsed_requirements, comments, invalid_lines

	for line in invalid_lines:
		warnings.warn(f"Ignored invalid requirement {line!r}")

	return parsed_requirements, comments
class RequirementsManager(ABC):
	"""
	Abstract base class for managing requirements files.

	When invoked with run, the methods are called in the following order:

	#. :meth:`~.compile_target_requirements`
	#. :meth:`~.merge_requirements`
	#. :meth:`~.remove_library_requirements`
	#. :meth:`~.write_requirements`

	:param repo_path: Path to the repository root.

	.. autosummary-widths:: 4/10
	"""

	target_requirements: Set[ComparableRequirement]
	"""
	The static target requirements

	.. versionchanged:: 0.4.0

		Previously this was a set of :class:`packaging.requirements.Requirement`.
	"""

	#: The path of the requirements file, relative to the repository root.
	filename: PathLike

	def __init__(self, repo_path: PathLike):
		self.repo_path = PathPlus(repo_path)
		self.req_file = self.prep_req_file()

		# Take a per-instance copy so the class-level set is never modified.
		self.target_requirements = set(self.target_requirements)

	def prep_req_file(self) -> PathPlus:
		"""
		Create the requirements file if necessary, and in any case return its filename.
		"""

		req_file = PathPlus(self.repo_path / self.filename)
		req_file.parent.maybe_make(parents=True)

		if not req_file.is_file():
			req_file.touch()

		return req_file

	def compile_target_requirements(self) -> None:
		"""
		Add and remove requirements depending on the configuration
		by modifying the ``target_requirements`` attribute.

		This method may not return anything.
		"""  # noqa: D400

	def normalize(self, name: str) -> str:
		"""
		Normalize the given name for PyPI et al.

		.. versionadded:: 0.2.1

		:param name: The project name.
		"""

		return normalize(name)

	def get_target_requirement_names(self) -> Set[str]:
		"""
		Returns a set of normalized names for the target requirements,
		including any added by ``compile_target_requirements``.

		The names of the requirements in ``target_requirements`` are normalized in place.
		"""  # noqa: D400

		names = set()

		for req in self.target_requirements:
			req.name = self.normalize(req.name)
			names.add(req.name)

		return names

	def merge_requirements(self) -> List[str]:
		"""
		Merge requirements already in the file with the target requirements.

		Requirements may be added, changed or removed at this stage
		by modifying the ``target_requirements`` attribute.

		:return: List of commented lines.
		"""

		current_requirements, comments = read_requirements(self.req_file)

		# Pass a single list rather than star-unpacking: with an empty file and no
		# target requirements, unpacking would call combine_requirements() with no
		# arguments, which raises a TypeError.
		self.target_requirements = set(combine_requirements([*current_requirements, *self.target_requirements]))

		return comments

	def remove_library_requirements(self) -> None:
		"""
		Remove requirements given in the library's ``requirements.txt`` file.

		A target requirement is kept if it has a marker, or if its extras
		differ from those of the library requirement with the same name.
		Library requirements which have a marker are not considered.

		This method may not return anything.
		"""

		lib_requirements, _ = read_requirements(self.repo_path / "requirements.txt")

		# NOTE(review): this uses the module-level ``normalize`` rather than
		# ``self.normalize`` -- confirm that is intended for subclasses overriding it.
		lib_requirements_names_extras = {normalize(r.name): r.extras for r in lib_requirements if not r.marker}

		non_library_requirements = set()

		for req in self.target_requirements:
			if (
					req.name not in lib_requirements_names_extras
					or req.extras != lib_requirements_names_extras[req.name] or req.marker
					):
				non_library_requirements.add(req)

		self.target_requirements = non_library_requirements

	def write_requirements(self, comments: List[str]) -> None:
		"""
		Write the list of requirements to the file.

		The comments are written first, followed by the sorted requirements.

		:param comments: List of commented lines.

		This method may not return anything.
		"""

		buf = StringList(comments)

		for req in sorted(self.target_requirements):
			buf.append(str(req))

		self.req_file.write_lines(buf)

	def run(self) -> PathPlus:
		"""
		Update the list of requirements and return the name of the requirements file.
		"""

		self.compile_target_requirements()
		comments = self.merge_requirements()
		self.remove_library_requirements()
		self.write_requirements(comments)

		return self.req_file
def marker_environment(extra: str) -> Dict[str, str]:
	"""
	Returns the default marker environment with the ``extra`` key set to the given value.

	:param extra: The name of the extra to evaluate markers against.
	"""

	return {**default_environment(), "extra": extra}
def list_requirements(
		name: str,
		depth: int = 1,
		path: Optional[Iterable[PathLike]] = None,
		) -> Iterator[Union[str, List[str], List[Union[str, List]]]]:
	"""
	Returns an iterator over the requirements of the given library,
	and the requirements of those requirements.

	The iterator is structured as follows::

		[
			<requirement a>,
			[
				<requirement 1 of requirement a>,
				<requirement 2 of requirement a>,
				[<requirements of requirement 2>, ...],
				<requirement 3 of requirement a>,
			],
			<requirement b>,
		]

	Nothing is yielded if the distribution cannot be found.
	Requirements with a marker are only included if the marker evaluates to
	:py:obj:`True` for at least one of the extras requested in ``name``
	(or with no extra, if none were requested).

	:param name: The library, given as a :pep:`508` requirement string (which may include extras).
	:param depth: How many further levels of requirements to descend into.
		``0`` yields only the requirements of ``name`` itself.
	:param path: The directories entries to search for distributions in.
		This can be used to search in a different (virtual) environment.

	:default path: :py:data:`sys.path`

	.. versionchanged:: 0.8.2  The requirements are now sorted alphabetically.
	.. versionchanged:: 1.7.0  Added the ``path`` argument.
	"""

	req = ComparableRequirement(name)

	try:
		distro = dist_meta.distributions.get_distribution(req.name, path=path)
	except dist_meta.distributions.DistributionNotFoundError:
		return

	raw_deps = distro.get_metadata().get_all("Requires-Dist") or []

	# Build one environment per requested extra.
	# ``req.extras`` is a set, so evaluating only "the first" extra would pick an
	# arbitrary one and ignore requirements belonging to the others.
	environments = [marker_environment(extra) for extra in (sorted(req.extras) or [''])]

	for requirement in [ComparableRequirement(r) for r in sorted(raw_deps)]:
		marker = requirement.marker

		if marker and not any(marker.evaluate(environment) for environment in environments):
			continue

		# Always yield the requirement itself; ``depth`` only limits the recursion
		# below. Otherwise the innermost level would come back empty and the
		# default depth would never produce nested requirements.
		yield str(requirement)

		if depth != 0:
			deps = list(list_requirements(str(requirement), depth=depth - 1, path=path))
			if deps:
				yield deps
@deprecation_alias.deprecated(deprecated_in="1.6.0", removed_in="2.0", current_version=__version__)
def check_dependencies(dependencies: Iterable[str], prt: bool = True) -> List[str]:
	"""
	Check whether one or more dependencies are available to be imported.

	:param dependencies: The list of dependencies to check the availability of.
	:param prt: Whether the status should be printed to the terminal.

	:return: A list of any missing modules.
	"""

	# stdlib
	from pkgutil import iter_modules

	# The second element of each entry is the module's name.
	available = {module_info[1] for module_info in iter_modules()}
	missing_modules = [requirement for requirement in dependencies if requirement not in available]

	if not prt:
		return missing_modules

	if missing_modules:
		print("The following modules are missing:")
		print(missing_modules)
		print("Please check the documentation.")
	else:
		print("All modules installed")

	print('')

	return missing_modules
def parse_pyproject_dependencies(
		pyproject_file: PathLike,
		flavour: Literal["pep621", "flit", "auto"] = "auto",
		*,
		normalize_func: Callable[[str], str] = normalize
		) -> Set[ComparableRequirement]:
	"""
	Parse the project's dependencies from its ``pyproject.toml`` file.

	.. versionadded:: 0.10.0

	:param pyproject_file: The ``pyproject.toml`` file to read.
	:param flavour: Either ``'pep621'`` to parse from the :pep:`621` ``dependencies`` table,
		or ``'flit'`` to parse the ``requires`` key in ``tool.flit.metadata``.
		The string ``'auto'`` will use ``'pep621'`` if available, otherwise try ``'flit'``.
	:param normalize_func: Function to use to normalize the names of dependencies.

	If no dependencies are defined an empty set is returned.
	Invalid requirements are silently discarded.

	:rtype:

	.. latex:clearpage::
	"""

	config = dom_toml.load(pyproject_file)
	tool_table = config.get("tool", {})

	# Work out which table to read from when asked to auto-detect.
	source = flavour
	if source == "auto":
		if "project" in config:
			source = "pep621"
		elif "flit" in tool_table:
			source = "flit"

	if source == "pep621":
		raw_dependencies = config.get("project", {}).get("dependencies", [])
	elif source == "flit":
		raw_dependencies = tool_table.get("flit", {}).get("metadata", {}).get("requires", [])
	else:
		raw_dependencies = []

	parsed, *_ = parse_requirements(raw_dependencies, include_invalid=True, normalize_func=normalize_func)
	return parsed
def parse_pyproject_extras(
		pyproject_file: PathLike,
		flavour: Literal["pep621", "flit", "auto"] = "auto",
		*,
		normalize_func: Callable[[str], str] = normalize
		) -> Dict[str, Set[ComparableRequirement]]:
	"""
	Parse the project's extra dependencies from its ``pyproject.toml`` file.

	.. versionadded:: 0.10.0

	:param pyproject_file: The ``pyproject.toml`` file to read.
	:param flavour: Either ``'pep621'`` to parse from the :pep:`621` ``optional-dependencies`` table,
		or ``'flit'`` to parse the ``requires-extra`` key in ``tool.flit.metadata``.
		The string ``'auto'`` will use ``'pep621'`` if available, otherwise try ``'flit'``.
	:param normalize_func: Function to use to normalize the names of dependencies.

	If no extra dependencies are defined an empty dictionary is returned.
	Invalid requirements are silently discarded.
	"""

	config = dom_toml.load(pyproject_file)
	tool_table = config.get("tool", {})

	# Work out which table to read from when asked to auto-detect.
	source = flavour
	if source == "auto":
		if "project" in config:
			source = "pep621"
		elif "flit" in tool_table:
			source = "flit"

	if source == "pep621":
		raw_extras = config.get("project", {}).get("optional-dependencies", {})
	elif source == "flit":
		raw_extras = tool_table.get("flit", {}).get("metadata", {}).get("requires-extra", {})
	else:
		raw_extras = {}

	extras = {}

	for extra_name, extra_requirements in raw_extras.items():
		parsed, *_ = parse_requirements(extra_requirements, include_invalid=True, normalize_func=normalize_func)
		extras[extra_name] = parsed

	return extras
def _denormalize_ruamel(req: Requirement) -> None:
	"""
	Rename ``ruamel-yaml`` / ``ruamel_yaml`` back to ``ruamel.yaml``, in place.

	:param req: The requirement to modify.
	"""

	# Special case to work around issue with Poetry
	if req.name == "ruamel-yaml" or req.name == "ruamel_yaml":
		req.name = "ruamel.yaml"