Skip to content

Commit a73745c

Browse files
committed
feature/unit/ac measure: monitor the current, voltage, power
1 parent acba76b commit a73745c

File tree

1 file changed

+195
-0
lines changed

1 file changed

+195
-0
lines changed

m5stack/libs/unit/ac_measure.py

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
from machine import I2C
2+
from micropython import const
3+
import struct
4+
from .pahub import PAHUB
5+
from .unit_helper import UnitError
6+
import time
7+
8+
try:
9+
from typing import Union
10+
except ImportError:
11+
pass
12+
13+
AC_MEASURE_ADDR = 0x42
14+
15+
VOLTAGE_STR_REG = 0x00
16+
CURRENT_STR_REG = 0x10
17+
ACTIVE_POWER_STR_REG = 0x20
18+
APPARENT_POWER_STR_REG = 0x30
19+
POWER_FACTOR_STR_REG = 0x40
20+
KWH_STR_REG = 0x50
21+
VOLTAGE_BYTE_REG = 0x60
22+
CURRENT_BYTE_REG = 0x70
23+
ACTIVE_POWER_BYTE_REG = 0x80
24+
APPARENT_POWER_BYTE_REG = 0x90
25+
POWER_FACTOR_BYTE_REG = 0xA0
26+
KWH_BYTE_REG = 0xB0
27+
VOLTAGE_COEFF_REG = 0xC0
28+
CURRENT_COEFF_REG = 0xD0
29+
SAVE_COEFF_REG = 0xE0
30+
DATA_READY_REG = 0xFC
31+
BOOTLOADER_REG = 0xFD
32+
FIRM_VER_REG = 0xFE
33+
I2C_ADDR_REG = 0xFF
34+
35+
36+
class AC_MEASURE:
37+
def __init__(self, i2c: Union[I2C, PAHUB], slave_addr: int = AC_MEASURE_ADDR) -> None:
38+
"""
39+
AC Measure Initialize Function
40+
Set I2C port, AC Measure Slave Address
41+
"""
42+
self.ac_measure_i2c = i2c
43+
self.init_i2c_address(slave_addr)
44+
45+
def init_i2c_address(self, slave_addr: int = AC_MEASURE_ADDR) -> None:
46+
"""
47+
change the i2c address
48+
slave_addr : 1 to 127
49+
"""
50+
if slave_addr >= 0x01 and slave_addr <= 0x7F:
51+
self.i2c_addr = slave_addr
52+
if not (self.i2c_addr in self.ac_measure_i2c.scan()):
53+
raise UnitError("AC Measure unit maybe not connect")
54+
55+
def get_voltage_str(self) -> str:
56+
"""
57+
get voltage string value
58+
"""
59+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, VOLTAGE_STR_REG, 7).decode()
60+
61+
def get_current_str(self) -> str:
62+
"""
63+
get current string value
64+
"""
65+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, CURRENT_STR_REG, 7).decode()
66+
67+
def get_active_power_str(self) -> str:
68+
"""
69+
get active power string value
70+
"""
71+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, ACTIVE_POWER_STR_REG, 7).decode()
72+
73+
def get_apparent_power_str(self) -> str:
74+
"""
75+
get apparent power string value
76+
"""
77+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, APPARENT_POWER_STR_REG, 7).decode()
78+
79+
def get_power_factor_str(self) -> str:
80+
"""
81+
get power factor string value
82+
"""
83+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, POWER_FACTOR_STR_REG, 4).decode()
84+
85+
def get_kwh_str(self) -> str:
86+
"""
87+
get kwh string value
88+
"""
89+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, KWH_STR_REG, 11).decode()
90+
91+
def get_voltage_byte(self) -> int:
92+
"""
93+
get voltage raw value
94+
"""
95+
buf = self.ac_measure_i2c.readfrom_mem(self.i2c_addr, VOLTAGE_BYTE_REG, 2)
96+
return struct.unpack("<H", buf)[0]
97+
98+
def get_current_byte(self) -> int:
99+
"""
100+
get current raw value
101+
"""
102+
buf = self.ac_measure_i2c.readfrom_mem(self.i2c_addr, CURRENT_BYTE_REG, 2)
103+
return struct.unpack("<H", buf)[0]
104+
105+
def get_active_power_byte(self) -> int:
106+
"""
107+
get active power raw value
108+
"""
109+
buf = self.ac_measure_i2c.readfrom_mem(self.i2c_addr, ACTIVE_POWER_BYTE_REG, 4)
110+
return struct.unpack("<I", buf)[0]
111+
112+
def get_apparent_power_byte(self) -> int:
113+
"""
114+
get apparent power raw value
115+
"""
116+
buf = self.ac_measure_i2c.readfrom_mem(self.i2c_addr, APPARENT_POWER_BYTE_REG, 4)
117+
return struct.unpack("<I", buf)[0]
118+
119+
def get_power_factor_byte(self) -> int:
120+
"""
121+
get power factor raw value
122+
"""
123+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, POWER_FACTOR_BYTE_REG, 1)[0]
124+
125+
def get_kwh_byte(self) -> int:
126+
"""
127+
get kwh raw value
128+
"""
129+
buf = self.ac_measure_i2c.readfrom_mem(self.i2c_addr, KWH_BYTE_REG, 4)
130+
return struct.unpack("<I", buf)[0]
131+
132+
def get_voltage_coeff(self) -> int:
133+
"""
134+
get voltage coefficient value
135+
"""
136+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, VOLTAGE_COEFF_REG, 1)[0]
137+
138+
def set_voltage_coeff(self, value: int) -> None:
139+
"""
140+
set voltage coefficient value
141+
value: 0 - 255
142+
"""
143+
self.ac_measure_i2c.writeto_mem(self.i2c_addr, VOLTAGE_COEFF_REG, bytearray([int(value)]))
144+
145+
def get_current_coeff(self) -> int:
146+
"""
147+
get current coefficient value
148+
"""
149+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, CURRENT_COEFF_REG, 1)[0]
150+
151+
def set_current_coeff(self, value: int) -> None:
152+
"""
153+
set current coefficient value
154+
value: 0 - 255
155+
"""
156+
self.ac_measure_i2c.writeto_mem(self.i2c_addr, CURRENT_COEFF_REG, bytearray([int(value)]))
157+
158+
def set_save_coeff(self) -> None:
159+
"""
160+
to save flash in volt and current coefficient value
161+
"""
162+
self.ac_measure_i2c.writeto_mem(self.i2c_addr, SAVE_COEFF_REG, bytearray([0x01]))
163+
time.sleep_ms(100)
164+
165+
def get_data_ready(self) -> bool:
166+
"""
167+
get data is ready?
168+
"""
169+
status = self.ac_measure_i2c.readfrom_mem(self.i2c_addr, DATA_READY_REG, 1)[0]
170+
return True if status == 1 else False
171+
172+
def set_jump_bootloader(self) -> None:
173+
"""
174+
set jump to bootloader
175+
"""
176+
self.ac_measure_i2c.writeto_mem(self.i2c_addr, BOOTLOADER_REG, bytearray([0x01]))
177+
178+
def get_device_status(self, mode: int) -> int:
179+
"""
180+
get firmware version and i2c address.
181+
mode : 0xFE and 0xFF
182+
"""
183+
if mode >= FIRM_VER_REG and mode <= I2C_ADDR_REG:
184+
return self.ac_measure_i2c.readfrom_mem(self.i2c_addr, mode, 1)[0]
185+
186+
def set_i2c_address(self, addr: int) -> None:
187+
"""
188+
set i2c address.
189+
addr: 0x01 to 0x7F
190+
"""
191+
if addr >= 0x01 and addr <= 0x7F:
192+
if addr != self.i2c_addr:
193+
self.ac_measure_i2c.writeto_mem(self.i2c_addr, I2C_ADDR_REG, bytearray([addr]))
194+
self.i2c_addr = addr
195+
time.sleep_ms(100)

0 commit comments

Comments
 (0)