Source code for clapper.click

# SPDX-FileCopyrightText: Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
# SPDX-FileContributor: Amir Mohammadi  <amir.mohammadi@idiap.ch>
#
# SPDX-License-Identifier: BSD-3-Clause

"""Helpers to build command-line interfaces (CLI) via :py:mod:`click`."""

import functools
import inspect
import logging
import pathlib
import pprint
import shutil
import time
import typing

from importlib.metadata import EntryPoint

import click
import tomli

from click.core import ParameterSource

from .config import load, mod_to_context, resource_keys
from .rc import UserDefaults

module_logger = logging.getLogger(__name__)
"""Logger used for this module's own diagnostic messages."""

_COMMON_CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}
"""Click context settings shared by the command groups defined below."""


def verbosity_option(
    logger: logging.Logger,
    short_name: str = "v",
    name: str = "verbose",
    dflt: int = 0,
    **kwargs: typing.Any,
) -> typing.Callable[..., typing.Any]:
    """Build a click-option decorator exposing a ``-v``/``--verbose`` counter.

    The returned decorator attaches a "count" option to a CLI.  Every time the
    option is processed, the log-level of ``logger`` is updated to match the
    number of times the flag was given:

    .. code-block:: python

       @verbosity_option(logger=logger)

    Mapping between the flag count and the level set on ``logger``:

    * 0 (flag absent): ``logger.setLevel(logging.ERROR)``
    * 1 (``-v``): ``logger.setLevel(logging.WARNING)``
    * 2 (``-vv``): ``logger.setLevel(logging.INFO)``
    * 3 (``-vvv`` or more): ``logger.setLevel(logging.DEBUG)``

    The option is eager, so the level is already in place while
    ``ResourceOption`` or ``ConfigCommand`` configuration files get loaded,
    **unless** the verbosity option is itself declared with
    ``cls=ResourceOption``.  In that case it is not eager: configuration
    loading runs with the logger's default level and the option only affects
    logging once options have been handled.

    Arguments:

        logger: The :py:class:`logging.Logger` whose level is to be set.

        short_name: Short (single-dash) name of the option.

        name: Long (double-dash) name of the option.  It is also the key under
            which the value is stored in ``ctx.meta``.

        dflt: The default verbosity level.

        **kwargs: Further keyword-arguments forwarded to the underlying
            :py:func:`click.option`.


    Returns
    -------
        A callable following the :py:mod:`click` policy for option decorators.
    """
    # translation table: number of ``-v`` flags -> logging level
    level_by_count: dict[int, int] = {
        0: logging.ERROR,
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG,
    }

    help_text = (
        f"Increase the verbosity level from 0 (only error and critical) "
        f"messages will be displayed, to 1 (like 0, but adds warnings), 2 "
        f"(like 1, but adds info messages), and 3 (like 2, but also adds "
        f"debugging messages) by adding the --{name} option as often as "
        f"desired (e.g. '-vvv' for debug)."
    )

    def custom_verbosity_option(f):
        def callback(ctx, param, value):
            # keep the raw count available to other callbacks through the context
            ctx.meta[name] = value
            log_level = level_by_count[value]
            logger.setLevel(log_level)
            logger.debug(f'Level of Logger("{logger.name}") was set to {log_level}')
            return value

        decorate = click.option(
            f"-{short_name}",
            f"--{name}",
            count=True,
            type=click.IntRange(min=0, max=3, clamp=True),
            default=dflt,
            show_default=True,
            help=help_text,
            callback=callback,
            # resolved at decoration time: eager unless the option is
            # explicitly declared as a ResourceOption
            is_eager=kwargs.get("cls") is not ResourceOption,
            **kwargs,
        )
        return decorate(f)

    return custom_verbosity_option
class ConfigCommand(click.Command):
    """A :py:class:`click.Command` that can read options from config files.

    On construction the command registers two extra parameters on itself:

    * a variadic, eager ``CONFIG`` argument, whose values are loaded through
      :py:func:`.config.load` and stored on the click context as
      ``ctx.config_context``;
    * an eager ``-H``/``--dump-config`` option, which writes a template
      configuration file (see :py:meth:`dump_config`) and exits.

    .. warning::

       In order to use this class, you **have to** use the
       :py:class:`ResourceOption` class also.


    Arguments:

        name: The name of the command (forwarded to :py:class:`click.Command`)

        *args: Unnamed parameters passed to :py:class:`click.Command`

        help: Help message associated with this command

        entry_point_group: Name of the entry point group from which
            entry-points will be searched

        **kwargs: Named parameters passed to :py:class:`click.Command`
    """

    config_argument_name: str
    """The name of the config argument."""

    entry_point_group: str | None
    """The name of entry point that will be used to load the config files."""

    def __init__(
        self,
        name: str,
        *args: typing.Any,
        help: str | None = None,  # noqa: A002
        entry_point_group: str | None = None,
        **kwargs: typing.Any,
    ) -> None:
        self.entry_point_group = entry_point_group
        configs_argument_name = "CONFIG"
        # make the declared attribute actually available on instances
        self.config_argument_name = configs_argument_name

        # Augment help for the config file argument.  The text is kept on the
        # instance so that dump_config() can strip it back out of the help.
        self.extra_help = (
            f"\n\nIt is possible to pass one or several Python files (or names "
            f"of ``{entry_point_group}`` entry points or module names) as "
            f"{configs_argument_name} arguments to the command line which "
            f"contain the parameters listed below as Python variables. The "
            f"options through the command-line (see below) will override the "
            f"values of configuration files. You can run this command with "
            f"``<COMMAND> -H example_config.py`` to create a template config "
            f"file."
        )
        help = (help or "").rstrip() + self.extra_help  # noqa: A001
        super().__init__(name, *args, help=help, **kwargs)

        # Add the config argument to the command
        def configs_argument_callback(ctx, param, value):
            config_context = load(value, entry_point_group=self.entry_point_group)
            config_context = mod_to_context(config_context)
            ctx.config_context = config_context
            module_logger.debug("Augmenting click context with config context")
            return value

        click.argument(
            configs_argument_name,
            nargs=-1,
            callback=configs_argument_callback,
            is_eager=True,  # runs first and unconditionally
        )(self)

        # Option for config file generation
        click.option(
            "-H",
            "--dump-config",
            type=click.File(mode="wt"),
            help="Name of the config file to be generated",
            is_eager=True,
            callback=self.dump_config,
        )(self)

    def dump_config(
        self,
        ctx: typing.Any,
        param: typing.Any,
        value: typing.TextIO | None,
    ) -> None:
        """Generate configuration file from parameters and context.

        This is the callback of the ``-H``/``--dump-config`` option.  When a
        file is given, a template is written to it and the command-line
        execution is concluded through ``ctx.exit()``.  When ``value`` is
        ``None`` (option not used), nothing happens.

        The template starts with a docstring holding the generation date, the
        command path, the command help (without the text added by this class)
        and the epilog.  It is followed by one commented-out assignment per
        :py:class:`ResourceOption` of the command, each with a short
        description.  Parameters of any other kind are skipped.


        Arguments:

            ctx: Click context

            param: The click parameter that triggered this callback (unused)

            value: The writable text file in which the template is stored, or
                ``None`` if the option was not given
        """
        config_file = value
        if config_file is None:
            return

        module_logger.debug(
            f"Generating configuration file `{config_file.name}'..."
        )

        # file header: a module-level docstring describing the command
        config_file.write('"""')
        config_file.write(
            f"Configuration file automatically generated at "
            f"{time.strftime('%d/%m/%Y')}.\n\n{ctx.command_path}\n"
        )

        if self.help:
            h = self.help.replace(self.extra_help, "").replace("\b\n", "")
            config_file.write(f"\n{h.rstrip()}")

        if self.epilog:
            epilog = self.epilog.replace("\b\n", "")
            config_file.write(f"\n\n{epilog}")

        config_file.write('\n"""\n')

        # a distinct loop variable avoids clobbering the ``param`` argument
        for option in self.params:
            if not isinstance(option, ResourceOption):
                # we can only handle ResourceOptions
                continue

            config_file.write(f"\n# {option.name} = {option.default!s}\n")
            config_file.write('"""')

            if option.required:
                begin, dflt = "Required parameter", ""
            else:
                begin, dflt = (
                    "Optional parameter",
                    f" [default: {option.default}]",
                )

            config_file.write(
                f"{begin}: {option.name} ({', '.join(option.opts)}){dflt}"
            )

            if option.help is not None:
                config_file.write(f"\n{option.help}")

            if option.entry_point_group is not None:
                config_file.write(
                    f"\nRegistered entries are: "
                    f"{resource_keys(option.entry_point_group)}"
                )

            config_file.write('"""\n')

        click.echo(f"Configuration file `{config_file.name}' was written; exiting")

        config_file.close()
        ctx.exit()
class CustomParamType(click.ParamType):
    """Parameter type marking options that carry complex Python objects.

    It only defines a ``name`` and otherwise relies on
    :py:class:`click.ParamType` as is.
    """

    name = "custom"
class ResourceOption(click.Option):
    """A :py:class:`click.Option` that can resolve its value from resources.

    Two independent features are provided, and they may be combined:

    1. Inside a command deriving from :py:class:`ConfigCommand`, the option
       looks into the configuration files given as arguments to the command to
       find its value.  A value given explicitly on the command line wins.

    2. When ``entry_point_group`` is set, string values handed to the option
       (by any means) are treated as resources and loaded with
       :py:func:`.config.load`.  Check :ref:`clapper.config.resource` for
       details.  The final value cannot be a string.

    This leads to three supported usages:

    1. Without :py:class:`ConfigCommand`, with ``entry_point_group``.

    2. With :py:class:`ConfigCommand`, with ``entry_point_group``.

    3. With :py:class:`ConfigCommand`, without ``entry_point_group``.

    Using this class without :py:class:`ConfigCommand` *and* without
    ``entry_point_group`` does nothing and is not allowed.
    """

    entry_point_group: str | None
    """If set, string values given to this option are taken to be entry points
    of ``entry_point_group`` that must be loaded.

    This may differ from the group of the wrapping :py:class:`ConfigCommand`.
    """

    string_exceptions: list[str] | None
    """Strings that are never loaded as entry points, even when
    ``entry_point_group`` is set."""

    def __init__(
        self,
        param_decls=None,
        show_default=False,
        prompt=False,
        confirmation_prompt=False,
        hide_input=False,
        is_flag=None,
        flag_value=None,
        multiple=False,
        count=False,
        allow_from_autoenv=True,
        type=None,  # noqa: A002
        help=None,  # noqa: A002
        entry_point_group=None,
        required=False,
        string_exceptions=None,
        **kwargs,
    ) -> None:
        # Click converts untyped options to strings.  A ResourceOption may
        # carry arbitrary Python objects instead, so when nothing hints at a
        # concrete type (no ``type``, no ``default``, not a counter, not a
        # flag) a pass-through "custom" type is installed.
        nothing_hints_at_a_type = all(
            (
                type is None,
                kwargs.get("default") is None,
                count is False,
                is_flag is None,
            )
        )
        if nothing_hints_at_a_type:
            type = CustomParamType()  # noqa: A001

        self.entry_point_group = entry_point_group

        if entry_point_group is not None:
            # the option name is needed for the help text before the base
            # class gets a chance to parse the declarations
            name, _, _ = self._parse_decls(param_decls, kwargs.get("expose_value"))
            suffix = (
                f" Can be a `{entry_point_group}' entry point, a module name, or "
                f"a path to a Python file which contains a variable named `{name}'."
            )
            help = ((help or "") + suffix).format(  # noqa: A001
                entry_point_group=entry_point_group, name=name
            )

        super().__init__(
            param_decls=param_decls,
            show_default=show_default,
            prompt=prompt,
            confirmation_prompt=confirmation_prompt,
            hide_input=hide_input,
            is_flag=is_flag,
            flag_value=flag_value,
            multiple=multiple,
            count=count,
            allow_from_autoenv=allow_from_autoenv,
            type=type,
            help=help,
            required=required,
            **kwargs,
        )

        self.string_exceptions = string_exceptions or []

    def consume_value(
        self, ctx: click.Context, opts: dict
    ) -> tuple[typing.Any, ParameterSource]:
        """Find the value of this parameter by probing several sources.

        Sources are tried in this order, stopping at the first one that yields
        something other than ``None``: the command-line options, the
        configuration context attached to ``ctx`` (if any), the environment
        variables, the context's default map and, finally, the option's own
        default.

        Parameters
        ----------
        ctx
            The click context to retrieve the value from
        opts
            command-line options, eventually passed by the user


        Returns
        -------
            A tuple containing the parameter value (of any type) and the source
            it used to retrieve it.  A value found in the configuration context
            is reported with the ``COMMANDLINE`` source.


        Raises
        ------
        TypeError
            If the context has no ``config_context`` and this option has no
            ``entry_point_group``.
        """
        # only contexts handled by a ConfigCommand carry this attribute
        has_config_context = hasattr(ctx, "config_context")

        if self.entry_point_group is None and not has_config_context:
            raise TypeError(
                "The ResourceOption class is not meant to be used this way. "
                "See package documentation for details."
            )

        module_logger.debug(f"consuming resource option for {self.name}")

        value = opts.get(self.name)
        source = ParameterSource.COMMANDLINE

        # not on the command line: look into the config files given as
        # arguments (not options); the source is left untouched here
        if value is None and has_config_context:
            value = ctx.config_context.get(self.name)

        # not in the config files: look into the environment variables
        if value is None:
            value, source = self.value_from_envvar(ctx), ParameterSource.ENVIRONMENT

        # not in the environment: fall back to the default values
        if value is None:
            value, source = ctx.lookup_default(self.name), ParameterSource.DEFAULT_MAP

        if value is None:
            value, source = self.get_default(ctx), ParameterSource.DEFAULT

        return value, source

    def type_cast_value(self, ctx: click.Context, value: typing.Any) -> typing.Any:
        """Convert and validate a value against the option's type.

        The conversion is first delegated to :py:class:`click.Option`.  Then,
        if an ``entry_point_group`` is set, the result is loaded with
        :py:func:`.config.load` for as long as it is a string that is not
        listed in ``string_exceptions``.

        Arguments:

            ctx: The click context to be used for casting the value

            value: The actual value, that needs to be cast


        Returns
        -------
            The cast value
        """
        cast = super().type_cast_value(ctx, value)

        if self.entry_point_group is None:
            return cast

        # a loaded resource may itself be a string naming another resource
        while isinstance(cast, str) and cast not in self.string_exceptions:
            cast = load(
                [cast],
                entry_point_group=self.entry_point_group,
                attribute_name=self.name,
            )

        return cast
class AliasedGroup(click.Group):
    """A :py:class:`click.Group` whose commands can be called by prefix.

    Only ``get_command``, which click uses to pick a command from its name, is
    re-implemented.

    Example
    -------
    To enable prefix aliasing of commands for a given group, just set
    ``cls=AliasedGroup`` parameter in click.group decorator.
    """

    def get_command(self, ctx, cmd_name):  # noqa: RET503
        """Return the command called ``cmd_name``, accepting unique prefixes.

        An exact match is returned first.  Otherwise, the only command whose
        name starts with ``cmd_name`` is returned, or ``None`` when there is no
        such command.  If several commands share the prefix, ``ctx.fail`` is
        called with the sorted list of candidates.
        """
        exact = click.Group.get_command(self, ctx, cmd_name)
        if exact is not None:
            return exact

        candidates = [
            known for known in self.list_commands(ctx) if known.startswith(cmd_name)
        ]

        if len(candidates) > 1:
            ctx.fail(f"Too many matches: {', '.join(sorted(candidates))}")

        if candidates:
            return click.Group.get_command(self, ctx, candidates[0])

        return None
def user_defaults_group(
    logger: logging.Logger,
    config: UserDefaults,
) -> typing.Callable[..., typing.Any]:
    """Add a command group to read/write RC configuration.

    This decorator turns a user function into a click command group (with
    prefix aliasing and a verbosity option) offering four sub-commands that
    operate on ``config``: ``show``, ``get``, ``set`` and ``rm``.

    .. code-block:: python

       import logging
       from clapper.rc import UserDefaults
       from clapper.click import user_defaults_group

       logger = logging.getLogger(__name__)
       user_defaults = UserDefaults("~/.myapprc")
       ...


       @user_defaults_group(logger=logger, config=user_defaults)
       def rc(**kwargs):
           '''Use this command to affect the global user configuration.'''
           pass


    Then use it like this:

    .. code-block:: shell

       $ user-cli rc --help
       usage: ...


    Arguments:

        logger: The :py:class:`logging.Logger` controlled by the verbosity
            option of the group and of its sub-commands, also used to report
            errors.

        config: The user-defaults object that is displayed and modified by the
            sub-commands.


    Returns
    -------
        A decorator that converts a function into the command group.
    """

    def group_decorator(
        func: typing.Callable[..., typing.Any],
    ) -> typing.Callable[..., typing.Any]:
        @click.group(
            cls=AliasedGroup,
            no_args_is_help=True,
            context_settings=_COMMON_CONTEXT_SETTINGS,
        )
        @verbosity_option(logger=logger)
        @functools.wraps(func)
        def group_wrapper(**kwargs):
            return func(**kwargs)

        @group_wrapper.command(context_settings=_COMMON_CONTEXT_SETTINGS)
        @verbosity_option(logger=logger)
        def show(**_: typing.Any) -> None:
            """Show the user-defaults file contents."""
            click.echo(str(config).strip())

        @group_wrapper.command(
            no_args_is_help=True,
            context_settings=_COMMON_CONTEXT_SETTINGS,
        )
        @click.argument("key")
        @verbosity_option(logger=logger)
        def get(key: str, **_: typing.Any) -> None:
            """Print a key from the user-defaults file.

            Retrieves the value of the requested KEY and displays it. The KEY
            may contain dots (``.``) to access values from subsections in the
            TOML_ document.
            """
            try:
                found = config[key]
            except KeyError as err:
                # chain the lookup failure so the cause shows in tracebacks
                raise click.ClickException(
                    f"Cannot find object named `{key}' at `{config.path}'",
                ) from err
            click.echo(found)

        @group_wrapper.command(
            name="set",
            no_args_is_help=True,
            context_settings=_COMMON_CONTEXT_SETTINGS,
        )
        @click.argument("key")
        @click.argument("value")
        @verbosity_option(logger=logger)
        def set_(key: str, value: str, **_: typing.Any) -> None:
            """Set the value for a key on the user-defaults file.

            If ``key`` contains dots (``.``), then this sets nested subsection
            variables on the configuration file. Values are parsed and
            translated following the rules of TOML_.

            .. warning::

               This command will override the current configuration file and
               may erase any user comments added by hand. To avoid this, simply
               edit your configuration file by hand.
            """
            parsed: typing.Any
            try:
                # let TOML decide the type (int, bool, list, ...) of the value
                parsed = tomli.loads(f"v = {value}")["v"]
            except tomli.TOMLDecodeError:
                # not a TOML value: store the text exactly as typed
                parsed = value

            try:
                config[key] = parsed
                config.write()
            except KeyError as err:
                logger.error(
                    f"Cannot set object named `{key}' at `{config.path}'",
                    exc_info=True,
                )
                raise click.ClickException(
                    f"Cannot set object named `{key}' at `{config.path}'",
                ) from err

        @group_wrapper.command(
            no_args_is_help=True, context_settings=_COMMON_CONTEXT_SETTINGS
        )
        @click.argument("key")
        @verbosity_option(logger=logger)
        def rm(key: str, **_: typing.Any) -> None:
            """Remove the given key from the configuration file.

            This command will remove the KEY from the configuration file. If
            the input key corresponds to a section in the configuration file,
            then the whole configuration section will be removed.

            .. warning::

               This command will override the current configuration file and
               may erase any user comments added by hand. To avoid this, simply
               edit your configuration file by hand.
            """
            try:
                del config[key]
                config.write()
            except KeyError as err:
                logger.error(
                    f"Cannot delete object named `{key}' at `{config.path}'",
                    exc_info=True,
                )
                raise click.ClickException(
                    f"Cannot delete object named `{key}' at `{config.path}'",
                ) from err

        return group_wrapper

    return group_decorator
def config_group(
    logger: logging.Logger,
    entry_point_group: str,
) -> typing.Callable[..., typing.Any]:
    """Add a command group to list/describe/copy job configurations.

    This decorator turns a user function into a click command group (with
    prefix aliasing and a verbosity option) offering three sub-commands that
    operate on the entry points registered under ``entry_point_group``:
    ``list``, ``describe`` and ``copy``.

    .. code-block:: python

       import logging
       from clapper.click import config_group

       logger = logging.getLogger(__name__)
       ...


       @config_group(logger=logger, entry_point_group="mypackage.config")
       def config(**kwargs):
           '''Use this command to list/describe/copy config files.'''
           pass


    Then use it like this:

    .. code-block:: shell

       $ user-cli config --help
       usage: ...


    Arguments:

        logger: The :py:class:`logging.Logger` controlled by the verbosity
            option of the group and of its sub-commands, also used to report
            problems.

        entry_point_group: Name of the entry point group whose entries are
            listed, described or copied.


    Returns
    -------
        A decorator that converts a function into the command group.
    """

    def _installed() -> dict[str, EntryPoint]:
        # name -> entry point, for every entry registered in the group
        from importlib.metadata import entry_points

        return {e.name: e for e in entry_points(group=entry_point_group)}

    def _verbosity_reaches(ctx, level: int) -> bool:
        # ``-v`` may be given to the group or to the sub-command
        return (ctx.parent.params["verbose"] >= level) or (
            ctx.params["verbose"] >= level
        )

    def _summarize(ctx, ep: EntryPoint) -> str:
        # one-line description of the object behind an entry point
        try:
            obj = ep.load()
            if ":" in ep.value:  # it's an object
                text = f"[{type(obj).__name__}] {pprint.pformat(obj)}"
                return text.replace("\n", " ")
            # it's a whole module
            doc = inspect.getdoc(obj)
            if doc is None:
                return "[module] " + "[undocumented]"
            text = "[module] " + doc.split("\n\n")[0]
            return text.replace("\n", " ")
        except Exception as ex:
            if _verbosity_reaches(ctx, 2):
                logger.exception(ex)
            return "(cannot be loaded; add another -v for details)"

    def group_decorator(
        func: typing.Callable[..., typing.Any],
    ) -> typing.Callable[..., typing.Any]:
        @click.group(cls=AliasedGroup, context_settings=_COMMON_CONTEXT_SETTINGS)
        @verbosity_option(logger=logger)
        @functools.wraps(func)
        def group_wrapper(**kwargs):
            return func(**kwargs)

        @group_wrapper.command(
            name="list",
            context_settings=_COMMON_CONTEXT_SETTINGS,
        )
        @click.pass_context
        @verbosity_option(logger=logger)
        def list_(ctx, **_: typing.Any):
            """List installed configuration resources."""
            installed = _installed()

            # all modules with configuration resources (parent package of
            # the module named by each entry point)
            candidates: set[str] = {
                ep.value.split(":")[0].rsplit(".", 1)[0] for ep in installed.values()
            }

            # keep only modules that are not prefixed by one already kept
            top_modules: set[str] = set()
            for candidate in sorted(candidates):
                if not any(candidate.startswith(kept) for kept in top_modules):
                    top_modules.add(candidate)

            # sort data entries by originating module
            by_module: dict[str, dict[str, EntryPoint]] = {
                top: {
                    ep_name: ep
                    for ep_name, ep in installed.items()
                    if ep.value.split(":", 1)[0].startswith(top)
                }
                for top in top_modules
            }

            for module_name in sorted(by_module):
                names = sorted(by_module[module_name])

                # calculates the longest config name so we offset the printing
                width = max(len(entry) for entry in names)

                # set-up printing options
                # NOTE(review): the rendered source collapses runs of spaces;
                # confirm the padding of this format against the repository
                row_format = f" %-{width}s %s"
                # 79 - 4 spaces = 75 (see string above)
                budget = 75 - width
                cut = budget - 3

                click.echo(f"module: {module_name}")
                for name in names:
                    if _verbosity_reaches(ctx, 1):
                        summary = _summarize(ctx, installed[name])
                    else:
                        summary = ""

                    # shorten descriptions that would overflow the line
                    if len(summary) > cut:
                        summary = summary[:cut] + "..."

                    click.echo(row_format % (name, summary))

        @group_wrapper.command(
            no_args_is_help=True, context_settings=_COMMON_CONTEXT_SETTINGS
        )
        @click.pass_context
        @click.argument(
            "name",
            required=True,
            nargs=-1,
        )
        @verbosity_option(logger=logger)
        def describe(ctx, name, **_: typing.Any):
            """Describe a specific configuration resource."""
            installed = _installed()

            for requested in name:
                if requested not in installed:
                    logger.error(f"Cannot find configuration resource `{requested}'")
                    continue

                ep = installed[requested]
                click.echo(f"Configuration: {ep.name}")
                click.echo(f"Python object: {ep.value}")
                click.echo("")

                loaded = ep.load()
                if ":" in ep.value:
                    # entry point to an object inside a module: nothing more
                    continue

                if _verbosity_reaches(ctx, 1):
                    fname = inspect.getfile(loaded)
                    click.echo("Contents:")
                    with pathlib.Path(fname).open() as stream:
                        click.echo(stream.read())
                else:
                    # only output documentation, if module
                    doc = inspect.getdoc(loaded)
                    if doc and doc.strip():
                        click.echo("Documentation:")
                        click.echo(doc)

        @group_wrapper.command(
            no_args_is_help=True, context_settings=_COMMON_CONTEXT_SETTINGS
        )
        @click.argument(
            "source",
            required=True,
            nargs=1,
        )
        @click.argument(
            "destination",
            required=True,
            nargs=1,
        )
        @verbosity_option(logger=logger)
        def copy(source, destination, **_: typing.Any):
            """Copy a specific configuration resource so it can be modified
            locally.
            """
            installed = _installed()

            if source not in installed:
                logger.error(f"Cannot find configuration resource `{source}'")
                return 1

            # the file defining the loaded object is what gets copied
            src_name = inspect.getfile(installed[source].load())
            logger.info(f"cp {src_name} -> {destination}")
            shutil.copyfile(src_name, destination)
            return None

        return group_wrapper

    return group_decorator
def log_parameters(
    logger_handle: logging.Logger, ignore: tuple[str, ...] | None = None
) -> None:
    """Log the click parameters with the logging module.

    Every parameter of the current click context is written as one debug
    message of the form ``name: value``, in the order in which the parameters
    are stored in the context.

    Parameters
    ----------
    logger_handle
        The :py:class:`logging.Logger` handle to write debug information into.
    ignore
        Names of the parameters to skip when logging.  ``None`` (the default)
        logs all parameters.
    """
    ignored = ignore or ()
    ctx = click.get_current_context()

    # ctx.params is deliberately not sorted: the insertion order is kept, so
    # parameters are reported in the order click stored them
    for key, value in ctx.params.items():
        if key in ignored:
            continue
        logger_handle.debug("%s: %s", key, value)