Skip to content

Commit 40e94cd

Browse files
committed
Zigpy packet capture interface
1 parent e13497f commit 40e94cd

File tree

2 files changed

+131
-0
lines changed

2 files changed

+131
-0
lines changed

zigpy_cli/helpers.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import struct
2+
3+
import zigpy.types as t
4+
5+
6+
class PcapWriter:
7+
"""Class responsible to write in PCAP format."""
8+
9+
def __init__(self, file):
10+
"""Initialize PCAP file and write global header."""
11+
self.file = file
12+
13+
def write_header(self):
14+
self.file.write(
15+
struct.pack("<L", 0xA1B2C3D4)
16+
+ struct.pack("<H", 2)
17+
+ struct.pack("<H", 4)
18+
+ struct.pack("<L", 0)
19+
+ struct.pack("<L", 0)
20+
+ struct.pack("<L", 65535)
21+
+ struct.pack("<L", 283) # LINKTYPE_IEEE802_15_4_TAP
22+
)
23+
24+
def write_packet(self, packet: t.CapturedPacket) -> None:
25+
"""Write a packet with its header and TLV metadata."""
26+
timestamp_sec = int(packet.timestamp.timestamp())
27+
timestamp_usec = int(packet.timestamp.microsecond)
28+
29+
sub_tlvs = b""
30+
31+
# RSSI
32+
sub_tlvs += (
33+
t.uint16_t(1).serialize()
34+
+ t.uint16_t(4).serialize()
35+
+ t.Single(packet.rssi).serialize()
36+
)
37+
38+
# LQI
39+
sub_tlvs += (
40+
t.uint16_t(10).serialize()
41+
+ t.uint16_t(1).serialize()
42+
+ t.uint8_t(packet.lqi).serialize()
43+
+ b"\x00\x00\x00"
44+
)
45+
46+
# Channel Assignment
47+
sub_tlvs += (
48+
t.uint16_t(3).serialize()
49+
+ t.uint16_t(3).serialize()
50+
+ t.uint16_t(packet.channel).serialize()
51+
+ t.uint8_t(0).serialize() # page 0
52+
+ b"\x00"
53+
)
54+
55+
# FCS type
56+
sub_tlvs += (
57+
t.uint16_t(0).serialize()
58+
+ t.uint16_t(1).serialize()
59+
+ t.uint8_t(1).serialize() # FCS type 1
60+
+ b"\x00\x00\x00"
61+
)
62+
63+
tlvs = b""
64+
65+
# TAP header: version:u8, reserved: u8, length: u16
66+
tlvs += struct.pack("<BBH", 0, 0, 4 + len(sub_tlvs))
67+
assert len(sub_tlvs) % 4 == 0
68+
69+
data = tlvs + sub_tlvs + packet.data + packet.compute_fcs()
70+
71+
self.file.write(
72+
struct.pack("<L", timestamp_sec)
73+
+ struct.pack("<L", timestamp_usec)
74+
+ struct.pack("<L", len(data))
75+
+ struct.pack("<L", len(data))
76+
+ data
77+
)

zigpy_cli/radio.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,19 @@
77
import itertools
88
import json
99
import logging
10+
import random
1011

1112
import click
1213
import zigpy.state
1314
import zigpy.types
1415
import zigpy.zdo
1516
import zigpy.zdo.types
17+
from zigpy.application import ControllerApplication
1618

1719
from zigpy_cli.cli import cli, click_coroutine
20+
from zigpy_cli.common import CHANNELS_LIST
1821
from zigpy_cli.const import RADIO_LOGGING_CONFIGS, RADIO_TO_PACKAGE, RADIO_TO_PYPI
22+
from zigpy_cli.helpers import PcapWriter
1923

2024
LOGGER = logging.getLogger(__name__)
2125

@@ -234,3 +238,53 @@ async def change_channel(app, channel):
234238
LOGGER.info("Current channel is %s", app.state.network_info.channel)
235239

236240
await app.move_network_to_channel(channel)
241+
242+
243+
@radio.command()
244+
@click.pass_obj
245+
@click.option("-r", "--randomize", is_flag=True, type=bool, default=False)
246+
@click.option(
247+
"-c",
248+
"--channels",
249+
type=CHANNELS_LIST,
250+
default=zigpy.types.Channels.ALL_CHANNELS,
251+
)
252+
@click.option("-h", "--channel-hop-period", type=int, default=5)
253+
@click.option("-o", "--output", type=click.File("wb"), required=True)
254+
@click_coroutine
255+
async def packet_capture(app, randomize, channels, channel_hop_period, output):
256+
if not randomize:
257+
channels_iter = itertools.cycle(channels)
258+
else:
259+
260+
def channels_iter_func():
261+
while True:
262+
yield random.choice(channels)
263+
264+
channels_iter = channels_iter_func()
265+
266+
if app._packet_capture is ControllerApplication._packet_capture:
267+
raise click.ClickException("Packet capture is not supported by this radio")
268+
269+
await app.connect()
270+
271+
async with app.packet_capture(channel=next(channels_iter)) as capture:
272+
async with asyncio.TaskGroup() as tg:
273+
274+
async def channel_hopper():
275+
for channel in channels_iter:
276+
await asyncio.sleep(channel_hop_period)
277+
LOGGER.debug("Changing channel to %s", channel)
278+
await capture.change_channel(channel)
279+
280+
tg.create_task(channel_hopper())
281+
282+
writer = PcapWriter(output)
283+
writer.write_header()
284+
285+
async for packet in capture:
286+
LOGGER.debug("Got a packet %s", packet)
287+
writer.write_packet(packet)
288+
289+
if output.name == "<stdout>": # Surely there's a better way?
290+
output.flush()

0 commit comments

Comments
 (0)