Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions wyzesense2mqtt/bridge_tool_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,20 @@
from datetime import datetime

def on_event(ws, e):
s = f"[{datetime.fromtimestamp(e.timestamp).strftime('%Y-%m-%d %H:%M:%S')}][{e.mac}]: {e}"
if isinstance(e, str):
# Handle error string
print(f"Event: {e}")
return
s = f"[{datetime.fromtimestamp(e.timestamp).strftime('%Y-%m-%d %H:%M:%S')}][{e.mac}]: "
s += f"type={e.event}"
if hasattr(e, 'sensor_type'):
s += f", sensor_type={e.sensor_type}"
if hasattr(e, 'state'):
s += f", state={e.state}"
if hasattr(e, 'battery'):
s += f", battery={e.battery}"
if hasattr(e, 'signal_strength'):
s += f", signal={e.signal_strength}"
print(s)

def main(args):
Expand All @@ -54,17 +67,26 @@ def main(args):
return 2

def List(unused_args):
result = ws.List()
print(f"{len(result)} sensors paired:")
logging.debug(f"{len(result)} sensors paired:")
for mac in result:
print(f"\tSensor: {mac}")
logging.debug(f"\tSensor: {mac}")
try:
result = ws.List()
print(f"{len(result)} sensors paired:")
logging.debug(f"{len(result)} sensors paired:")
for mac in result:
# Display corrupted MACs (non-ASCII) in hex format
try:
mac.encode('ascii')
display_mac = mac
except UnicodeEncodeError:
display_mac = ''.join(f"{ord(c):02x}" for c in mac)
print(f"\tSensor: {display_mac}")
logging.debug(f"\tSensor: {display_mac}")
except TimeoutError:
print("Error: Timeout while retrieving sensor list.")

def Pair(unused_args):
result = ws.Scan()
(s_mac, s_type, s_version) = result
if result:
(s_mac, s_type, s_version) = result
print(f"Sensor found: mac={s_mac}, type={s_type}, version={s_version}")
logging.debug(f"Sensor found: mac={s_mac}, type={s_type}, version={s_version}")
else:
Expand All @@ -73,11 +95,20 @@ def Pair(unused_args):

def Unpair(mac_list):
for mac in mac_list:
if len(mac) != 8:
print(f"Invalid mac address, must be 8 characters: {mac}")
logging.debug(f"Invalid mac address, must be 8 characters: {mac}")
# Handle both ASCII MACs (8 chars) and hex-encoded corrupted MACs (16 chars)
if len(mac) == 16:
try:
mac_bytes = bytes.fromhex(mac)
mac = mac_bytes.decode('latin-1')
except (ValueError, UnicodeDecodeError) as e:
print(f"Invalid hex MAC address: {mac}")
logging.debug(f"Invalid hex MAC address: {mac}: {e}")
continue
elif len(mac) != 8:
print(f"Invalid mac address, must be 8 or 16 characters: {mac}")
logging.debug(f"Invalid mac address, must be 8 or 16 characters: {mac}")
continue
print(f"Un-pairing sensor {mac}:")
print(f"Un-pairing sensor {mac if len(mac) == 8 and mac.isascii() else ''.join(f'{ord(c):02x}' for c in mac)}:")
logging.debug(f"Un-pairing sensor {mac}:")
result = ws.Delete(mac)
if result is not None:
Expand All @@ -90,15 +121,19 @@ def Fix(unused_args):
invalid_mac_list = [
"00000000",
"\0\0\0\0\0\0\0\0",
"\x00\x00\x00\x00\x00\x00\x00\x00"
"\x00\x00\x00\x00\x00\x00\x00\x00",
"ffffffffffffffff"
]
print("Un-pairing bad sensors")
logging.debug("Un-pairing bad sensors")
for mac in invalid_mac_list:
result = ws.Delete(mac)
if result is not None:
print(f"Result: {result}")
logging.debug(f"Result: {result}")
try:
result = ws.Delete(mac)
if result is not None:
print(f"Removed sensor: {mac}")
logging.debug(f"Removed sensor: {mac}")
except Exception as e:
logging.debug(f"Could not remove {mac}: {e}")
print("Bad sensors removed")
logging.debug("Bad sensors removed")

Expand Down
110 changes: 93 additions & 17 deletions wyzesense2mqtt/wyzesense.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,15 @@ def FinishAuth(cls):

@classmethod
def DelSensor(cls, mac):
if isinstance(mac, bytes):
assert len(mac) == 8
return cls(cls.CMD_DEL_SENSOR, mac)
assert isinstance(mac, str)
assert len(mac) == 8
return cls(cls.CMD_DEL_SENSOR, mac.encode('ascii'))
try:
return cls(cls.CMD_DEL_SENSOR, mac.encode('ascii'))
except UnicodeEncodeError:
return cls(cls.CMD_DEL_SENSOR, mac.encode('latin-1'))

@classmethod
def DelAllSensor(cls):
Expand Down Expand Up @@ -344,11 +350,11 @@ def _HeartbeatParser(cls, mac, event, sensor_type, timestamp, data):

@classmethod
def _ClimateParser(cls, mac, event, sensor_type, timestamp, data):
_, battery, _, _, temp_hi, temp_lo, humidity, _, seq, signal_strength = struct.unpack_from(">BBBBBBBBBB", data)
_, battery, _, _, temp_hi, temp_lo, humidity, signal_strength = struct.unpack_from(">BBBBBBBB", data)

if sensor_type != SENSOR_TYPE_CLIMATE:
LOGGER.warn(f"Unexpected sensor ({sensor_type:02X}) for event {event:02X}")
return cls._UnknownParser(mac, event, timestamp, data)
return cls._UnknownParser(mac, event, sensor_type, timestamp, data)

temperature = f"{temp_hi + (temp_lo / 100.0):.2f}"
return cls(
Expand Down Expand Up @@ -443,6 +449,8 @@ def _OnEventLog(self, pkt):
tm = datetime.datetime.fromtimestamp(ts / 1000.0)
msg = pkt.Payload[9:]
LOGGER.info("LOG: time=%s, data=%s", tm.isoformat(), bytes_to_hex(msg))
if ts == 0:
self.__on_event(self, "Dongle sent event log with timestamp=0 (clock not yet synchronized)")
# Check if we have a message after, length includes the msglen byte
# if ((len(msg) + 1) >= msg_len and msg_len >= 13):
# event, mac, type, state, counter = struct.unpack(">B8sBBH", msg)
Expand Down Expand Up @@ -599,7 +607,11 @@ def _GetMac(self):
LOGGER.debug("Start GetMAC...")
resp = self._DoSimpleCommand(Packet.GetMAC())
assert len(resp.Payload) == 8
mac = resp.Payload.decode('ascii')
try:
mac = resp.Payload.decode('ascii')
except UnicodeDecodeError:
LOGGER.warning("Invalid MAC address data (non-ASCII bytes): %s", bytes_to_hex(resp.Payload))
mac = bytes_to_hex(resp.Payload)
LOGGER.debug("GetMAC returns %s", mac)
return mac

Expand All @@ -613,7 +625,11 @@ def _GetKey(self):
def _GetVersion(self):
LOGGER.debug("Start GetVersion...")
resp = self._DoSimpleCommand(Packet.GetVersion())
version = resp.Payload.decode('ascii')
try:
version = resp.Payload.decode('ascii')
except UnicodeDecodeError:
LOGGER.warning("Invalid version data (non-ASCII bytes): %s", bytes_to_hex(resp.Payload))
version = bytes_to_hex(resp.Payload)
LOGGER.debug("GetVersion returns %s", version)
return version

Expand Down Expand Up @@ -649,15 +665,28 @@ def _GetSensors(self):

def cmd_handler(pkt, e):
assert len(pkt.Payload) == 8
mac = pkt.Payload.decode('ascii')
LOGGER.info("Sensor %d/%d, MAC:%s", ctx.index + 1, ctx.count, mac)
try:
mac = pkt.Payload.decode('ascii')
LOGGER.info("Sensor %d/%d, MAC:%s", ctx.index + 1, ctx.count, mac)
except UnicodeDecodeError:
# Use latin-1 which can encode any byte value
mac = pkt.Payload.decode('latin-1')
LOGGER.warning("Sensor %d/%d: Invalid MAC address data (non-ASCII bytes): %s",
ctx.index + 1, ctx.count, bytes_to_hex(pkt.Payload))
LOGGER.info("Sensor %d/%d, MAC (hex): %s", ctx.index + 1, ctx.count,
''.join(f"{b:02x}" for b in pkt.Payload))

ctx.sensors.append(mac)
ctx.index += 1
if ctx.index == ctx.count:
e.set()

self._DoCommand(Packet.GetSensorList(count), cmd_handler, timeout=10)
try:
self._DoCommand(Packet.GetSensorList(count), cmd_handler, timeout=10)
except TimeoutError:
LOGGER.error("Timeout waiting for sensor list. Dongle may have corrupted sensor data.")
LOGGER.error("Received %d of %d expected sensors: %s", ctx.index, ctx.count, ctx.sensors)
raise
else:
LOGGER.info("No sensors bond yet...")
return ctx.sensors
Expand Down Expand Up @@ -701,22 +730,52 @@ def Stop(self, timeout=_CMD_TIMEOUT):
def Scan(self, timeout=60):
LOGGER.info("Start Scan...")

# Check if worker thread has crashed
self.CheckError()

ctx = self.CmdContext(evt=threading.Event(), result=None)

def scan_handler(pkt):
assert len(pkt.Payload) == 11
ctx.result = (pkt.Payload[1:9].decode('ascii'), pkt.Payload[9], pkt.Payload[10])
mac_bytes = pkt.Payload[1:9]

# Check for invalid MAC addresses (all 0x00 or all 0xFF)
if mac_bytes == b'\xff\xff\xff\xff\xff\xff\xff\xff':
LOGGER.warning("Received invalid MAC (all 0xFF).")
ctx.result = None
ctx.evt.set()
return
elif mac_bytes == b'\x00\x00\x00\x00\x00\x00\x00\x00':
LOGGER.warning("Received invalid MAC (all 0x00).")
ctx.result = None
ctx.evt.set()
return

try:
mac = mac_bytes.decode('ascii')
except UnicodeDecodeError:
LOGGER.warning("Invalid MAC address in scan response (non-ASCII bytes): %s",
bytes_to_hex(mac_bytes))
# Don't try to pair sensors with non-ASCII MAC addresses
ctx.result = None
ctx.evt.set()
return

ctx.result = (mac, pkt.Payload[9], pkt.Payload[10])
ctx.evt.set()

old_handler = self._SetHandler(Packet.NOTIFY_SENSOR_SCAN, scan_handler)
try:
self._DoSimpleCommand(Packet.EnableScan())

if ctx.evt.wait(timeout):
s_mac, s_type, s_ver = ctx.result
LOGGER.info("Sensor found: mac=[%s], type=%d, version=%d", s_mac, s_type, s_ver)
r1 = self._GetSensorR1(s_mac, b'Ok5HPNQ4lf77u754')
LOGGER.debug("Sensor R1: %r", bytes_to_hex(r1))
if ctx.result is not None:
s_mac, s_type, s_ver = ctx.result
LOGGER.info("Sensor found: mac=[%s], type=%d, version=%d", s_mac, s_type, s_ver)
r1 = self._GetSensorR1(s_mac, b'Ok5HPNQ4lf77u754')
LOGGER.debug("Sensor R1: %r", bytes_to_hex(r1))
else:
LOGGER.info("Invalid sensor response received")
else:
LOGGER.info("Sensor discovery timeout...")

Expand All @@ -735,14 +794,31 @@ def scan_handler(pkt):
return ctx.result

def Delete(self, mac):
resp = self._DoSimpleCommand(Packet.DelSensor(str(mac)))
if ',' in mac:
# Convert "ff,ff,ff,ff,ff,ff,ff,ff" back to bytes
mac_bytes = bytes([int(x, 16) for x in mac.split(',')])
resp = self._DoSimpleCommand(Packet.DelSensor(mac_bytes))
else:
resp = self._DoSimpleCommand(Packet.DelSensor(str(mac)))
LOGGER.debug("CmdDelSensor returns %s", bytes_to_hex(resp.Payload))
assert len(resp.Payload) == 9
ack_mac = resp.Payload[:8].decode('ascii')
try:
ack_mac = resp.Payload[:8].decode('ascii')
except UnicodeDecodeError:
LOGGER.warning("Invalid MAC address in delete response (non-ASCII bytes): %s",
bytes_to_hex(resp.Payload[:8]))
ack_mac = resp.Payload[:8].decode('latin-1')
ack_code = resp.Payload[8]
assert ack_code == 0xFF, f"CmdDelSensor: Unexpected ACK code: 0x{ack_code:02X}"
assert ack_mac == mac, f"CmdDelSensor: MAC mismatch, requested:{mac}, returned:{ack_mac}"
LOGGER.info("CmdDelSensor: %s deleted", mac)

ack_mac_bytes = ack_mac.encode('latin-1') if len(ack_mac) == 8 else ack_mac.encode('ascii')
try:
mac_bytes = mac.encode('ascii') if len(mac) == 8 else mac.encode('latin-1')
except UnicodeEncodeError:
mac_bytes = mac.encode('latin-1')

assert ack_mac_bytes == mac_bytes, f"CmdDelSensor: MAC mismatch, requested:{bytes_to_hex(mac_bytes)}, returned:{bytes_to_hex(ack_mac_bytes)}"
LOGGER.info("CmdDelSensor: %s deleted", bytes_to_hex(mac_bytes) if not mac.isascii() else mac)

def DeleteAll(self):
resp = self._DoSimpleCommand(Packet.DelAllSensor())
Expand Down