Skip to content

Commit 18b2092

Browse files
committed
raw subscribe test
1 parent d453a34 commit 18b2092

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

tests/rawsubscribe.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""
2+
This is a script to prove
3+
rosserial topic negotiation
4+
and publish
5+
"""
6+
7+
import machine as m
8+
import uio
9+
import struct
10+
from time import sleep, sleep_ms, sleep_us
11+
12+
#rosserial protocol header
13+
header=bytearray([0xff,0xfe])
14+
15+
#little-endian function
16+
def le(h):
17+
h &= 0xffff
18+
return [h & 0xff, h >> 8]
19+
20+
#generic checksum
21+
def checksum(arr):
22+
return 255-((sum(arr))%256)
23+
24+
#defined baudrate to use
25+
baudrate=57600
26+
27+
#uart definition for tx2 and rx2
28+
uart = m.UART(2,baudrate)
29+
uart.init(baudrate, bits=8, parity=None, stop=1, txbuf=0)
30+
31+
#values for topic negotiation
32+
topic_id=1
33+
topic_name="chatter"
34+
message_type="std_msgs/String"
35+
md5sum='992ce8a1687cec8c8bd883ec73ca41d1'
36+
buffer_size=100
37+
38+
#packet serialization for negotiation
39+
packettopic=uio.StringIO()
40+
def serialize(packet):
41+
packettopic.write(struct.pack('<H',topic_id))
42+
packettopic.write(struct.pack('<I%ss'%len(topic_name),len(topic_name),topic_name))
43+
packettopic.write(struct.pack('<I%ss'%len(message_type),len(message_type),message_type))
44+
packettopic.write(struct.pack('<I%ss'%len(md5sum),len(md5sum),md5sum))
45+
packettopic.write(struct.pack('<i',buffer_size))
46+
serialize(packettopic)
47+
packettopic=packettopic.getvalue().encode('utf-8')
48+
packetlen=len(list(packettopic))
49+
checksumlen=checksum([packetlen])
50+
checksumpack=list([checksum(list([1,0]+list(packettopic)))])
51+
52+
listo=0
53+
54+
while True:
55+
data = uart.read()
56+
if data==b'\xff\xfe\x00\x00\xff\x00\x00\xff':
57+
#negotiation is made
58+
uart.write(header)
59+
uart.write(bytearray(le(packetlen)))
60+
uart.write(bytearray([checksumlen]))
61+
uart.write(bytearray([1,0]))
62+
uart.write(packettopic)
63+
uart.write(bytearray(checksumpack))
64+
print('enviado registro de topic')
65+
listo=1
66+
elif listo==1:
67+
data=uart.read()
68+
if data is not None:
69+
print(data[11:33])

0 commit comments

Comments
 (0)