|
1 | 1 | """Meter parser.""" |
2 | 2 | from __future__ import annotations |
3 | 3 |
|
| 4 | +import struct |
4 | 5 | from typing import Any |
5 | 6 |
|
| 7 | +CO2_UNPACK = struct.Struct(">H").unpack_from |
| 8 | + |
6 | 9 |
|
7 | 10 | def process_wosensorth(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]: |
8 | 11 | """Process woSensorTH/Temp sensor services data.""" |
9 | | - temp_data = None |
10 | | - battery = None |
| 12 | + temp_data: bytes | None = None |
| 13 | + battery: bytes | None = None |
11 | 14 |
|
12 | 15 | if mfr_data: |
13 | 16 | temp_data = mfr_data[8:11] |
@@ -45,38 +48,8 @@ def process_wosensorth(data: bytes | None, mfr_data: bytes | None) -> dict[str, |
45 | 48 |
|
46 | 49 | def process_wosensorth_c(data: bytes | None, mfr_data: bytes | None) -> dict[str, Any]: |
47 | 50 | """Process woSensorTH/Temp sensor services data with CO2.""" |
48 | | - temp_data = None |
49 | | - battery = None |
50 | | - |
51 | | - if mfr_data: |
52 | | - temp_data = mfr_data[8:11] |
53 | | - |
54 | | - if data: |
55 | | - if not temp_data: |
56 | | - temp_data = data[3:6] |
57 | | - battery = data[2] & 0b01111111 |
58 | | - |
59 | | - if not temp_data: |
60 | | - return {} |
61 | | - |
62 | | - _temp_sign = 1 if temp_data[1] & 0b10000000 else -1 |
63 | | - _temp_c = _temp_sign * ( |
64 | | - (temp_data[1] & 0b01111111) + ((temp_data[0] & 0b00001111) / 10) |
65 | | - ) |
66 | | - _temp_f = (_temp_c * 9 / 5) + 32 |
67 | | - _temp_f = (_temp_f * 10) / 10 |
68 | | - humidity = temp_data[2] & 0b01111111 |
69 | | - |
70 | | - if _temp_c == 0 and humidity == 0 and battery == 0: |
71 | | - return {} |
72 | | - |
73 | | - _wosensorth_data = { |
74 | | - # Data should be flat, but we keep the original structure for now |
75 | | - "temp": {"c": _temp_c, "f": _temp_f}, |
76 | | - "temperature": _temp_c, |
77 | | - "fahrenheit": bool(temp_data[2] & 0b10000000), |
78 | | - "humidity": humidity, |
79 | | - "battery": battery, |
80 | | - } |
81 | | - |
| 51 | + _wosensorth_data = process_wosensorth(data, mfr_data) |
| 52 | + if _wosensorth_data and mfr_data and len(mfr_data) >= 15: |
| 53 | + co2_data = mfr_data[13:15] |
| 54 | + _wosensorth_data["co2"] = CO2_UNPACK(co2_data)[0] |
82 | 55 | return _wosensorth_data |
0 commit comments