1
+ from cobs import cobsr
2
+ import serial
3
+ import argparse
4
+ import struct
5
+ from crc16 import crc16xmodem as crc16
6
+
7
+ CHUNK_SIZE = 128
8
+
9
+
10
+ # Header format:
11
+ # Little Endian
12
+ # CRC16 (2-bytes)
13
+ # Sequence Number (2-bytes)
14
+ # Payload Size (2-bytes)
15
+ HEADER_FORMAT = "<HH" .format (CHUNK_SIZE )
16
+
17
+
18
+ def read_in_chunks (file_object , chunk_size = 256 ):
19
+ while True :
20
+ data = file_object .read (chunk_size )
21
+ if not data :
22
+ break
23
+ yield data
24
+
25
+ def main ():
26
+
27
+ parser = argparse .ArgumentParser (description = 'Upload an update over SerialOTA' )
28
+ parser .add_argument ('binary' , help = 'path to binary to upload over SerialOTA' )
29
+ parser .add_argument ('port' , help = 'Serial port to use for SerialOTA' )
30
+
31
+ args = parser .parse_args ()
32
+
33
+ # Create the OTA serial
34
+ ser = serial .Serial (args .port , 115200 , timeout = 0.1 )
35
+
36
+ # Open the binary
37
+ with open (args .binary , 'rb' ) as f :
38
+
39
+ # Start by sending 0 bytes until we get a response
40
+ while True :
41
+ ser .write (bytes ([0 ]))
42
+ rx = ser .read ()
43
+ print ("received: {}" .format (rx ))
44
+ if rx == bytes ([1 ]):
45
+ print ("Received bootloader ACK -- waiting for ready" )
46
+ break
47
+
48
+ while True :
49
+ # Wait until the bootloader sends another response
50
+ rx = ser .read ()
51
+ print ("received: {}" .format (rx ))
52
+ if rx == bytes ([2 ]):
53
+ print ("Bootloader ready -- begin upload" )
54
+ break
55
+
56
+ ser .flushInput ()
57
+
58
+ sequence_num = 0
59
+ for chunk in read_in_chunks (f , CHUNK_SIZE ):
60
+
61
+ if len (chunk ) != CHUNK_SIZE :
62
+ sequence_num = 0xFFFF
63
+
64
+ # Pack the header
65
+ tx_buf = struct .pack (HEADER_FORMAT , sequence_num , len (chunk )) + chunk
66
+
67
+ # Calculate the CRC16 of the chunk and the header info
68
+ crc = crc16 (bytes (tx_buf ))
69
+ print ("Sending chunk: crc({}), seq num({}), size({})" .format (crc , sequence_num , len (chunk )))
70
+
71
+ tx_buf = struct .pack ("<H" , crc ) + tx_buf
72
+
73
+ # Encode with COBS
74
+ tx_cobs = cobsr .encode (tx_buf ) + bytes ([0 ])
75
+ print (type (tx_cobs ))
76
+ ser .write (tx_cobs )
77
+ print (tx_cobs .hex ())
78
+
79
+ while (True ):
80
+ # Wait until the bootloader is ready
81
+ c = ser .read ()
82
+ print ("rx:{}" .format (c ))
83
+ if c == bytes ([0 ]):
84
+ break
85
+
86
+ sequence_num = sequence_num + 1
87
+
88
+ if __name__ == '__main__' :
89
+ main ()
0 commit comments