Skip to content

Commit 63a8752

Browse files
committed
tools.checksum: new module for checksums
Checksums are used in various places, so it makes sense for them to have their own helper module.
1 parent f415459 commit 63a8752

File tree

5 files changed

+152
-129
lines changed

5 files changed

+152
-129
lines changed

pybricksdev/connections.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,23 @@
1010
import struct
1111

1212
import asyncssh
13-
from bleak.backends.device import BLEDevice
14-
from bleak import BleakClient
1513
import semver
14+
from bleak import BleakClient
15+
from bleak.backends.device import BLEDevice
1616
from tqdm.auto import tqdm
1717
from tqdm.contrib.logging import logging_redirect_tqdm
1818

1919
from .ble import BLEConnection
2020
from .ble.nus import NUS_RX_UUID, NUS_TX_UUID
2121
from .ble.pybricks import (
22-
Event,
2322
PYBRICKS_CONTROL_UUID,
2423
PYBRICKS_PROTOCOL_VERSION,
2524
SW_REV_UUID,
25+
Event,
2626
Status,
2727
)
2828
from .compile import compile_file
29+
from .tools.checksum import xor_bytes
2930
from .usbconnection import USBConnection
3031

3132
logger = logging.getLogger(__name__)
@@ -249,9 +250,7 @@ async def send_message(self, data):
249250
raise ValueError("Cannot send this much data at once")
250251

251252
# Compute expected reply
252-
checksum = 0
253-
for b in data:
254-
checksum ^= b
253+
checksum = xor_bytes(data, 0)
255254

256255
# Clear existing checksum
257256
self.prepare_checksum()
@@ -746,15 +745,9 @@ async def disconnect(self):
746745
else:
747746
logger.debug("already disconnected")
748747

749-
def get_checksum(self, block):
750-
checksum = 0
751-
for b in block:
752-
checksum ^= b
753-
return checksum
754-
755748
async def send_block(self, data):
756749
self.checksum_ready.clear()
757-
self.expected_checksum = self.get_checksum(data)
750+
self.expected_checksum = xor_bytes(data, 0)
758751
try:
759752
await self.client.write_gatt_char(NUS_RX_UUID, bytearray(data), False)
760753
await asyncio.wait_for(self.checksum_ready.wait(), timeout=0.5)

pybricksdev/flash.py

Lines changed: 6 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -3,139 +3,29 @@
33

44
import asyncio
55
import io
6-
from collections import namedtuple
76
import json
87
import logging
98
import os
109
import platform
1110
import struct
1211
import sys
12+
import typing
13+
import zipfile
14+
from collections import namedtuple
1315
from typing import Dict, Tuple
16+
1417
from tqdm.auto import tqdm
1518
from tqdm.contrib.logging import logging_redirect_tqdm
16-
import typing
17-
import zipfile
1819

1920
from .ble import BLERequestsConnection
2021
from .ble.lwp3 import BootloaderCommand
2122
from .ble.lwp3.bytecodes import HubKind
22-
from .compile import save_script, compile_file
23+
from .compile import compile_file, save_script
24+
from .tools.checksum import crc32_checksum, sum_complement
2325

2426
logger = logging.getLogger(__name__)
2527

2628

27-
def sum_complement(fw, max_size):
28-
"""Calculates the checksum of a firmware file using the sum complement
29-
method of adding each 32-bit word and the returning the two's complement
30-
as the checksum.
31-
32-
Arguments:
33-
fw (file):
34-
The firmware file (a binary buffer - e.g. a file opened in 'rb' mode)
35-
max_size (int):
36-
The maximum size of the firmware file.
37-
38-
Returns:
39-
int: The correction needed to make the checksum of the file == 0.
40-
"""
41-
checksum = 0
42-
size = 0
43-
44-
while True:
45-
word = fw.read(4)
46-
if not word:
47-
break
48-
checksum += struct.unpack("I", word)[0]
49-
size += 4
50-
51-
if size > max_size:
52-
print('firmware + main.mpy is too large"', file=sys.stderr)
53-
exit(1)
54-
55-
for _ in range(size, max_size, 4):
56-
checksum += 0xFFFFFFFF
57-
58-
checksum &= 0xFFFFFFFF
59-
correction = checksum and (1 << 32) - checksum or 0
60-
61-
return correction
62-
63-
64-
# thanks https://stackoverflow.com/a/33152544/1976323
65-
66-
_CRC_TABLE = (
67-
0x00000000,
68-
0x04C11DB7,
69-
0x09823B6E,
70-
0x0D4326D9,
71-
0x130476DC,
72-
0x17C56B6B,
73-
0x1A864DB2,
74-
0x1E475005,
75-
0x2608EDB8,
76-
0x22C9F00F,
77-
0x2F8AD6D6,
78-
0x2B4BCB61,
79-
0x350C9B64,
80-
0x31CD86D3,
81-
0x3C8EA00A,
82-
0x384FBDBD,
83-
)
84-
85-
86-
def _dword(value):
87-
return value & 0xFFFFFFFF
88-
89-
90-
def _crc32_fast(crc, data):
91-
crc, data = _dword(crc), _dword(data)
92-
crc ^= data
93-
for _ in range(8):
94-
crc = _dword(crc << 4) ^ _CRC_TABLE[crc >> 28]
95-
return crc
96-
97-
98-
def _crc32_fast_block(crc, buffer):
99-
for data in buffer:
100-
crc = _crc32_fast(crc, data)
101-
return crc
102-
103-
104-
def crc32_checksum(fw, max_size):
105-
"""Calculate the checksum of a firmware file using CRC-32 as implemented
106-
in STM32 microprocessors.
107-
108-
Parameters
109-
----------
110-
fw : file
111-
The firmware file (a binary buffer - e.g. a file opened in 'rb' mode)
112-
max_size : int
113-
The maximum size of the firmware file.
114-
115-
Returns
116-
-------
117-
int
118-
The checksum
119-
"""
120-
121-
# remove the last 4 bytes that are the placeholder for the checksum
122-
try:
123-
fw = fw.read()[:-4]
124-
except AttributeError:
125-
fw = fw[:-4]
126-
if len(fw) + 4 > max_size:
127-
raise ValueError("File is too large")
128-
129-
if len(fw) & 3:
130-
raise ValueError("bytes_data length must be multiple of four")
131-
132-
crc = 0xFFFFFFFF
133-
for index in range(0, len(fw), 4):
134-
data = int.from_bytes(fw[index : index + 4], "little")
135-
crc = _crc32_fast(crc, data)
136-
return crc
137-
138-
13929
async def create_firmware(
14030
firmware_zip: typing.Union[str, os.PathLike, typing.BinaryIO]
14131
) -> Tuple[bytes, dict]:

pybricksdev/tools/__init__.py

Whitespace-only changes.

pybricksdev/tools/checksum.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# SPDX-License-Identifier: MIT
2+
# Copyright (c) 2021 The Pybricks Authors
3+
4+
"""
5+
Helper functions for calculating checksums.
6+
"""
7+
8+
from io import BytesIO
9+
10+
11+
def xor_bytes(data: bytes, init: int = 0xFF) -> int:
12+
"""
13+
Computes a checksum by xoring each byte in ``data`` starting with ``init``.
14+
15+
Args:
16+
data: The byte data to perform the checksum on.
17+
init: The initial value of the checksum.
18+
19+
Returns:
20+
The calculated checksum.
21+
"""
22+
checksum = init
23+
24+
for b in data:
25+
checksum ^= b
26+
27+
return checksum
28+
29+
30+
def sum_complement(data: BytesIO, max_size: int) -> int:
31+
"""
32+
Calculates the checksum of using the sum complement method of adding each
33+
32-bit word (little-endian) and the returning the two's complement as the
34+
checksum.
35+
36+
Arguments:
37+
data:
38+
A binary buffer - e.g. a file opened in 'rb' mode.
39+
max_size:
40+
The maximum size of the firmware file.
41+
42+
Returns:
43+
The correction needed to make the checksum == 0.
44+
"""
45+
checksum = 0
46+
size = 0
47+
48+
while True:
49+
word = data.read(4)
50+
if not word:
51+
break
52+
checksum += int.from_bytes(word, "little")
53+
size += 4
54+
55+
if size > max_size:
56+
raise ValueError("data is too large")
57+
58+
for _ in range(size, max_size, 4):
59+
checksum += 0xFFFFFFFF
60+
61+
checksum &= 0xFFFFFFFF
62+
correction = checksum and (1 << 32) - checksum or 0
63+
64+
return correction
65+
66+
67+
# thanks https://stackoverflow.com/a/33152544/1976323
68+
69+
_CRC_TABLE = (
70+
0x00000000,
71+
0x04C11DB7,
72+
0x09823B6E,
73+
0x0D4326D9,
74+
0x130476DC,
75+
0x17C56B6B,
76+
0x1A864DB2,
77+
0x1E475005,
78+
0x2608EDB8,
79+
0x22C9F00F,
80+
0x2F8AD6D6,
81+
0x2B4BCB61,
82+
0x350C9B64,
83+
0x31CD86D3,
84+
0x3C8EA00A,
85+
0x384FBDBD,
86+
)
87+
88+
89+
def _dword(value):
90+
return value & 0xFFFFFFFF
91+
92+
93+
def _crc32_fast(crc, data):
94+
crc, data = _dword(crc), _dword(data)
95+
crc ^= data
96+
for _ in range(8):
97+
crc = _dword(crc << 4) ^ _CRC_TABLE[crc >> 28]
98+
return crc
99+
100+
101+
def crc32_checksum(data: BytesIO, max_size: int) -> int:
102+
"""
103+
Calculate the checksum using CRC-32 as implemented in STM32 microprocessors.
104+
105+
Args:
106+
data:
107+
A binary buffer - e.g. a file opened in 'rb' mode.
108+
max_size:
109+
The maximum size of the firmware file.
110+
111+
Returns:
112+
The checksum.
113+
"""
114+
115+
# remove the last 4 bytes that are the placeholder for the checksum
116+
try:
117+
data = data.read()[:-4]
118+
except AttributeError:
119+
data = data[:-4]
120+
121+
if len(data) + 4 > max_size:
122+
raise ValueError("data is too large")
123+
124+
if len(data) & 3:
125+
raise ValueError("bytes_data length must be multiple of four")
126+
127+
crc = 0xFFFFFFFF
128+
for index in range(0, len(data), 4):
129+
data = int.from_bytes(data[index : index + 4], "little")
130+
crc = _crc32_fast(crc, data)
131+
return crc

tests/test_checksums.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from pybricksdev.tools.checksum import xor_bytes
2+
3+
4+
def test_xor_bytes():
5+
assert xor_bytes(b"\x00") == 0xFF ^ 0x00
6+
assert xor_bytes(b"\x00", 0x00) == 0x00 ^ 0x00
7+
assert xor_bytes(b"\xFF") == 0xFF ^ 0xFF
8+
assert xor_bytes(b"\xFF", 0x00) == 0x00 ^ 0xFF
9+
assert xor_bytes(b"\x01\x02\x03\x04") == 0xFF ^ 0x01 ^ 0x02 ^ 0x03 ^ 0x04

0 commit comments

Comments
 (0)