# Source code for miio.devtools.pcapparser

"""Parse PCAP files for miio traffic."""

from collections import Counter, defaultdict
from ipaddress import ip_address
from pprint import pformat as pf
from typing import List

import click

try:
    from rich import print as echo
except ImportError:
    echo = click.echo


from miio import Message


def read_payloads_from_file(file, tokens: List[str]):
    """Read the given pcap file and yield src, dst, and result.

    Only UDP datagrams whose source or destination port is 54321 are
    considered. Each datagram is parsed with every token in turn; the first
    token that parses it successfully is used, and datagrams that no token
    can parse are skipped. Packets with an empty data section are counted
    but not yielded.

    Once the capture is exhausted, the collected counters (packet totals,
    source and destination addresses, seen commands) are printed with
    ``echo``. If ``dpkt`` is not installed, a hint is printed and nothing is
    yielded.

    :param file: Binary file object containing the pcap capture.
    :param tokens: Candidate device tokens as hex strings.
    :return: Generator of ``(src_addr, dst_addr, payload)`` tuples.
    """
    try:
        import dpkt
        from dpkt.ethernet import ETH_TYPE_IP, Ethernet
    except ImportError:
        echo("You need to install dpkt to use this tool")
        return

    pcap = dpkt.pcap.Reader(file)

    stats: defaultdict[str, Counter] = defaultdict(Counter)
    for _ts, pkt in pcap:
        eth = Ethernet(pkt)
        if eth.type != ETH_TYPE_IP:
            continue

        # NOTE(review): dpkt appears to expose the decoded inner layer as an
        # attribute only when it could actually parse it (e.g. not for IP
        # fragments) -- skip such frames instead of failing on AttributeError.
        ip = getattr(eth, "ip", None)
        if ip is None or ip.p != 17:  # 17 is the IP protocol number of UDP
            continue

        transport = getattr(ip, "udp", None)
        if transport is None:
            continue

        # Keep only traffic to or from port 54321.
        if transport.dport != 54321 and transport.sport != 54321:
            continue

        data = transport.data

        src_addr = str(ip_address(ip.src))
        dst_addr = str(ip_address(ip.dst))

        decrypted = None
        for token in tokens:
            try:
                decrypted = Message.parse(data, token=bytes.fromhex(token))
                break
            except Exception:
                # Wrong (or malformed) token for this packet: try the next
                # one. Deliberately not BaseException, so that Ctrl-C and
                # SystemExit still abort the run.
                continue

        if decrypted is None:
            continue

        stats["stats"]["miio_packets"] += 1

        if decrypted.data.length == 0:
            stats["stats"]["empty_packets"] += 1
            continue

        stats["dst_addr"][dst_addr] += 1
        stats["src_addr"][src_addr] += 1

        payload = decrypted.data.value

        if "result" in payload:
            stats["stats"]["results"] += 1
        if "method" in payload:
            method = payload["method"]
            stats["commands"][method] += 1

        yield src_addr, dst_addr, payload

    # Summary is emitted only after the whole capture has been consumed.
    for cat in stats:
        echo(f"\n== {cat} ==")
        for stat, value in stats[cat].items():
            echo(f"\t{stat}: {value}")


@click.command()
@click.argument("file", type=click.File("rb"))
@click.argument("token", nargs=-1)
def parse_pcap(file, token: List[str]):
    """Read PCAP file and output decrypted miio communication.

    Prints one line per payload yielded by ``read_payloads_from_file``:
    the source and destination addresses, each left-aligned in a
    15-character column, followed by the pretty-printed payload.

    :param file: Capture file, opened in binary mode by click.
    :param token: Zero or more device tokens given on the command line.
    """
    packets = read_payloads_from_file(file, token)
    for src_addr, dst_addr, payload in packets:
        formatted = pf(payload)
        echo(f"{src_addr:<15} -> {dst_addr:<15} {formatted}")