Source code for miio.integrations.chuangmi.remote.chuangmi_ir

import base64
import re
from typing import Callable, Set, Tuple

import click
from construct import (
    Adapter,
    Array,
    BitsInteger,
    BitStruct,
    Computed,
    Const,
    Int16ub,
    Int16ul,
    Int32ul,
    Rebuild,
    Struct,
    len_,
    this,
)

from miio.click_common import command, format_output
from miio.device import Device


[docs] class ChuangmiIr(Device): """Main class representing Chuangmi IR Remote Controller.""" _supported_models = [ "chuangmi.ir.v2", "chuangmi.remote.v2", "chuangmi.remote.h102a03", "xiaomi.wifispeaker.l05g", ] PRONTO_RE = re.compile(r"^([\da-f]{4}\s?){3,}([\da-f]{4})$", re.IGNORECASE)
[docs] @command( click.argument("key", type=int), default_output=format_output("Learning command into storage key {key}"), ) def learn(self, key: int = 1): """Learn an infrared command. :param int key: Storage slot, must be between 1 and 1000000 """ if key < 1 or key > 1000000: raise ValueError("Invalid storage slot.") return self.send("miIO.ir_learn", {"key": str(key)})
[docs] @command( click.argument("key", type=int), default_output=format_output("Reading infrared command from storage key {key}"), ) def read(self, key: int = 1): """Read a learned command. Positive response (chuangmi.ir.v2): {'key': '1', 'code': 'Z6WPAasBAAA3BQAA4AwJAEA....AAABAAEBAQAAAQAA=='} Negative response (chuangmi.ir.v2): {'error': {'code': -5002, 'message': 'no code for this key'}, 'id': 5} Negative response (chuangmi.ir.v2): {'error': {'code': -5003, 'message': 'learn timeout'}, 'id': 17} :param int key: Slot to read from """ if key < 1 or key > 1000000: raise ValueError("Invalid storage slot.") return self.send("miIO.ir_read", {"key": str(key)})
[docs] def play_raw(self, command: str, frequency: int = 38400, length: int = -1): """Play a captured command. :param str command: Command to execute :param int frequency: Execution frequency :param int length: Length of the command. -1 means not sending the length parameter. """ if length < 0: return self.send("miIO.ir_play", {"freq": frequency, "code": command}) else: return self.send( "miIO.ir_play", {"freq": frequency, "code": command, "length": length} )
[docs] def play_pronto(self, pronto: str, repeats: int = 1, length: int = -1): """Play a Pronto Hex encoded IR command. Supports only raw Pronto format, starting with 0000. :param str pronto: Pronto Hex string. :param int repeats: Number of extra signal repeats. :param int length: Length of the command. -1 means not sending the length parameter. """ command, frequency = self.pronto_to_raw(pronto, repeats) return self.play_raw(command, frequency, length)
[docs] @classmethod def pronto_to_raw(cls, pronto: str, repeats: int = 1) -> Tuple[str, int]: """Play a Pronto Hex encoded IR command. Supports only raw Pronto format, starting with 0000. :param str pronto: Pronto Hex string. :param int repeats: Number of extra signal repeats. """ if repeats < 0: raise ValueError("Invalid repeats value") try: pronto_data = Pronto.parse(bytearray.fromhex(pronto)) except Exception as ex: raise ValueError("Invalid Pronto command") from ex if len(pronto_data.intro) == 0: repeats += 1 times: Set[int] = set() for pair in pronto_data.intro + pronto_data.repeat * (1 if repeats else 0): times.add(pair.pulse) times.add(pair.gap) times_sorted = sorted(times) times_map = {t: idx for idx, t in enumerate(times_sorted)} edge_pairs = [] for pair in pronto_data.intro + pronto_data.repeat * repeats: edge_pairs.append( {"pulse": times_map[pair.pulse], "gap": times_map[pair.gap]} ) signal_code = base64.b64encode( ChuangmiIrSignal.build( { "times_index": times_sorted + [0] * (16 - len(times)), "edge_pairs": edge_pairs, } ) ).decode() return signal_code, int(round(pronto_data.frequency))
[docs] @command( click.argument("command", type=str), default_output=format_output("Playing the supplied command"), ) def play(self, command: str): """Plays a command in one of the supported formats.""" if ":" not in command: if self.PRONTO_RE.match(command): command_type = "pronto" else: command_type = "raw" command_args = [] else: command_type, command, *command_args = command.split(":") arg_types = [int, int] if len(command_args) > len(arg_types): raise ValueError("Invalid command arguments count") if command_type not in ["raw", "pronto"]: raise ValueError("Invalid command type") play_method: Callable if command_type == "raw": play_method = self.play_raw elif command_type == "pronto": play_method = self.play_pronto try: converted_command_args = [t(v) for v, t in zip(command_args, arg_types)] except Exception as ex: raise ValueError("Invalid command arguments") from ex return play_method(command, *converted_command_args)
[docs] @command( click.argument("indicator_led", type=bool), default_output=format_output( lambda indicator_led: ( "Turning on indicator LED" if indicator_led else "Turning off indicator LED" ) ), ) def set_indicator_led(self, indicator_led: bool): """Set the indicator led on/off.""" if indicator_led: return self.send("set_indicatorLamp", ["on"]) else: return self.send("set_indicatorLamp", ["off"])
[docs] @command(default_output=format_output("Indicator LED status: {result}")) def get_indicator_led(self): """Get the indicator led status.""" return self.send("get_indicatorLamp")
[docs] class ProntoPulseAdapter(Adapter): def _decode(self, obj, context, path): return int(obj * context._.modulation_period) def _encode(self, obj, context, path): raise RuntimeError("Not implemented")
ChuangmiIrSignal = Struct( Const(0xA567, Int16ul), "edge_count" / Rebuild(Int16ul, len_(this.edge_pairs) * 2 - 1), "times_index" / Array(16, Int32ul), "edge_pairs" / Array( (this.edge_count + 1) // 2, BitStruct("gap" / BitsInteger(4), "pulse" / BitsInteger(4)), ), ) ProntoBurstPair = Struct( "pulse" / ProntoPulseAdapter(Int16ub), "gap" / ProntoPulseAdapter(Int16ub) ) Pronto = Struct( Const(0, Int16ub), "_ticks" / Int16ub, "modulation_period" / Computed(this._ticks * 0.241246), "frequency" / Computed(1000000 / this.modulation_period), "intro_len" / Int16ub, "repeat_len" / Int16ub, "intro" / Array(this.intro_len, ProntoBurstPair), "repeat" / Array(this.repeat_len, ProntoBurstPair), )