Skip to content

Commit 25ae06b

Browse files
authored
Add helpers for BLE WiFi scanning and provisioning (#976)
1 parent 473cdf4 commit 25ae06b

File tree

8 files changed

+704
-83
lines changed

8 files changed

+704
-83
lines changed

aioshelly/ble/manufacturer_data.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
"""Shelly BLE manufacturer data parsing."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from typing import TYPE_CHECKING
7+
8+
LOGGER = logging.getLogger(__name__)
9+
10+
ALLTERCO_MFID = 0x0BA9
11+
12+
# Block types in manufacturer data
13+
BLOCK_TYPE_FLAGS = 0x01
14+
BLOCK_TYPE_MAC = 0x0A
15+
BLOCK_TYPE_MODEL = 0x0B
16+
17+
# Shelly bitfield flags (block type 0x01)
18+
FLAG_DISCOVERABLE = 1 << 0
19+
FLAG_AUTH_ENABLED = 1 << 1
20+
FLAG_RPC_OVER_BLE_ENABLED = 1 << 2
21+
FLAG_BUZZER_ENABLED = 1 << 3
22+
FLAG_IN_PAIRING_MODE = 1 << 4
23+
24+
25+
def parse_shelly_manufacturer_data(
26+
manufacturer_data: dict[int, bytes],
27+
) -> dict[str, int | str] | None:
28+
"""Parse Shelly manufacturer data from BLE advertisement.
29+
30+
Args:
31+
manufacturer_data: Manufacturer data from BLE advertisement
32+
33+
Returns:
34+
Dict with parsed data (flags, mac, model) or None if invalid
35+
36+
"""
37+
if ALLTERCO_MFID not in manufacturer_data:
38+
return None
39+
40+
data = manufacturer_data[ALLTERCO_MFID]
41+
if len(data) < 1:
42+
return None
43+
44+
result: dict[str, int | str] = {}
45+
offset = 0
46+
47+
# Parse blocks
48+
while offset < len(data):
49+
block_type = data[offset]
50+
offset += 1
51+
52+
if block_type == BLOCK_TYPE_FLAGS:
53+
# 2 bytes of flags
54+
if offset + 2 > len(data):
55+
break
56+
flags = int.from_bytes(data[offset : offset + 2], byteorder="little")
57+
result["flags"] = flags
58+
offset += 2
59+
60+
elif block_type == BLOCK_TYPE_MAC:
61+
# 6 bytes MAC address
62+
if offset + 6 > len(data):
63+
break
64+
mac_bytes = data[offset : offset + 6]
65+
# Format as standard MAC address
66+
result["mac"] = ":".join(f"{b:02X}" for b in mac_bytes)
67+
offset += 6
68+
69+
elif block_type == BLOCK_TYPE_MODEL:
70+
# 2 bytes model ID
71+
if offset + 2 > len(data):
72+
break
73+
model_id = int.from_bytes(data[offset : offset + 2], byteorder="little")
74+
result["model_id"] = model_id
75+
offset += 2
76+
77+
else:
78+
# Unknown block type - can't continue parsing
79+
LOGGER.debug("Unknown block type in manufacturer data: 0x%02X", block_type)
80+
break
81+
82+
return result if result else None
83+
84+
85+
def has_rpc_over_ble(manufacturer_data: dict[int, bytes]) -> bool:
86+
"""Check if device has RPC-over-BLE enabled.
87+
88+
Args:
89+
manufacturer_data: Manufacturer data from BLE advertisement
90+
91+
Returns:
92+
True if RPC-over-BLE is enabled
93+
94+
"""
95+
parsed = parse_shelly_manufacturer_data(manufacturer_data)
96+
if not parsed or "flags" not in parsed:
97+
return False
98+
99+
flags = parsed["flags"]
100+
if TYPE_CHECKING:
101+
assert isinstance(flags, int)
102+
103+
return bool(flags & FLAG_RPC_OVER_BLE_ENABLED)

aioshelly/ble/provisioning.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""WiFi provisioning via BLE for Shelly devices."""
2+
3+
from __future__ import annotations
4+
5+
from contextlib import asynccontextmanager
6+
from typing import TYPE_CHECKING, Any, cast
7+
8+
from ..common import ConnectionOptions
9+
from ..rpc_device import RpcDevice
10+
11+
if TYPE_CHECKING:
12+
from collections.abc import AsyncIterator
13+
14+
from bleak.backends.device import BLEDevice
15+
16+
17+
@asynccontextmanager
18+
async def ble_rpc_device(ble_device: BLEDevice) -> AsyncIterator[RpcDevice]:
19+
"""Create and manage BLE RPC device connection.
20+
21+
Args:
22+
ble_device: BLE device to connect to
23+
24+
Yields:
25+
Initialized RPC device
26+
27+
Raises:
28+
DeviceConnectionError: If connection to device fails
29+
30+
"""
31+
options = ConnectionOptions(ble_device=ble_device)
32+
device = await RpcDevice.create(
33+
aiohttp_session=None,
34+
ws_context=None,
35+
ip_or_options=options,
36+
)
37+
38+
try:
39+
await device.initialize()
40+
yield device
41+
finally:
42+
await device.shutdown()
43+
44+
45+
async def async_scan_wifi_networks(ble_device: BLEDevice) -> list[dict[str, Any]]:
46+
"""Scan for WiFi networks via BLE.
47+
48+
Args:
49+
ble_device: BLE device to connect to
50+
51+
Returns:
52+
List of WiFi networks with ssid, rssi, auth fields
53+
54+
Raises:
55+
DeviceConnectionError: If connection to device fails
56+
RpcCallError: If RPC call fails
57+
58+
"""
59+
async with ble_rpc_device(ble_device) as device:
60+
# WiFi scan can take up to 20 seconds - use 30s timeout to be safe
61+
scan_result = await device.call_rpc("WiFi.Scan", timeout=30)
62+
return cast(list[dict[str, Any]], scan_result.get("results", []))
63+
64+
65+
async def async_provision_wifi(ble_device: BLEDevice, ssid: str, password: str) -> None:
66+
"""Provision WiFi credentials to device via BLE.
67+
68+
Args:
69+
ble_device: BLE device to connect to
70+
ssid: WiFi network SSID
71+
password: WiFi network password
72+
73+
Raises:
74+
DeviceConnectionError: If connection to device fails
75+
RpcCallError: If RPC call fails
76+
77+
"""
78+
async with ble_rpc_device(ble_device) as device:
79+
await device.call_rpc(
80+
"WiFi.SetConfig",
81+
{
82+
"config": {
83+
"sta": {
84+
"ssid": ssid,
85+
"pass": password,
86+
"enable": True,
87+
}
88+
}
89+
},
90+
)

aioshelly/zeroconf.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Zeroconf helper functions for Shelly devices."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
7+
from zeroconf import IPVersion
8+
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
9+
10+
LOGGER = logging.getLogger(__name__)
11+
12+
13+
async def async_lookup_device_by_name(
14+
aiozc: AsyncZeroconf, device_name: str
15+
) -> tuple[str, int] | None:
16+
"""Look up a Shelly device by name via zeroconf.
17+
18+
Args:
19+
aiozc: AsyncZeroconf instance
20+
device_name: Device name (e.g., "ShellyPlugUS-C049EF8873E8")
21+
22+
Returns:
23+
Tuple of (host, port) if found, None otherwise
24+
25+
"""
26+
service_name = f"{device_name}._http._tcp.local."
27+
28+
LOGGER.debug("Active lookup for: %s", service_name)
29+
service_info = AsyncServiceInfo("_http._tcp.local.", service_name)
30+
31+
if not await service_info.async_request(aiozc.zeroconf, 5000):
32+
LOGGER.debug("Active lookup did not find service")
33+
return None
34+
35+
addresses = service_info.parsed_addresses(IPVersion.V4Only)
36+
if not addresses or not service_info.port:
37+
LOGGER.debug("Active lookup found service but no IPv4 addresses or port")
38+
return None
39+
40+
host = addresses[0]
41+
port = service_info.port
42+
LOGGER.debug("Found device via active lookup at %s:%s", host, port)
43+
return (host, port)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dependencies = [
2828
"habluetooth>=3.42.0",
2929
"orjson>=3.8.1",
3030
"yarl",
31+
"zeroconf>=0.148.0"
3132
]
3233

3334
[project.optional-dependencies]

0 commit comments

Comments
 (0)