Source code for miio.devtools.simulators.miotsimulator

import asyncio
import json
import logging
import random
from collections import defaultdict
from typing import List, Union

import click

try:
    from pydantic.v1 import Field, validator
except ImportError:
    from pydantic import Field, validator
from miio import PushServer
from miio.miot_cloud import MiotCloud
from miio.miot_models import DeviceModel, MiotAccess, MiotProperty, MiotService

from .common import create_info_response, did_and_mac_for_model

_LOGGER = logging.getLogger(__name__)
UNSET = -10000

ERR_INVALID_SETTING = -1000


[docs] def create_random(values): """Create random value for the given mapping.""" piid = values["piid"] if values["format"] == str: return f"piid {piid}" if values["choices"] is not None: choices = values["choices"] choice = choices[random.randint(0, len(choices) - 1)] # nosec _LOGGER.debug("Got enum %r for %s", choice, piid) return choice.value if values["range"] is not None: range = values["range"] value = random.randint(range[0], range[1]) # nosec _LOGGER.debug("Got value %r from %s for piid %s", value, range, piid) return value if values["format"] == bool: value = bool(random.randint(0, 1)) # nosec _LOGGER.debug("Got bool %r for piid %s", value, piid) return value
[docs] class SimulatedMiotProperty(MiotProperty): """Simulates a property. * Creates dummy values based on the property information. * Validates inputs for set_properties """ current_value: Union[int, str, bool] = Field(default=UNSET)
[docs] @validator("current_value", pre=True, always=True) def verify_value(cls, v, values): """This verifies that the type of the value conforms with the mapping definition. This will also create random values for the mapping when the device is initialized. """ if v == UNSET: return create_random(values) if MiotAccess.Write not in values["access"]: raise ValueError("Tried to set read-only property") try: casted_value = values["format"](v) except Exception as ex: raise TypeError("Invalid type") from ex range = values["range"] if range is not None and not (range[0] <= casted_value <= range[1]): raise ValueError(f"{casted_value} not in range {range}") choices = values["choices"] if choices is not None and not any(c.value == casted_value for c in choices): raise ValueError(f"{casted_value} not found in {choices}") return casted_value
[docs] class Config: validate_assignment = True smart_union = True # try all types before coercing
[docs] class SimulatedMiotService(MiotService): """Overridden to allow simulated properties.""" properties: List[SimulatedMiotProperty] = Field(default=[], repr=False)
[docs] class SimulatedDeviceModel(DeviceModel): """Overridden to allow simulated properties.""" services: List[SimulatedMiotService]
[docs] class MiotSimulator: """MiOT device simulator. This class implements a barebone simulator for a given devicemodel instance created from a miot schema file. """ def __init__(self, device_model): self._model: SimulatedDeviceModel = device_model self._state = defaultdict(defaultdict) self.initialize_state()
[docs] def initialize_state(self): """Create initial state for the device.""" for serv in self._model.services: _LOGGER.debug("Found service: %s", serv) for act in serv.actions: _LOGGER.debug("Found action: %s", act) for prop in serv.properties: self._state[serv.siid][prop.piid] = prop _LOGGER.debug("Found property: %s", prop)
[docs] def get_properties(self, payload): """Handle get_properties method.""" _LOGGER.info("Got get_properties call with %s", payload) response = [] params = payload["params"] for p in params: res = p.copy() try: res["value"] = self._state[res["siid"]][res["piid"]].current_value res["code"] = 0 except Exception as ex: res["value"] = "" res["code"] = ERR_INVALID_SETTING res["exception"] = str(ex) response.append(res) return {"result": response}
[docs] def set_properties(self, payload): """Handle set_properties method.""" _LOGGER.info("Received set_properties call with %s", payload) params = payload["params"] for param in params: siid = param["siid"] piid = param["piid"] value = param["value"] self._state[siid][piid].current_value = value _LOGGER.info("Set %s:%s to %s", siid, piid, self._state[siid][piid]) return {"result": 0}
[docs] def dump_services(self, payload): """Dumps the available services.""" servs = {} for serv in self._model.services: servs[serv.siid] = {"siid": serv.siid, "description": serv.description} return {"services": servs}
[docs] def dump_properties(self, payload): """Dumps the available properties. This is not implemented on real devices, but can be used for debugging. """ props = [] params = payload["params"] if "siid" not in params: raise ValueError("missing 'siid'") siid = params["siid"] if siid not in self._state: raise ValueError(f"non-existing 'siid' {siid}") for piid, prop in self._state[siid].items(): props.append( { "siid": siid, "piid": piid, "prop": prop.description, "value": prop.current_value, } ) return {"result": props}
[docs] def action(self, payload): """Handle action method.""" params = payload["params"] if ( "did" not in params or "siid" not in params or "aiid" not in params or "in" not in params ): raise ValueError("did, siid, or aiid missing") siid = params["siid"] aiid = params["aiid"] inputs = params["in"] service = self._model.get_service_by_siid(siid) action = service.get_action_by_id(aiid) action_inputs = action.inputs if len(inputs) != len(action_inputs): raise ValueError( "Invalid parameter count, was expecting %s params, got %s" % (len(inputs), len(action_inputs)) ) for idx, param in enumerate(inputs): wanted_input = action_inputs[idx] if wanted_input.choices: if not isinstance(param, int): raise TypeError( "Param #%s: enum value expects an integer %s, got %s" % (idx, wanted_input, param) ) for choice in wanted_input.choices: if param == choice.value: break else: raise ValueError( "Param #%s: invalid value '%s' for %s" % (idx, param, wanted_input.choices) ) elif wanted_input.range: if not isinstance(param, int): raise TypeError( "Param #%s: ranged value expects an integer %s, got %s" % (idx, wanted_input, param) ) min, max, step = wanted_input.range if param < min or param > max: raise ValueError( "Param #%s: value '%s' out of range [%s, %s]" % (idx, param, min, max) ) elif wanted_input.format == str and not isinstance(param, str): raise TypeError(f"Param #{idx}: expected string but got {type(param)}") _LOGGER.info("Got called %s", payload) return {"result": ["ok"]}
[docs] async def main(dev, model): device_id, mac = did_and_mac_for_model(model) server = PushServer(device_id=device_id) simulator = MiotSimulator(device_model=dev) server.add_method("miIO.info", create_info_response(model, "127.0.0.1", mac)) server.add_method("action", simulator.action) server.add_method("get_properties", simulator.get_properties) server.add_method("set_properties", simulator.set_properties) server.add_method("dump_properties", simulator.dump_properties) server.add_method("dump_services", simulator.dump_services) transport, proto = await server.start()
@click.command() @click.option("--file", type=click.File("r"), required=False) @click.option("--model", type=str, required=True, default=None) def miot_simulator(file, model): """Simulate miot device.""" if file is not None: data = file.read() dev = SimulatedDeviceModel.parse_raw(data) else: cloud = MiotCloud() try: schema = cloud.get_model_schema(model) except Exception as ex: _LOGGER.error("Unable to get schema: %s" % ex) return try: dev = SimulatedDeviceModel.parse_obj(schema) except Exception as ex: # this is far from optimal, but considering this is a developer tool it can be fixed later fn = f"/tmp/pythonmiio_unparseable_{model}.json" # nosec with open(fn, "w") as f: json.dump(schema, f, indent=4) _LOGGER.error("Unable to parse the schema, see %s: %s", fn, ex) return loop = asyncio.get_event_loop() random.seed(1) # nosec loop.run_until_complete(main(dev, model=model)) loop.run_forever()