Skip to content

Commit 7f997bd

Browse files
authored
fix: add erasure reDecoder for evicted chunks (#5097)
1 parent 7bd8512 commit 7f997bd

File tree

4 files changed

+309
-15
lines changed

4 files changed

+309
-15
lines changed

pkg/file/joiner/joiner.go

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,24 @@ func fingerprint(addrs []swarm.Address) string {
6565
return string(h.Sum(nil))
6666
}
6767

68+
// createRemoveCallback returns a function that handles the cleanup after a recovery attempt
69+
func (g *decoderCache) createRemoveCallback(key string) func(error) {
70+
return func(err error) {
71+
g.mu.Lock()
72+
defer g.mu.Unlock()
73+
if err != nil {
74+
// signals that a new getter is needed to reattempt to recover the data
75+
delete(g.cache, key)
76+
} else {
77+
// signals that the chunks were fetched/recovered/cached so a future getter is not needed
78+
// The nil value indicates a successful recovery
79+
g.cache[key] = nil
80+
}
81+
}
82+
}
83+
6884
// GetOrCreate returns a decoder for the given chunk address
6985
func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter {
70-
7186
// since a recovery decoder is not allowed, simply return the underlying netstore
7287
if g.config.Strict && g.config.Strategy == getter.NONE {
7388
return g.fetcher
@@ -83,22 +98,31 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
8398
d, ok := g.cache[key]
8499
if ok {
85100
if d == nil {
86-
return g.fetcher
101+
// The nil value indicates a previous successful recovery
102+
// Create a new decoder but only use it as fallback if network fetch fails
103+
decoderCallback := g.createRemoveCallback(key)
104+
105+
// Create a factory function that will instantiate the decoder only when needed
106+
recovery := func() storage.Getter {
107+
g.config.Logger.Debug("lazy-creating recovery decoder after fetch failed", "key", key)
108+
g.mu.Lock()
109+
defer g.mu.Unlock()
110+
d, ok := g.cache[key]
111+
if ok && d != nil {
112+
return d
113+
}
114+
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config)
115+
g.cache[key] = d
116+
return d
117+
}
118+
119+
return getter.NewReDecoder(g.fetcher, recovery, g.config.Logger)
87120
}
88121
return d
89122
}
90-
remove := func(err error) {
91-
g.mu.Lock()
92-
defer g.mu.Unlock()
93-
if err != nil {
94-
// signals that a new getter is needed to reattempt to recover the data
95-
delete(g.cache, key)
96-
} else {
97-
// signals that the chunks were fetched/recovered/cached so a future getter is not needed
98-
g.cache[key] = nil
99-
}
100-
}
101-
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config)
123+
124+
removeCallback := g.createRemoveCallback(key)
125+
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config)
102126
g.cache[key] = d
103127
return d
104128
}

pkg/file/joiner/redecoder_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Copyright 2025 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package joiner_test
6+
7+
import (
8+
"bytes"
9+
"context"
10+
"strconv"
11+
"testing"
12+
13+
"github.com/ethersphere/bee/v2/pkg/cac"
14+
"github.com/ethersphere/bee/v2/pkg/file/joiner"
15+
"github.com/ethersphere/bee/v2/pkg/file/redundancy/getter"
16+
"github.com/ethersphere/bee/v2/pkg/log"
17+
"github.com/ethersphere/bee/v2/pkg/storage"
18+
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
19+
"github.com/ethersphere/bee/v2/pkg/swarm"
20+
"github.com/klauspost/reedsolomon"
21+
)
22+
23+
// TestReDecoderFlow tests the complete flow of:
// 1. Loading data with a redundancy getter
// 2. Successful recovery, which nulls the cached decoder
// 3. Chunk eviction from cache (simulated by re-requesting via the cache)
// 4. Reloading the same data through the ReDecoder fallback
func TestReDecoderFlow(t *testing.T) {
	ctx := context.Background()
	dataShardCount := 4
	parityShardCount := 2
	totalShardCount := dataShardCount + parityShardCount

	// Create real data chunks with proper content.
	dataShards := make([][]byte, dataShardCount)
	for i := 0; i < dataShardCount; i++ {
		// Each shard is a full chunk-with-span-sized buffer.
		dataShards[i] = make([]byte, swarm.ChunkWithSpanSize)
		// Create a unique string for this shard.
		testData := []byte("test-data-" + strconv.Itoa(i))
		// Copy as much as will fit; the remainder stays zeroed.
		copy(dataShards[i], testData)
	}

	// Allocate parity buffers to be filled by Reed-Solomon encoding.
	parityShards := make([][]byte, parityShardCount)
	for i := 0; i < parityShardCount; i++ {
		parityShards[i] = make([]byte, swarm.ChunkWithSpanSize)
	}

	// Create the Reed-Solomon encoder.
	enc, err := reedsolomon.New(dataShardCount, parityShardCount)
	if err != nil {
		t.Fatalf("Failed to create Reed-Solomon encoder: %v", err)
	}

	// Combine data and parity shards into one slice for encoding.
	allShards := make([][]byte, totalShardCount)
	copy(allShards, dataShards)
	copy(allShards[dataShardCount:], parityShards)

	// Encode to generate the parity chunks in place.
	if err := enc.Encode(allShards); err != nil {
		t.Fatalf("Failed to encode data: %v", err)
	}

	// Create content-addressed chunks for all shards.
	addresses := make([]swarm.Address, totalShardCount)
	chunks := make([]swarm.Chunk, totalShardCount)

	for i := 0; i < totalShardCount; i++ {
		// Create proper content-addressed chunks.
		chunk, err := cac.NewWithDataSpan(allShards[i])
		if err != nil {
			t.Fatalf("Failed to create content-addressed chunk %d: %v", i, err)
		}
		chunks[i] = chunk
		addresses[i] = chunk.Address()
	}

	// Select a data chunk to be missing (which will be recovered).
	missingChunkIndex := 2 // The third data chunk will be missing
	mockStore := inmemchunkstore.New()
	netFetcher := newMockNetworkFetcher(addresses, addresses[missingChunkIndex])
	config := getter.Config{
		Strategy: getter.RACE,
		Logger:   log.Noop,
	}

	j := joiner.NewDecoderCache(netFetcher, mockStore, config)

	// Step 1: Initialize the decoder and trigger recovery.
	decoder := j.GetOrCreate(addresses, dataShardCount)
	if decoder == nil {
		t.Fatal("Failed to create decoder")
	}

	// Verify we can now fetch the previously missing chunk through recovery.
	recoveredChunk, err := decoder.Get(ctx, addresses[missingChunkIndex])
	if err != nil {
		t.Fatalf("Failed to recover missing chunk: %v", err)
	}
	// Verify the recovered chunk has the correct address.
	if !recoveredChunk.Address().Equal(addresses[missingChunkIndex]) {
		t.Fatalf("Recovered chunk has incorrect address")
	}

	// Verify the recovered chunk has the correct content.
	recoveredData := recoveredChunk.Data()
	expectedData := chunks[missingChunkIndex].Data()
	if len(recoveredData) != len(expectedData) {
		t.Fatalf("Recovered chunk has incorrect data length: got %d, want %d", len(recoveredData), len(expectedData))
	}
	if !bytes.Equal(recoveredData, expectedData) {
		t.Fatalf("Recovered chunk has incorrect data")
	}
	// Check that the recovered chunk was persisted to the store.
	_, err = mockStore.Get(ctx, addresses[missingChunkIndex])
	if err != nil {
		t.Fatalf("Recovered chunk not saved to store: %v", err)
	}

	// Step 2: The original decoder should be automatically nulled after
	// successful recovery. This is internal state we cannot inspect directly,
	// but we can verify that the chunks remain accessible.

	// Sanity check - verify we can still fetch chunks through the cache.
	for i := 0; i < dataShardCount; i++ {
		_, err := decoder.Get(ctx, addresses[i])
		if err != nil {
			t.Fatalf("Failed to get chunk %d after recovery: %v", i, err)
		}
	}

	// Step 3: Testing ReDecoder fallback. After a successful recovery the
	// cache holds a nil entry, so GetOrCreate should hand back a ReDecoder.
	newDecoder := j.GetOrCreate(addresses, dataShardCount)
	if newDecoder == nil {
		t.Fatal("Failed to create ReDecoder")
	}

	// Verify all data chunks can be fetched through the ReDecoder.
	for i := 0; i < dataShardCount; i++ {
		_, err := newDecoder.Get(ctx, addresses[i])
		if err != nil {
			t.Fatalf("Failed to get chunk %d through ReDecoder: %v", i, err)
		}
	}

	// Verify that we can also access the originally missing chunk — now from
	// the store, without needing network or recovery mechanisms.
	retrievedChunk, err := newDecoder.Get(ctx, addresses[missingChunkIndex])
	if err != nil {
		t.Fatalf("Failed to retrieve previously recovered chunk: %v", err)
	}

	if !retrievedChunk.Address().Equal(addresses[missingChunkIndex]) {
		t.Fatalf("Retrieved chunk has incorrect address")
	}

	// Also verify the data content matches.
	retrievedData := retrievedChunk.Data()
	expectedData = chunks[missingChunkIndex].Data()
	if len(retrievedData) != len(expectedData) {
		t.Fatalf("Retrieved chunk has incorrect data length: got %d, want %d", len(retrievedData), len(expectedData))
	}
	if !bytes.Equal(retrievedData, expectedData) {
		t.Fatalf("Retrieved chunk has incorrect data")
	}
}
170+
171+
// mockNetworkFetcher is a mock implementation of storage.Getter for testing.
// It serves deterministic shard data for every known address and simulates a
// network miss (storage.ErrNotFound) for one designated address.
type mockNetworkFetcher struct {
	allAddresses []swarm.Address // full set of shard addresses the fetcher can serve
	missingAddr  swarm.Address   // address whose fetch always fails, forcing recovery
}

// newMockNetworkFetcher creates a new mock fetcher that will return
// ErrNotFound for the given missing address and serve all other addresses
// from allAddrs.
func newMockNetworkFetcher(allAddrs []swarm.Address, missingAddr swarm.Address) *mockNetworkFetcher {
	return &mockNetworkFetcher{
		allAddresses: allAddrs,
		missingAddr:  missingAddr,
	}
}
184+
185+
// Get implements storage.Getter interface
186+
func (m *mockNetworkFetcher) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
187+
// Simulate network fetch - fail for the missing chunk
188+
if addr.Equal(m.missingAddr) {
189+
return nil, storage.ErrNotFound
190+
}
191+
192+
// Find the index of this address in the all addresses list
193+
var index int
194+
for i, a := range m.allAddresses {
195+
if addr.Equal(a) {
196+
index = i
197+
break
198+
}
199+
}
200+
201+
// Generate data using the same pattern as in the test
202+
data := make([]byte, swarm.ChunkWithSpanSize)
203+
// Create a unique string for this shard
204+
testData := []byte("test-data-" + strconv.Itoa(index))
205+
// Copy as much as will fit
206+
copy(data, testData)
207+
208+
return swarm.NewChunk(addr, data), nil
209+
}

pkg/file/redundancy/getter/getter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func (g *decoder) setData(i int, chdata []byte) {
340340
func (g *decoder) getData(i int) []byte {
341341
g.mu.Lock()
342342
defer g.mu.Unlock()
343-
if i == g.shardCnt-1 && g.lastLen > 0 {
343+
if i == g.shardCnt-1 && g.lastLen > 0 && g.rsbuf[i] != nil {
344344
return g.rsbuf[i][:g.lastLen] // cut padding
345345
}
346346
return g.rsbuf[i]
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2025 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package getter
6+
7+
import (
8+
"context"
9+
"errors"
10+
11+
"github.com/ethersphere/bee/v2/pkg/log"
12+
"github.com/ethersphere/bee/v2/pkg/storage"
13+
"github.com/ethersphere/bee/v2/pkg/swarm"
14+
)
15+
16+
// Recovery is a factory function that creates a recovery decoder on demand,
// deferring its construction until a direct network fetch has failed.
type Recovery func() storage.Getter
18+
19+
// ReDecoder is a wrapper around a Getter that first attempts to fetch a chunk
// directly from the network, and only falls back to recovery if the network
// fetch fails with ErrNotFound. This is used to handle cases where previously
// recovered chunks have been evicted from cache.
type ReDecoder struct {
	fetcher  storage.Getter // Direct fetcher (usually netstore), tried first
	recovery Recovery       // Factory function to create the recovery decoder on demand
	logger   log.Logger     // Used to log the fallback to recovery
}
27+
28+
// NewReDecoder creates a new ReDecoder instance with the provided fetcher and recovery factory.
29+
// The recovery decoder will only be created if needed (when network fetch fails).
30+
func NewReDecoder(fetcher storage.Getter, recovery Recovery, logger log.Logger) *ReDecoder {
31+
return &ReDecoder{
32+
fetcher: fetcher,
33+
recovery: recovery,
34+
logger: logger,
35+
}
36+
}
37+
38+
// Get implements the storage.Getter interface.
39+
// It first attempts to fetch the chunk directly from the network.
40+
// If that fails with ErrNotFound, it then creates the recovery decoder and attempts to recover the chunk.
41+
func (rd *ReDecoder) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
42+
// First try to get the chunk directly from the network
43+
chunk, err := rd.fetcher.Get(ctx, addr)
44+
if err == nil {
45+
return chunk, nil
46+
}
47+
48+
// Only attempt recovery if the chunk was not found
49+
if !errors.Is(err, storage.ErrNotFound) {
50+
return nil, err
51+
}
52+
53+
// Log that we're falling back to recovery
54+
rd.logger.Debug("chunk not found in network, creating recovery decoder", "address", addr)
55+
56+
// Create the recovery decoder on demand
57+
recovery := rd.recovery()
58+
59+
// Attempt to recover the chunk
60+
return recovery.Get(ctx, addr)
61+
}

0 commit comments

Comments (0)