diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go index 4d9152357c0..313ea21bd2c 100644 --- a/pkg/file/joiner/joiner.go +++ b/pkg/file/joiner/joiner.go @@ -65,9 +65,24 @@ func fingerprint(addrs []swarm.Address) string { return string(h.Sum(nil)) } +// createRemoveCallback returns a function that handles the cleanup after a recovery attempt +func (g *decoderCache) createRemoveCallback(key string) func(error) { + return func(err error) { + g.mu.Lock() + defer g.mu.Unlock() + if err != nil { + // signals that a new getter is needed to reattempt to recover the data + delete(g.cache, key) + } else { + // signals that the chunks were fetched/recovered/cached so a future getter is not needed + // The nil value indicates a successful recovery + g.cache[key] = nil + } + } +} + // GetOrCreate returns a decoder for the given chunk address func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter { - // since a recovery decoder is not allowed, simply return the underlying netstore if g.config.Strict && g.config.Strategy == getter.NONE { return g.fetcher @@ -83,22 +98,31 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage. d, ok := g.cache[key] if ok { if d == nil { - return g.fetcher + // The nil value indicates a previous successful recovery + // Create a new decoder but only use it as fallback if network fetch fails + decoderCallback := g.createRemoveCallback(key) + + // Create a factory function that will instantiate the decoder only when needed + recovery := func() storage.Getter { + g.config.Logger.Debug("lazy-creating recovery decoder after fetch failed", "key", key) + g.mu.Lock() + defer g.mu.Unlock() + d, ok := g.cache[key] + if ok && d != nil { + return d + } + d = getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config) + g.cache[key] = d + return d + } + + return getter.NewReDecoder(g.fetcher, recovery, g.config.Logger) } return d } - remove := func(err error) { - g.mu.Lock() - defer g.mu.Unlock() - if err != nil { - // signals that a new getter is needed to reattempt to recover the data - delete(g.cache, key) - } else { - // signals that the chunks were fetched/recovered/cached so a future getter is not needed - g.cache[key] = nil - } - } - d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config) + + removeCallback := g.createRemoveCallback(key) + d = getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config) g.cache[key] = d return d } diff --git a/pkg/file/joiner/redecoder_test.go b/pkg/file/joiner/redecoder_test.go new file mode 100644 index 00000000000..7df041de76d --- /dev/null +++ b/pkg/file/joiner/redecoder_test.go @@ -0,0 +1,209 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package joiner_test + +import ( + "bytes" + "context" + "strconv" + "testing" + + "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/joiner" + "github.com/ethersphere/bee/v2/pkg/file/redundancy/getter" + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/klauspost/reedsolomon" +) + +// TestReDecoderFlow tests the complete flow of: +// 1. Loading data with redundancy getter +// 2. Successful recovery which nulls the decoder +// 3. Chunk eviction from cache +// 4. Reloading the same data through ReDecoder fallback +func TestReDecoderFlow(t *testing.T) { + ctx := context.Background() + dataShardCount := 4 + parityShardCount := 2 + totalShardCount := dataShardCount + parityShardCount + + // Create real data chunks with proper content + dataShards := make([][]byte, dataShardCount) + for i := 0; i < dataShardCount; i++ { + // Create chunks with simpler test data + dataShards[i] = make([]byte, swarm.ChunkWithSpanSize) + // Create a unique string for this shard + testData := []byte("test-data-" + strconv.Itoa(i)) + // Copy as much as will fit + copy(dataShards[i], testData) + } + + // Create parity chunks using Reed-Solomon encoding + parityShards := make([][]byte, parityShardCount) + for i := 0; i < parityShardCount; i++ { + parityShards[i] = make([]byte, swarm.ChunkWithSpanSize) + } + + // Create Reed-Solomon encoder + enc, err := reedsolomon.New(dataShardCount, parityShardCount) + if err != nil { + t.Fatalf("Failed to create Reed-Solomon encoder: %v", err) + } + + // Combine data and parity shards + allShards := make([][]byte, totalShardCount) + copy(allShards, dataShards) + copy(allShards[dataShardCount:], parityShards) + + // Encode to generate parity chunks + if err := enc.Encode(allShards); err != nil { + t.Fatalf("Failed to encode data: %v", err) + } + + // Create content-addressed chunks for all shards + addresses := make([]swarm.Address, totalShardCount) + chunks := make([]swarm.Chunk, totalShardCount) + + for i := 0; i < totalShardCount; i++ { + // Create proper content-addressed chunks + chunk, err := cac.NewWithDataSpan(allShards[i]) + if err != nil { + t.Fatalf("Failed to create content-addressed chunk %d: %v", i, err) + } + chunks[i] = chunk + addresses[i] = chunk.Address() + } + + // Select a data chunk to be missing (which will be recovered) + missingChunkIndex := 2 // The third data chunk will be missing + mockStore := inmemchunkstore.New() + netFetcher := newMockNetworkFetcher(addresses, addresses[missingChunkIndex]) + config := getter.Config{ + Strategy: getter.RACE, + Logger: log.Noop, + } + + j := joiner.NewDecoderCache(netFetcher, mockStore, config) + + // Step 1: Initializing decoder and triggering recovery + decoder := j.GetOrCreate(addresses, dataShardCount) + if decoder == nil { + t.Fatal("Failed to create decoder") + } + + // Verify we can now fetch the previously missing chunk through recovery + recoveredChunk, err := decoder.Get(ctx, addresses[missingChunkIndex]) + if err != nil { + t.Fatalf("Failed to recover missing chunk: %v", err) + } + // Verify the recovered chunk has the correct content + if !recoveredChunk.Address().Equal(addresses[missingChunkIndex]) { + t.Fatalf("Recovered chunk has incorrect address") + } + + // Verify the recovered chunk has the correct content + recoveredData := recoveredChunk.Data() + expectedData := chunks[missingChunkIndex].Data() + if len(recoveredData) != len(expectedData) { + t.Fatalf("Recovered chunk has incorrect data length: got %d, want %d", len(recoveredData), len(expectedData)) + } + if !bytes.Equal(recoveredData, expectedData) { + t.Fatalf("Recovered chunk has incorrect data") + } + // Check if the recovered chunk is now in the store + _, err = mockStore.Get(ctx, addresses[missingChunkIndex]) + if err != nil { + t.Fatalf("Recovered chunk not saved to store: %v", err) + } + + // Step 2: The original decoder should be automatically nulled after successful recovery + // This is an internal state check, we can't directly test it but we can verify that + // we can still access the chunks + + // Sanity check - verify we can still fetch chunks through the cache + for i := 0; i < dataShardCount; i++ { + _, err := decoder.Get(ctx, addresses[i]) + if err != nil { + t.Fatalf("Failed to get chunk %d after recovery: %v", i, err) + } + } + + // Step 3: Testing ReDecoder fallback + newDecoder := j.GetOrCreate(addresses, dataShardCount) + if newDecoder == nil { + t.Fatal("Failed to create ReDecoder") + } + + // Verify all chunks can be fetched through the ReDecoder + for i := 0; i < dataShardCount; i++ { + _, err := newDecoder.Get(ctx, addresses[i]) + if err != nil { + t.Fatalf("Failed to get chunk %d through ReDecoder: %v", i, err) + } + } + + // Verify that we can also access the first missing chunk - now from the store + // This would be using the local store and not network or recovery mechanisms + retrievedChunk, err := newDecoder.Get(ctx, addresses[missingChunkIndex]) + if err != nil { + t.Fatalf("Failed to retrieve previously recovered chunk: %v", err) + } + + if !retrievedChunk.Address().Equal(addresses[missingChunkIndex]) { + t.Fatalf("Retrieved chunk has incorrect address") + } + + // Also verify the data content matches + retrievedData := retrievedChunk.Data() + expectedData = chunks[missingChunkIndex].Data() + if len(retrievedData) != len(expectedData) { + t.Fatalf("Retrieved chunk has incorrect data length: got %d, want %d", len(retrievedData), len(expectedData)) + } + if !bytes.Equal(retrievedData, expectedData) { + t.Fatalf("Retrieved chunk has incorrect data") + } +} + +// Mock implementation of storage.Getter for testing +type mockNetworkFetcher struct { + allAddresses []swarm.Address + missingAddr swarm.Address +} + +// newMockNetworkFetcher creates a new mock fetcher that will return ErrNotFound for specific addresses +func newMockNetworkFetcher(allAddrs []swarm.Address, missingAddr swarm.Address) *mockNetworkFetcher { + return &mockNetworkFetcher{ + allAddresses: allAddrs, + missingAddr: missingAddr, + } +} + +// Get implements storage.Getter interface +func (m *mockNetworkFetcher) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) { + // Simulate network fetch - fail for the missing chunk + if addr.Equal(m.missingAddr) { + return nil, storage.ErrNotFound + } + + // Find the index of this address in the all addresses list + var index int + for i, a := range m.allAddresses { + if addr.Equal(a) { + index = i + break + } + } + + // Generate data using the same pattern as in the test + data := make([]byte, swarm.ChunkWithSpanSize) + // Create a unique string for this shard + testData := []byte("test-data-" + strconv.Itoa(index)) + // Copy as much as will fit + copy(data, testData) + + return swarm.NewChunk(addr, data), nil +} diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index 39bf23f87c3..0953c501dc5 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -340,7 +340,7 @@ func (g *decoder) setData(i int, chdata []byte) { func (g *decoder) getData(i int) []byte { g.mu.Lock() defer g.mu.Unlock() - if i == g.shardCnt-1 && g.lastLen > 0 { + if i == g.shardCnt-1 && g.lastLen > 0 && g.rsbuf[i] != nil { return g.rsbuf[i][:g.lastLen] // cut padding } return g.rsbuf[i] diff --git a/pkg/file/redundancy/getter/redecoder.go b/pkg/file/redundancy/getter/redecoder.go new file mode 100644 index 00000000000..0338a069841 --- /dev/null +++ b/pkg/file/redundancy/getter/redecoder.go @@ -0,0 +1,61 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package getter + +import ( + "context" + "errors" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// Recovery is a function that creates a recovery decoder on demand +type Recovery func() storage.Getter + +// ReDecoder is a wrapper around a Getter that first attempts to fetch a chunk directly +// from the network, and only falls back to recovery if the network fetch fails. +// This is used to handle cases where previously recovered chunks have been evicted from cache. +type ReDecoder struct { + fetcher storage.Getter // Direct fetcher (usually netstore) + recovery Recovery // Factory function to create recovery decoder on demand + logger log.Logger +} + +// NewReDecoder creates a new ReDecoder instance with the provided fetcher and recovery factory. +// The recovery decoder will only be created if needed (when network fetch fails). +func NewReDecoder(fetcher storage.Getter, recovery Recovery, logger log.Logger) *ReDecoder { + return &ReDecoder{ + fetcher: fetcher, + recovery: recovery, + logger: logger, + } +} + +// Get implements the storage.Getter interface. +// It first attempts to fetch the chunk directly from the network. +// If that fails with ErrNotFound, it then creates the recovery decoder and attempts to recover the chunk. +func (rd *ReDecoder) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) { + // First try to get the chunk directly from the network + chunk, err := rd.fetcher.Get(ctx, addr) + if err == nil { + return chunk, nil + } + + // Only attempt recovery if the chunk was not found + if !errors.Is(err, storage.ErrNotFound) { + return nil, err + } + + // Log that we're falling back to recovery + rd.logger.Debug("chunk not found in network, creating recovery decoder", "address", addr) + + // Create the recovery decoder on demand + recovery := rd.recovery() + + // Attempt to recover the chunk + return recovery.Get(ctx, addr) +}