"""Click commons.
This file contains common functions for cli tools.
"""
import ast
import ipaddress
import json
import logging
import re
from functools import partial, wraps
from typing import Any, Callable, ClassVar, Dict, List, Optional, Set, Type, Union
import click
from .exceptions import DeviceError
try:
from rich import print as echo
except ImportError:
echo = click.echo
_LOGGER = logging.getLogger(__name__)
[docs]
def validate_ip(ctx, param, value):
if value is None:
return None
try:
ipaddress.ip_address(value)
return value
except ValueError as ex:
raise click.BadParameter("Invalid IP: %s" % ex)
[docs]
def validate_token(ctx, param, value):
if value is None:
return None
token_len = len(value)
if token_len != 32:
raise click.BadParameter("Token length != 32 chars: %s" % token_len)
return value
[docs]
class ExceptionHandlerGroup(click.Group):
"""Add a simple group for catching the miio-related exceptions.
This simplifies catching the exceptions from different click commands.
Idea from https://stackoverflow.com/a/44347763
"""
def __call__(self, *args, **kwargs):
try:
return self.main(*args, **kwargs)
except Exception as ex:
_LOGGER.exception("Exception: %s", ex)
[docs]
class EnumType(click.Choice):
def __init__(self, enumcls, casesensitive=False):
choices = enumcls.__members__
if not casesensitive:
choices = (_.lower() for _ in choices)
self._enumcls = enumcls
self._casesensitive = casesensitive
super().__init__(list(sorted(set(choices))))
[docs]
def convert(self, value, param, ctx):
if not self._casesensitive:
value = value.lower()
value = super().convert(value, param, ctx)
if not self._casesensitive:
return next(_ for _ in self._enumcls if _.name.lower() == value.lower())
else:
return next(_ for _ in self._enumcls if _.name == value)
[docs]
class LiteralParamType(click.ParamType):
name = "literal"
[docs]
def convert(self, value, param, ctx):
try:
return ast.literal_eval(value)
except ValueError:
self.fail("%s is not a valid literal" % value, param, ctx)
[docs]
class GlobalContextObject:
def __init__(self, debug: int = 0, output: Optional[Callable] = None):
self.debug = debug
self.output = output
[docs]
class DeviceGroup(click.MultiCommand):
[docs]
class Command:
def __init__(self, name, decorators, *, default_output=None, **kwargs):
self.name = name
self.decorators = list(decorators)
self.decorators.reverse()
self.default_output = default_output
self.kwargs = kwargs
def __call__(self, func):
self.func = func
func._device_group_command = self
self.kwargs.setdefault("help", self.func.__doc__)
def _autodetect_model_if_needed(func):
@wraps(func)
def _wrap(self, *args, **kwargs):
skip_autodetect = func._device_group_command.kwargs.pop(
"skip_autodetect", False
)
if (
not skip_autodetect
and self._model is None
and self._info is None
):
_LOGGER.debug("Unknown model, trying autodetection")
self._fetch_info()
return func(self, *args, **kwargs)
# TODO HACK to make the command visible to cli
_wrap._device_group_command = func._device_group_command
return _wrap
func = _autodetect_model_if_needed(func)
return func
@property
def command_name(self):
return self.name or self.func.__name__.lower()
[docs]
def wrap(self, ctx, func):
gco = ctx.find_object(GlobalContextObject)
if gco is not None and gco.output is not None:
output = gco.output
elif self.default_output:
output = self.default_output
else:
output = format_output(f"Running command {self.command_name}")
# Remove skip_autodetect before constructing the click.command
self.kwargs.pop("skip_autodetect", None)
func = output(func)
for decorator in self.decorators:
func = decorator(func)
return click.command(self.command_name, **self.kwargs)(func)
[docs]
def call(self, owner, *args, **kwargs):
method = getattr(owner, self.func.__name__)
return method(*args, **kwargs)
DEFAULT_PARAMS = [
click.Option(["--ip"], required=True, callback=validate_ip),
click.Option(["--token"], required=True, callback=validate_token),
click.Option(["--model"], required=False),
]
def __init__(
self,
device_class,
name=None,
invoke_without_command=False,
no_args_is_help=None,
subcommand_metavar=None,
chain=False,
result_callback=None,
result_callback_pass_device=True,
**attrs,
):
self.commands = getattr(device_class, "_device_group_commands", None)
if self.commands is None:
raise RuntimeError(
"Class {} doesn't use DeviceGroupMeta meta class."
" It can't be used with DeviceGroup."
)
self.device_class = device_class
self.device_pass = click.make_pass_decorator(device_class)
attrs.setdefault("params", self.DEFAULT_PARAMS)
attrs.setdefault("callback", click.pass_context(self.group_callback))
if result_callback_pass_device and callable(result_callback):
result_callback = self.device_pass(result_callback)
super().__init__(
name or device_class.__name__.lower(),
invoke_without_command,
no_args_is_help,
subcommand_metavar,
chain,
result_callback,
**attrs,
)
[docs]
def group_callback(self, ctx, *args, **kwargs):
gco = ctx.find_object(GlobalContextObject)
if gco:
kwargs["debug"] = gco.debug
ctx.obj = self.device_class(*args, **kwargs)
[docs]
def command_callback(self, miio_command, miio_device, *args, **kwargs):
return miio_command.call(miio_device, *args, **kwargs)
[docs]
def get_command(self, ctx, cmd_name):
if cmd_name not in self.commands:
ctx.fail("Unknown command (%s)" % cmd_name)
cmd = self.commands[cmd_name]
return self.commands[cmd_name].wrap(
ctx, self.device_pass(partial(self.command_callback, cmd))
)
[docs]
def list_commands(self, ctx):
return sorted(self.commands.keys())
[docs]
def command(*decorators, name=None, default_output=None, **kwargs):
return DeviceGroup.Command(
name, decorators, default_output=default_output, **kwargs
)
[docs]
def json_output(pretty=False):
indent = 2 if pretty else None
def decorator(func):
@wraps(func)
def wrap(*args, **kwargs):
try:
result = func(*args, **kwargs)
except DeviceError as ex:
echo(json.dumps(ex.args[0], indent=indent))
return
# TODO: __json__ is not used anywhere and could be removed
get_json_data_func = getattr(result, "__json__", None)
data_variable = getattr(result, "data", None)
if get_json_data_func is not None:
result = get_json_data_func()
elif data_variable is not None:
result = data_variable
echo(json.dumps(result, indent=indent))
return wrap
return decorator