"""miIO protocol implementation.
This module contains the implementation of the routines to encrypt and decrypt
miIO payloads with a device-specific token.
The payloads to be encrypted (to be passed to a device) are expected to be
JSON objects, the same applies for decryption where they are converted
automatically to JSON objects.
If the decryption fails, raw bytes as returned by the device are returned.
An usage example can be seen in the source of :func:`miio.Device.send`.
If the decryption fails, raw bytes as returned by the device are returned.
"""
import calendar
import datetime
import hashlib
import json
import logging
from typing import Any, Dict, Tuple, Union
from construct import (
Adapter,
Bytes,
Checksum,
Const,
Default,
GreedyBytes,
Hex,
IfThenElse,
Int16ub,
Int32ub,
Pointer,
RawCopy,
Rebuild,
Struct,
)
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from miio.exceptions import PayloadDecodeException
_LOGGER = logging.getLogger(__name__)
[docs]
class Utils:
"""This class is adapted from the original xpn.py code by gst666."""
[docs]
@staticmethod
def verify_token(token: bytes):
"""Checks if the given token is of correct type and length."""
if not isinstance(token, bytes):
raise TypeError("Token must be bytes")
if len(token) != 16:
raise ValueError("Wrong token length")
[docs]
@staticmethod
def md5(data: bytes) -> bytes:
"""Calculates a md5 hashsum for the given bytes object."""
checksum = hashlib.md5() # nosec
checksum.update(data)
return checksum.digest()
[docs]
@staticmethod
def key_iv(token: bytes) -> Tuple[bytes, bytes]:
"""Generate an IV used for encryption based on given token."""
key = Utils.md5(token)
iv = Utils.md5(key + token)
return key, iv
[docs]
@staticmethod
def encrypt(plaintext: bytes, token: bytes) -> bytes:
"""Encrypt plaintext with a given token.
:param bytes plaintext: Plaintext (json) to encrypt
:param bytes token: Token to use
:return: Encrypted bytes
"""
if not isinstance(plaintext, bytes):
raise TypeError("plaintext requires bytes")
Utils.verify_token(token)
key, iv = Utils.key_iv(token)
padder = padding.PKCS7(128).padder()
padded_plaintext = padder.update(plaintext) + padder.finalize()
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
return encryptor.update(padded_plaintext) + encryptor.finalize()
[docs]
@staticmethod
def decrypt(ciphertext: bytes, token: bytes) -> bytes:
"""Decrypt ciphertext with a given token.
:param bytes ciphertext: Ciphertext to decrypt
:param bytes token: Token to use
:return: Decrypted bytes object
"""
if not isinstance(ciphertext, bytes):
raise TypeError("ciphertext requires bytes")
Utils.verify_token(token)
key, iv = Utils.key_iv(token)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
unpadded_plaintext = unpadder.update(padded_plaintext)
unpadded_plaintext += unpadder.finalize()
return unpadded_plaintext
[docs]
@staticmethod
def checksum_field_bytes(ctx: Dict[str, Any]) -> bytearray:
"""Gather bytes for checksum calculation."""
x = bytearray(ctx["header"].data)
x += ctx["_"]["token"]
if "data" in ctx:
x += ctx["data"].data
# print("DATA: %s" % ctx["data"])
return x
[docs]
@staticmethod
def get_length(x) -> int:
"""Return total packet length."""
datalen = x._.data.length # type: int
return datalen + 32
[docs]
@staticmethod
def is_hello(x) -> bool:
"""Return if packet is a hello packet."""
# not very nice, but we know that hellos are 32b of length
val = x.get("length", x.header.value["length"])
return val == 32
[docs]
class TimeAdapter(Adapter):
"""Adapter for timestamp conversion."""
def _encode(self, obj, context, path):
return calendar.timegm(obj.timetuple())
def _decode(self, obj, context, path):
return datetime.datetime.fromtimestamp(obj, tz=datetime.timezone.utc)
[docs]
class EncryptionAdapter(Adapter):
"""Adapter to handle communication encryption."""
def _encode(self, obj, context, path):
"""Encrypt the given payload with the token stored in the context.
:param obj: JSON object to encrypt
"""
# pp(context)
return Utils.encrypt(
json.dumps(obj).encode("utf-8") + b"\x00", context["_"]["token"]
)
def _decode(self, obj, context, path) -> Union[Dict, bytes]:
"""Decrypts the payload using the token stored in the context."""
# Missing payload is expected for discovery messages.
if not obj:
return obj
try:
decrypted = Utils.decrypt(obj, context["_"]["token"])
decrypted = decrypted.rstrip(b"\x00")
except Exception:
_LOGGER.debug("Unable to decrypt, returning raw bytes: %s", obj)
return obj
# list of adaption functions for malformed json payload (quirks)
decrypted_quirks = [
# try without modifications first
lambda decrypted_bytes: decrypted_bytes,
# powerstrip returns malformed JSON if the device is not
# connected to the cloud, so we try to fix it here carefully.
lambda decrypted_bytes: decrypted_bytes.replace(
b',,"otu_stat"', b',"otu_stat"'
),
# xiaomi cloud returns malformed json when answering _sync.batch_gen_room_up_url
# command so try to sanitize it
lambda decrypted_bytes: (
decrypted_bytes[: decrypted_bytes.rfind(b"\x00")]
if b"\x00" in decrypted_bytes
else decrypted_bytes
),
# fix double-oh values for 090615.curtain.jldj03, ##1411
lambda decrypted_bytes: decrypted_bytes.replace(
b'"value":00', b'"value":0'
),
# fix double commas for xiaomi.vacuum.b112, fw: 2.2.4_0049
lambda decrypted_bytes: decrypted_bytes.replace(b",,", b","),
]
for i, quirk in enumerate(decrypted_quirks):
try:
decoded = quirk(decrypted).decode("utf-8")
return json.loads(decoded)
except Exception as ex:
# log the error when decrypted bytes couldn't be loaded
# after trying all quirk adaptions
if i == len(decrypted_quirks) - 1:
_LOGGER.error("Unable to parse json '%s': %s", decrypted, ex)
raise PayloadDecodeException(
"Unable to parse message payload"
) from ex
raise Exception("this should never happen")
Message = Struct(
# for building we need data before anything else.
"data" / Pointer(32, RawCopy(EncryptionAdapter(GreedyBytes))),
"header"
/ RawCopy(
Struct(
Const(0x2131, Int16ub),
"length" / Rebuild(Int16ub, Utils.get_length),
"unknown" / Default(Int32ub, 0x00000000),
"device_id" / Hex(Bytes(4)),
"ts"
/ TimeAdapter(
Default(
Int32ub,
datetime.datetime.now(tz=datetime.timezone.utc),
)
),
)
),
"checksum"
/ IfThenElse(
Utils.is_hello,
Bytes(16),
Checksum(Bytes(16), Utils.md5, Utils.checksum_field_bytes),
),
)