-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_uart.py
More file actions
executable file
·112 lines (86 loc) · 2.78 KB
/
send_uart.py
File metadata and controls
executable file
·112 lines (86 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import serial
import time
# #设置单个轮子速度
# #$S 0,500.0*44\r\n (设置0号轮目标速度为 500 RPM)
# msg_0 = "$S 0,500.0*44\r\n"
# msg_1 = "$S 1,-300.5*6B\r\n"
# msg_2 = "$S 2,500*58\r\n"
# 打开串口
ser = serial.Serial(
port="/dev/ttyTHS1", # 串口号
baudrate=921600, # 波特率(根据你的设备要求修改)
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1 # 读超时
)
#计算校验和:
def add_checksum(cmd: str) -> str:
"""
输入: 形如 "$S 0,500.0*" 的字符串
输出: 自动加上正确校验和的完整命令,结尾带 \r\n
"""
if not cmd.startswith("$"):
raise ValueError("命令必须以 $ 开头")
if "*" not in cmd:
raise ValueError("命令里必须带 *")
# 截取 $ 和 * 之间的内容
content = cmd[1:cmd.index("*")]
# 逐字符异或
checksum = 0
for c in content:
checksum ^= ord(c)
# 拼接完整命令
return f"${content}*{checksum:02X}\r\n"
# 需要发送的数据
msg = add_checksum("$A 0,-300,0*")
stop_ms = "$A 0,0,0*51\r\n"
# 使用正确的查询命令
read_msg = "$Q*51\r\n"
def parse_response(response_str):
"""解析响应数据"""
try:
if response_str.startswith("$R "):
# 提取速度数据 $R vx,vy,vz*checksum
data_part = response_str[3:response_str.rfind("*")]
vx, vy, vz = data_part.split(",")
return {
"vx": float(vx),
"vy": float(vy),
"vz": float(vz)
}
except Exception as e:
print(f"解析响应错误: {e}")
return None
while True:
ser.write(msg.encode("ascii")) # 写入数据
print(f"Sent: {msg.strip()}")
# 等待一下确保命令发送完成
time.sleep(0.1)
# 清空缓冲区
ser.flushInput()
ser.flushOutput()
# 发送查询命令
ser.write(read_msg.encode("ascii"))
print(f"Query sent: {read_msg.strip()}")
# 等待响应
time.sleep(0.2)
# 读取响应
response = ser.read(200)
print(f"Raw response: {response}")
if response:
try:
decoded = response.decode("ascii")
print(f"Received: {decoded.strip()}")
# 解析速度数据
speed_data = parse_response(decoded.strip())
if speed_data:
print(f"速度数据: vx={speed_data['vx']}, vy={speed_data['vy']}, vz={speed_data['vz']}")
except UnicodeDecodeError as e:
print(f"Decode error: {e}")
print(f"Raw bytes: {response}")
else:
print("No response received")
time.sleep(10)
ser.write(stop_ms.encode("ascii"))
time.sleep(3)