Skip to content

Commit c2e4439

Browse files
committed
add fixture driver for p1k
1 parent 88127a3 commit c2e4439

File tree

1 file changed

+139
-0
lines changed

1 file changed

+139
-0
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# encoding:utf-8
2+
3+
import time
4+
from typing import Union
5+
import serial
6+
import serial.tools.list_ports
7+
8+
ReceiveBuffer = 100
9+
10+
11+
class SerialDriver:
12+
13+
@classmethod
14+
def get_com_list(cls):
15+
port_list = serial.tools.list_ports.comports()
16+
return port_list
17+
18+
def __init__(self):
19+
self.device = None
20+
self.com = None
21+
22+
def get_device(self):
23+
"""
24+
select device
25+
:return:
26+
"""
27+
port_list = SerialDriver.get_com_list()
28+
print("=" * 5 + "PORT LIST" + "=" * 5)
29+
for index, p in enumerate(port_list):
30+
print(f"{index + 1} >>{p.device}")
31+
select = input("Select Port Number(输入串口号对应的数字):")
32+
self.device = port_list[int(select.strip()) - 1].device
33+
34+
def init_serial(self, baud):
35+
36+
"""
37+
init connection
38+
:param baud:
39+
:return:
40+
"""
41+
self.com = serial.Serial(self.device, baud, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
42+
bytesize=serial.EIGHTBITS, timeout=1)
43+
if self.com.isOpen():
44+
print(f"{self.device} Opened! \n")
45+
# settings
46+
self.com.bytesize = serial.EIGHTBITS # 数据位 8
47+
self.com.parity = serial.PARITY_NONE # 无校验
48+
self.com.stopbits = serial.STOPBITS_ONE # 停止位 1
49+
50+
def close(self):
51+
"""
52+
close com
53+
:return:
54+
"""
55+
self.com.close()
56+
print(f"{self.device} Closed! \n")
57+
58+
def init(self, baud):
59+
"""
60+
main
61+
:return:
62+
"""
63+
self.get_device()
64+
try:
65+
self.init_serial(baud)
66+
except:
67+
print("Can't find device")
68+
69+
def write_and_get_buffer(self, send: Union[str, int, bytes], only_write=False, delay=None, times=30):
70+
"""
71+
send cmd
72+
:return:
73+
"""
74+
if self.com is None:
75+
return
76+
if type(send) is not bytes:
77+
send = (send + "\r\n").encode('utf-8')
78+
self.com.flushInput()
79+
self.com.flushOutput()
80+
self.com.write(send)
81+
time.sleep(0.1)
82+
if delay is None:
83+
pass
84+
else:
85+
time.sleep(delay)
86+
if only_write is True:
87+
return
88+
for i in range(times):
89+
data = self.com.read(ReceiveBuffer)
90+
if type(data) is not bytes:
91+
if "OK" not in data.decode('utf-8') or "busy" in data.decode('utf-8'):
92+
time.sleep(1)
93+
continue
94+
else:
95+
return data
96+
return data.decode('utf-8')
97+
98+
def read_buffer(self):
99+
"""
100+
读取缓存
101+
:return:
102+
"""
103+
try:
104+
self.com.flushInput()
105+
self.com.flushOutput()
106+
except:
107+
pass
108+
time.sleep(3)
109+
length = ReceiveBuffer
110+
data = self.com.read(length)
111+
self.com.flushInput()
112+
self.com.flushOutput()
113+
return data.decode('utf-8')
114+
115+
def get_pressure(self):
116+
"""
117+
analyze pressure value
118+
"""
119+
for _i in range(5):
120+
try:
121+
respond = self.read_buffer()
122+
respond_list = respond.split('|')
123+
respond_value = respond_list[1]
124+
125+
average_value = respond_value.split('\r\n')[0].split('\t')[1].strip()
126+
average_value = float(average_value)
127+
return average_value
128+
except:
129+
print(f"get pressure fail at {_i} times")
130+
pass
131+
132+
133+
134+
if __name__ == '__main__':
135+
s = SerialDriver()
136+
s.init(9600)
137+
for i in range(100):
138+
result = s.get_pressure()
139+
print(result)

0 commit comments

Comments
 (0)