Skip to content

Commit c3b13a6

Browse files
magik6kCopilot
andauthored
fix: fr32: Correct multithreaded Unpad chunk boundary alignment and simplify unpadReader (#13455)
* reader tests, new unpad reader * fix: fr32: multithreaded Unpad chunk boundary alignment * fr32: avoid allocation in NewUnpadReaderBuf * add changelog entry * pieceReader: use readPaddedSize for buffer sizing Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * address important review issue * fix lint --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 7538c47 commit c3b13a6

File tree

6 files changed

+837
-88
lines changed

6 files changed

+837
-88
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ The Lotus and Lotus-Miner v1.34.2 release includes numerous bug fixes, CLI enhan
4545
- fix(miner): ensure sender account exists ([filecoin-project/lotus#13348](https://github.com/filecoin-project/lotus/pull/13348))
4646
- fix(eth): properly return vm error in all gas estimation methods ([filecoin-project/lotus#13389](https://github.com/filecoin-project/lotus/pull/13389))
4747
- chore: all actor cmd support --actor ([filecoin-project/lotus#13391](https://github.com/filecoin-project/lotus/pull/13391))
48+
- fix(fr32): fix data corruption in multithreaded Pad/Unpad for non-aligned sizes
4849

4950
## 📝 Changelog
5051

storage/sealer/fr32/fr32.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,14 @@ func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
4141

4242
func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
4343
threads := mtChunkCount(abi.PaddedPieceSize(padLen))
44-
threadBytes := abi.PaddedPieceSize(padLen / int(threads))
44+
45+
// Ensure threadBytes is aligned to 128-byte chunk boundaries.
46+
// Each fr32 chunk is 128 padded bytes / 127 unpadded bytes.
47+
chunksPerThread := (padLen / int(threads)) / 128
48+
if chunksPerThread == 0 {
49+
chunksPerThread = 1
50+
}
51+
threadBytes := abi.PaddedPieceSize(chunksPerThread * 128)
4552

4653
var wg sync.WaitGroup
4754
wg.Add(int(threads))
@@ -53,6 +60,16 @@ func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
5360
start := threadBytes * abi.PaddedPieceSize(thread)
5461
end := start + threadBytes
5562

63+
// Last thread takes any remainder
64+
if thread == int(threads)-1 {
65+
end = abi.PaddedPieceSize(padLen)
66+
}
67+
68+
// Skip if this thread has no work
69+
if start >= abi.PaddedPieceSize(padLen) {
70+
return
71+
}
72+
5673
op(in[start.Unpadded():end.Unpadded()], out[start:end])
5774
}(i)
5875
}

storage/sealer/fr32/fr32_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package fr32_test
33
import (
44
"bytes"
55
"crypto/rand"
6+
"fmt"
67
"io"
78
"os"
89
"testing"
@@ -149,6 +150,134 @@ func TestRoundtrip16MRand(t *testing.T) {
149150
require.Equal(t, ffi, buf)
150151
}
151152

153+
// TestRoundtripMisalignedSizes tests the multithreaded Pad/Unpad with sizes that
154+
// previously caused data corruption due to thread boundary misalignment.
155+
// The bug occurred when (padLen / threads) was not a multiple of 128 bytes,
156+
// causing partial chunks at thread boundaries to be skipped.
157+
func TestRoundtripMisalignedSizes(t *testing.T) {
158+
// These sizes are chosen to trigger the multithreaded path (> 512KB)
159+
// and create thread boundaries that don't align to 128-byte chunks.
160+
testCases := []struct {
161+
name string
162+
numChunks int
163+
}{
164+
// 66061 chunks = 8455808 padded bytes
165+
// With 16 threads: 8455808/16 = 528488 bytes per thread
166+
// 528488/128 = 4128.5 - NOT aligned! This was the original bug case.
167+
{"66061_chunks_8MiB_boundary", 66061},
168+
169+
// Various sizes that create misaligned thread boundaries
170+
{"prime_chunks_1009", 1009 * 8}, // ~1MB, prime-ish number of chunks
171+
{"odd_chunks_8193", 8193}, // Just over 8192 (power of 2)
172+
{"odd_chunks_65537", 65537}, // Just over 65536 (power of 2)
173+
{"odd_chunks_100003", 100003}, // Large prime
174+
{"boundary_chunks_66000", 66000}, // Near the original bug size
175+
{"boundary_chunks_70000", 70000}, // Larger odd size
176+
}
177+
178+
for _, tc := range testCases {
179+
t.Run(tc.name, func(t *testing.T) {
180+
unpaddedSize := tc.numChunks * 127
181+
paddedSize := tc.numChunks * 128
182+
183+
// Skip if too large for this test
184+
if paddedSize > 64<<20 {
185+
t.Skip("Size too large for this test")
186+
}
187+
188+
input := make([]byte, unpaddedSize)
189+
_, err := rand.Read(input)
190+
require.NoError(t, err)
191+
192+
padded := make([]byte, paddedSize)
193+
fr32.Pad(input, padded)
194+
195+
output := make([]byte, unpaddedSize)
196+
fr32.Unpad(padded, output)
197+
198+
require.Equal(t, input, output, "Roundtrip failed for %d chunks", tc.numChunks)
199+
})
200+
}
201+
}
202+
203+
// TestUnpadMisalignedThreadBoundaries specifically tests the fix for the
204+
// multithreaded Unpad bug where thread boundaries weren't aligned to
205+
// 128-byte fr32 chunks, causing data loss.
206+
func TestUnpadMisalignedThreadBoundaries(t *testing.T) {
207+
// Create data that's just over 8MiB to trigger the original bug
208+
// 66061 chunks * 127 bytes = 8389747 unpadded bytes
209+
// 66061 chunks * 128 bytes = 8455808 padded bytes
210+
numChunks := 66061
211+
unpaddedSize := numChunks * 127
212+
paddedSize := numChunks * 128
213+
214+
// Create sequential data so we can detect exactly where corruption occurs
215+
input := make([]byte, unpaddedSize)
216+
for i := range input {
217+
input[i] = byte(i & 0xFF)
218+
}
219+
220+
padded := make([]byte, paddedSize)
221+
fr32.Pad(input, padded)
222+
223+
output := make([]byte, unpaddedSize)
224+
fr32.Unpad(padded, output)
225+
226+
// Check for corruption at thread boundaries
227+
// With the original bug, corruption occurred at offsets like:
228+
// 528384 (thread 0/1 boundary), 1056768 (thread 1/2 boundary), etc.
229+
230+
// First verify total length
231+
require.Equal(t, len(input), len(output), "Output length mismatch")
232+
233+
// Check every byte
234+
for i := 0; i < len(input); i++ {
235+
if input[i] != output[i] {
236+
// Find the extent of the corruption
237+
corruptStart := i
238+
corruptEnd := i
239+
for corruptEnd < len(input) && input[corruptEnd] != output[corruptEnd] {
240+
corruptEnd++
241+
}
242+
t.Fatalf("Data corruption at offset %d (0x%x) to %d (0x%x): expected 0x%02x, got 0x%02x (corrupt bytes: %d)",
243+
corruptStart, corruptStart, corruptEnd, corruptEnd,
244+
input[i], output[i], corruptEnd-corruptStart)
245+
}
246+
}
247+
}
248+
249+
// TestPadUnpadVariousSizesAboveMTTresh tests Pad/Unpad roundtrip for various
250+
// sizes above the MTTresh (512KB) threshold that triggers multithreading.
251+
func TestPadUnpadVariousSizesAboveMTTresh(t *testing.T) {
252+
// Test sizes from just above MTTresh to several MB
253+
// These should all use the multithreaded path
254+
sizes := []int{
255+
513 * 1024 / 127 * 127, // Just above 512KB, aligned to chunks
256+
1 * 1024 * 1024 / 127 * 127, // ~1MB aligned
257+
2*1024*1024/127*127 + 127*100, // ~2MB + extra chunks
258+
4*1024*1024/127*127 + 127*333, // ~4MB + odd chunks
259+
8*1024*1024/127*127 + 127*777, // ~8MB + odd chunks
260+
}
261+
262+
for _, unpaddedSize := range sizes {
263+
paddedSize := unpaddedSize / 127 * 128
264+
265+
t.Run(fmt.Sprintf("%d_bytes", unpaddedSize), func(t *testing.T) {
266+
input := make([]byte, unpaddedSize)
267+
_, err := rand.Read(input)
268+
require.NoError(t, err)
269+
270+
padded := make([]byte, paddedSize)
271+
fr32.Pad(input, padded)
272+
273+
output := make([]byte, unpaddedSize)
274+
fr32.Unpad(padded, output)
275+
276+
require.Equal(t, input, output)
277+
})
278+
}
279+
}
280+
152281
func BenchmarkPadChunk(b *testing.B) {
153282
var buf [128]byte
154283
in := bytes.Repeat([]byte{0xff}, 127)

0 commit comments

Comments
 (0)