Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 38 additions & 14 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,24 @@ func fingerprint(addrs []swarm.Address) string {
return string(h.Sum(nil))
}

// createRemoveCallback returns the callback invoked once a recovery attempt
// for the given cache key has finished. The callback records the outcome in
// the decoder cache under the cache lock.
func (g *decoderCache) createRemoveCallback(key string) func(error) {
	return func(err error) {
		g.mu.Lock()
		defer g.mu.Unlock()
		if err == nil {
			// Success: the chunks were fetched/recovered/cached, so a future
			// getter is not needed. The nil entry records that fact.
			g.cache[key] = nil
			return
		}
		// Failure: drop the entry so a new getter can reattempt recovery.
		delete(g.cache, key)
	}
}

// GetOrCreate returns a decoder for the given chunk address
func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter {

// since a recovery decoder is not allowed, simply return the underlying netstore
if g.config.Strict && g.config.Strategy == getter.NONE {
return g.fetcher
Expand All @@ -83,22 +98,31 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
d, ok := g.cache[key]
if ok {
if d == nil {
return g.fetcher
// The nil value indicates a previous successful recovery
// Create a new decoder but only use it as fallback if network fetch fails
decoderCallback := g.createRemoveCallback(key)

// Create a factory function that will instantiate the decoder only when needed
recovery := func() storage.Getter {
g.config.Logger.Debug("lazy-creating recovery decoder after fetch failed", "key", key)
g.mu.Lock()
defer g.mu.Unlock()
d, ok := g.cache[key]
if ok && d != nil {
return d
}
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here it is possible to have a deadlock. At this point, the same goroutine is holding the lock from line 96 and trying to acquire it again here in decoderCallback.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decoderCallback is called only in the prefetch function, which is executed by a different goroutine

g.cache[key] = d
return d
}

return getter.NewReDecoder(g.fetcher, recovery, g.config.Logger)
}
return d
}
remove := func(err error) {
g.mu.Lock()
defer g.mu.Unlock()
if err != nil {
// signals that a new getter is needed to reattempt to recover the data
delete(g.cache, key)
} else {
// signals that the chunks were fetched/recovered/cached so a future getter is not needed
g.cache[key] = nil
}
}
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, remove, g.config)

removeCallback := g.createRemoveCallback(key)
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config)
g.cache[key] = d
return d
}
Expand Down
209 changes: 209 additions & 0 deletions pkg/file/joiner/redecoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package joiner_test

import (
"bytes"
"context"
"strconv"
"testing"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/file/joiner"
"github.com/ethersphere/bee/v2/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/klauspost/reedsolomon"
)

// TestReDecoderFlow tests the complete flow of:
// 1. Loading data with a redundancy getter
// 2. Successful recovery, which nulls the cached decoder entry
// 3. Chunk eviction from cache
// 4. Reloading the same data through the ReDecoder fallback
func TestReDecoderFlow(t *testing.T) {
	ctx := context.Background()
	dataShardCount := 4
	parityShardCount := 2
	totalShardCount := dataShardCount + parityShardCount

	// Create real data chunks with deterministic content so the mock network
	// fetcher can regenerate identical bytes later.
	dataShards := make([][]byte, dataShardCount)
	for i := 0; i < dataShardCount; i++ {
		// Each shard is a full chunk-with-span buffer, zero-padded.
		dataShards[i] = make([]byte, swarm.ChunkWithSpanSize)
		// Create a unique string for this shard
		testData := []byte("test-data-" + strconv.Itoa(i))
		// Copy as much as will fit; the rest of the buffer stays zero
		copy(dataShards[i], testData)
	}

	// Allocate parity buffers; reedsolomon.Encode fills them in below.
	parityShards := make([][]byte, parityShardCount)
	for i := 0; i < parityShardCount; i++ {
		parityShards[i] = make([]byte, swarm.ChunkWithSpanSize)
	}

	// Create the Reed-Solomon encoder used to derive parity from data shards.
	enc, err := reedsolomon.New(dataShardCount, parityShardCount)
	if err != nil {
		t.Fatalf("Failed to create Reed-Solomon encoder: %v", err)
	}

	// Combine data and parity shards into one slice in encoder order:
	// data shards first, then parity.
	allShards := make([][]byte, totalShardCount)
	copy(allShards, dataShards)
	copy(allShards[dataShardCount:], parityShards)

	// Encode to generate the parity chunks in place.
	if err := enc.Encode(allShards); err != nil {
		t.Fatalf("Failed to encode data: %v", err)
	}

	// Create content-addressed chunks for all shards so addresses match the
	// chunk contents.
	addresses := make([]swarm.Address, totalShardCount)
	chunks := make([]swarm.Chunk, totalShardCount)

	for i := 0; i < totalShardCount; i++ {
		// Create proper content-addressed chunks
		chunk, err := cac.NewWithDataSpan(allShards[i])
		if err != nil {
			t.Fatalf("Failed to create content-addressed chunk %d: %v", i, err)
		}
		chunks[i] = chunk
		addresses[i] = chunk.Address()
	}

	// Select a data chunk to be missing from the network (it will have to be
	// recovered via the parity shards).
	missingChunkIndex := 2 // The third data chunk will be missing
	mockStore := inmemchunkstore.New()
	netFetcher := newMockNetworkFetcher(addresses, addresses[missingChunkIndex])
	config := getter.Config{
		Strategy: getter.RACE,
		Logger:   log.Noop,
	}

	j := joiner.NewDecoderCache(netFetcher, mockStore, config)

	// Step 1: Initializing decoder and triggering recovery
	decoder := j.GetOrCreate(addresses, dataShardCount)
	if decoder == nil {
		t.Fatal("Failed to create decoder")
	}

	// Verify we can now fetch the previously missing chunk through recovery
	recoveredChunk, err := decoder.Get(ctx, addresses[missingChunkIndex])
	if err != nil {
		t.Fatalf("Failed to recover missing chunk: %v", err)
	}
	// Verify the recovered chunk has the correct address
	if !recoveredChunk.Address().Equal(addresses[missingChunkIndex]) {
		t.Fatalf("Recovered chunk has incorrect address")
	}

	// Verify the recovered chunk has the correct content
	recoveredData := recoveredChunk.Data()
	expectedData := chunks[missingChunkIndex].Data()
	if len(recoveredData) != len(expectedData) {
		t.Fatalf("Recovered chunk has incorrect data length: got %d, want %d", len(recoveredData), len(expectedData))
	}
	if !bytes.Equal(recoveredData, expectedData) {
		t.Fatalf("Recovered chunk has incorrect data")
	}
	// Check that the recovered chunk was also written to the local store
	// (recovery is expected to persist through the putter).
	_, err = mockStore.Get(ctx, addresses[missingChunkIndex])
	if err != nil {
		t.Fatalf("Recovered chunk not saved to store: %v", err)
	}

	// Step 2: The original decoder should be automatically nulled after successful recovery
	// This is an internal state check, we can't directly test it but we can verify that
	// we can still access the chunks

	// Sanity check - verify we can still fetch chunks through the cached decoder
	for i := 0; i < dataShardCount; i++ {
		_, err := decoder.Get(ctx, addresses[i])
		if err != nil {
			t.Fatalf("Failed to get chunk %d after recovery: %v", i, err)
		}
	}

	// Step 3: Testing ReDecoder fallback — a second GetOrCreate on the same
	// addresses should hit the nil cache entry and return a ReDecoder.
	newDecoder := j.GetOrCreate(addresses, dataShardCount)
	if newDecoder == nil {
		t.Fatal("Failed to create ReDecoder")
	}

	// Verify all data chunks can be fetched through the ReDecoder
	for i := 0; i < dataShardCount; i++ {
		_, err := newDecoder.Get(ctx, addresses[i])
		if err != nil {
			t.Fatalf("Failed to get chunk %d through ReDecoder: %v", i, err)
		}
	}

	// Verify that we can also access the originally missing chunk - now from the store
	// This would be using the local store and not network or recovery mechanisms
	retrievedChunk, err := newDecoder.Get(ctx, addresses[missingChunkIndex])
	if err != nil {
		t.Fatalf("Failed to retrieve previously recovered chunk: %v", err)
	}

	if !retrievedChunk.Address().Equal(addresses[missingChunkIndex]) {
		t.Fatalf("Retrieved chunk has incorrect address")
	}

	// Also verify the data content matches the original chunk
	retrievedData := retrievedChunk.Data()
	expectedData = chunks[missingChunkIndex].Data()
	if len(retrievedData) != len(expectedData) {
		t.Fatalf("Retrieved chunk has incorrect data length: got %d, want %d", len(retrievedData), len(expectedData))
	}
	if !bytes.Equal(retrievedData, expectedData) {
		t.Fatalf("Retrieved chunk has incorrect data")
	}
}

// mockNetworkFetcher is a mock implementation of storage.Getter for testing.
// It simulates a network that serves deterministic shard data for every known
// address except one designated "missing" address.
type mockNetworkFetcher struct {
	allAddresses []swarm.Address // addresses the mock knows how to serve, in shard order
	missingAddr  swarm.Address   // address for which Get always returns storage.ErrNotFound
}

// newMockNetworkFetcher constructs a mock fetcher that serves every address in
// allAddrs except missingAddr, for which it reports storage.ErrNotFound.
func newMockNetworkFetcher(allAddrs []swarm.Address, missingAddr swarm.Address) *mockNetworkFetcher {
	f := &mockNetworkFetcher{allAddresses: allAddrs, missingAddr: missingAddr}
	return f
}

// Get implements the storage.Getter interface.
// It simulates a network fetch: the designated missing address and any address
// the mock was not configured with return storage.ErrNotFound; every other
// address yields a chunk regenerated with the same deterministic pattern used
// by the test that built the shards.
func (m *mockNetworkFetcher) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
	// Simulate network fetch - fail for the missing chunk
	if addr.Equal(m.missingAddr) {
		return nil, storage.ErrNotFound
	}

	// Find the index of this address in the list of known addresses.
	index := -1
	for i, a := range m.allAddresses {
		if addr.Equal(a) {
			index = i
			break
		}
	}
	// Previously an unknown address silently fell through with index 0 and
	// returned shard 0's data, which could mask addressing bugs in the code
	// under test. Report it as not found instead.
	if index < 0 {
		return nil, storage.ErrNotFound
	}

	// Generate data using the same pattern as in the test: a unique string
	// copied into a zero-padded chunk-with-span sized buffer.
	data := make([]byte, swarm.ChunkWithSpanSize)
	testData := []byte("test-data-" + strconv.Itoa(index))
	copy(data, testData)

	return swarm.NewChunk(addr, data), nil
}
2 changes: 1 addition & 1 deletion pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (g *decoder) setData(i int, chdata []byte) {
func (g *decoder) getData(i int) []byte {
g.mu.Lock()
defer g.mu.Unlock()
if i == g.shardCnt-1 && g.lastLen > 0 {
if i == g.shardCnt-1 && g.lastLen > 0 && g.rsbuf[i] != nil {
return g.rsbuf[i][:g.lastLen] // cut padding
}
return g.rsbuf[i]
Expand Down
61 changes: 61 additions & 0 deletions pkg/file/redundancy/getter/redecoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package getter

import (
"context"
"errors"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// Recovery is a factory function that creates (or returns a cached) recovery
// decoder on demand, deferring its construction until a network fetch fails.
type Recovery func() storage.Getter

// ReDecoder is a wrapper around a Getter that first attempts to fetch a chunk directly
// from the network, and only falls back to recovery if the network fetch fails.
// This is used to handle cases where previously recovered chunks have been evicted from cache.
type ReDecoder struct {
	fetcher  storage.Getter // direct fetcher (usually netstore), tried first
	recovery Recovery       // factory function to create the recovery decoder on demand
	logger   log.Logger     // used to log the fallback to recovery
}

// NewReDecoder creates a new ReDecoder instance with the provided fetcher and
// recovery factory. The recovery decoder itself is only created lazily, when a
// network fetch fails with a not-found error.
func NewReDecoder(fetcher storage.Getter, recovery Recovery, logger log.Logger) *ReDecoder {
	rd := &ReDecoder{
		fetcher:  fetcher,
		recovery: recovery,
		logger:   logger,
	}
	return rd
}

// Get implements the storage.Getter interface.
// The chunk is first requested directly from the network; only a not-found
// error triggers lazy construction of the recovery decoder, which is then
// asked for the chunk instead. Any other fetch error is returned as-is.
func (rd *ReDecoder) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
	chunk, err := rd.fetcher.Get(ctx, addr)
	switch {
	case err == nil:
		// Direct fetch succeeded; no recovery needed.
		return chunk, nil
	case !errors.Is(err, storage.ErrNotFound):
		// Errors other than not-found are not recoverable here.
		return nil, err
	}

	// Log that we're falling back to recovery before building the decoder.
	rd.logger.Debug("chunk not found in network, creating recovery decoder", "address", addr)

	// Create the recovery decoder on demand and attempt to recover the chunk.
	return rd.recovery().Get(ctx, addr)
}
Loading