Skip to content

Commit fb64fb9

Browse files
committed
feature/unit: Add UWB Unit[Indoor wireless tracking system]
1 parent 04c3560 commit fb64fb9

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed

m5stack/libs/unit/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@
2424
from .gps import GPS
2525
from .hbridge import HBRIDGE
2626
from .pbhub import PBHUB
27+
from .uwb import UWB
28+
from .ac_measure import AC_MEASURE

m5stack/libs/unit/uwb.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
from machine import UART, Pin
2+
from .unit_helper import UnitError
3+
import time
4+
5+
UWB_RESULT_HEADER = "an"
6+
UWB_GET_TIMEOUT = 12
7+
8+
9+
class UWB:
10+
def __init__(self, port, id=None, debug=False):
11+
self._debug = debug
12+
Pinx = Pin(port[0], Pin.IN, Pin.PULL_UP)
13+
self._uart = UART(1, tx=port[1], rx=port[0])
14+
self._uart.init(115200, bits=8, parity=None, stop=1)
15+
self.tx = port[1]
16+
self.rx = port[0]
17+
self.get_distance_measure = [0.0, 0.0, 0.0, 0.0]
18+
self.continuous_op = False
19+
self.device_id = ""
20+
self.reset()
21+
time.sleep(0.2)
22+
if self.check_device is False:
23+
raise UnitError("UWB unit maybe not connect")
24+
self.continuous_output_value(0)
25+
if id == None:
26+
self.set_mode()
27+
self.set_range_interval(5)
28+
self.continuous_output_value(1)
29+
else:
30+
self.set_mode(id)
31+
32+
def uart_port_id(self, id_num):
33+
self._uart = UART(id_num, tx=self.tx, rx=self.rx)
34+
self._uart.init(115200, bits=8, parity=None, stop=1)
35+
36+
@property
37+
def check_device(self):
38+
cmd = "AT\r\n"
39+
resp = self.at_cmd_send(cmd, keyword="OK")
40+
if len(resp):
41+
if self.remove_lfcr(resp)[0] == "OK":
42+
return True
43+
else:
44+
return False
45+
46+
def get_version(self):
47+
cmd = "AT+version?\r\n"
48+
response = self.at_cmd_send(cmd, keyword="OK")
49+
if len(response) > 2:
50+
if "OK" in response[2]:
51+
return response[0]
52+
53+
def reset(self):
54+
cmd = "AT+RST\r\n"
55+
resp = self.at_cmd_send(cmd, keyword="OK")
56+
resp = ",".join(self.remove_lfcr(resp))
57+
if resp.find("device:") != -1:
58+
self.device_id = resp[resp.find("device:") + 7 :]
59+
60+
def set_range_interval(self, interval):
61+
cmd = "AT+interval=" + str(interval) + "\r\n"
62+
self.at_cmd_send(cmd, keyword="OK")
63+
64+
def set_mode(self, id=None):
65+
cmd = "AT+anchor_tag=1," + str(id) + "\r\n" if (id != None) else "AT+anchor_tag=0\r\n"
66+
self.at_cmd_send(cmd, keyword="OK")
67+
time.sleep(0.1)
68+
self.reset()
69+
70+
def continuous_output_value(self, start=1):
71+
cmd = "AT+switchdis=" + str(start) + "\r\n"
72+
self.at_cmd_send(cmd, keyword="OK")
73+
self.continuous_op = bool(start)
74+
75+
def at_cmd_send(self, cmd, keyword=None, timeout=2):
76+
if self._debug:
77+
print("AT command:" + cmd[:-2])
78+
self._uart.read()
79+
time.sleep(0.1)
80+
self._uart.write(cmd)
81+
wait_time = time.time() + timeout
82+
msgs = []
83+
find_keyword = False
84+
time.sleep(0.1)
85+
while wait_time > time.time():
86+
time.sleep(0.05)
87+
line = self._uart.readline()
88+
if line is not None:
89+
line = line.decode()
90+
msgs.append(line)
91+
elif line is None or line == "":
92+
continue
93+
if keyword is not None and keyword in line:
94+
if self._debug:
95+
print("Got KEYWORD")
96+
find_keyword = True
97+
if find_keyword == True and self._uart.any() == 0:
98+
break
99+
100+
if self._debug:
101+
print(msgs)
102+
return msgs
103+
104+
def remove_lfcr(self, msgs):
105+
# remove "\r\n"
106+
respon = []
107+
for i in msgs:
108+
if i != "\r\n":
109+
if len(i) > 2 and i[-2:] == "\r\n":
110+
i = i[:-2]
111+
respon.append(i)
112+
if self._debug:
113+
print(respon)
114+
return respon
115+
116+
def update_new_value_loop(self):
117+
rx_times = 0
118+
while True:
119+
if self._uart.any() < 40:
120+
break
121+
while (rx_times < 4) and self.continuous_op and ("TAG ID:" in self.device_id):
122+
if self._uart.any():
123+
line = self._uart.readline()
124+
if line is not None:
125+
if UWB_RESULT_HEADER in line:
126+
line = line.decode()
127+
separate = line.find(":")
128+
if rx_times == int(line[separate - 1]):
129+
self.get_distance_measure[int(line[separate - 1])] = float(
130+
line[separate + 1 : -3]
131+
)
132+
rx_times += 1
133+

0 commit comments

Comments
 (0)