Skip to content

Commit cda61e5

Browse files
authored
fix: stewardship with erasure encoding (#4955)
1 parent ffd67d0 commit cda61e5

File tree

2 files changed

+81
-22
lines changed

2 files changed

+81
-22
lines changed

pkg/api/stewardship_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,27 @@
55
package api_test
66

77
import (
8+
"bytes"
9+
"context"
810
"encoding/hex"
11+
"fmt"
912
"net/http"
13+
"strconv"
1014
"testing"
15+
"time"
1116

1217
"github.com/ethersphere/bee/v2/pkg/api"
18+
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
1319
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
1420
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
1521
"github.com/ethersphere/bee/v2/pkg/log"
1622
mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
23+
"github.com/ethersphere/bee/v2/pkg/steward"
1724
"github.com/ethersphere/bee/v2/pkg/steward/mock"
25+
"github.com/ethersphere/bee/v2/pkg/storage"
1826
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
1927
"github.com/ethersphere/bee/v2/pkg/swarm"
28+
"gitlab.com/nolash/go-mockbytes"
2029
)
2130

2231
// nolint:paralleltest
@@ -60,6 +69,58 @@ func TestStewardship(t *testing.T) {
6069
})
6170
}
6271

72+
type localRetriever struct {
73+
getter storage.Getter
74+
}
75+
76+
func (lr *localRetriever) RetrieveChunk(ctx context.Context, addr, sourceAddr swarm.Address) (chunk swarm.Chunk, err error) {
77+
ch, err := lr.getter.Get(ctx, addr)
78+
if err != nil {
79+
return nil, fmt.Errorf("retrieve chunk %s: %w", addr, err)
80+
}
81+
return ch, nil
82+
}
83+
84+
func TestStewardshipWithRedundancy(t *testing.T) {
85+
t.Parallel()
86+
87+
var (
88+
storerMock = mockstorer.New()
89+
localRetrieval = &localRetriever{getter: storerMock.ChunkStore()}
90+
s = steward.New(storerMock, localRetrieval, storerMock.Cache())
91+
client, _, _, _ = newTestServer(t, testServerOptions{
92+
Storer: storerMock,
93+
Logger: log.Noop,
94+
Steward: s,
95+
Post: mockpost.New(mockpost.WithAcceptAll()),
96+
})
97+
)
98+
99+
g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
100+
content, err := g.SequentialBytes(512000) // 500KB
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
105+
for _, l := range []redundancy.Level{redundancy.NONE, redundancy.MEDIUM, redundancy.STRONG, redundancy.INSANE, redundancy.PARANOID} {
106+
t.Run(fmt.Sprintf("rLevel-%d", l), func(t *testing.T) {
107+
res := new(api.BytesPostResponse)
108+
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusCreated,
109+
jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"),
110+
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
111+
jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, strconv.Itoa(int(l))),
112+
jsonhttptest.WithRequestBody(bytes.NewReader(content)),
113+
jsonhttptest.WithUnmarshalJSONResponse(res),
114+
)
115+
116+
time.Sleep(2 * time.Second)
117+
jsonhttptest.Request(t, client, http.MethodGet, "/stewardship/"+res.Reference.String(), http.StatusOK,
118+
jsonhttptest.WithExpectedJSONResponse(api.IsRetrievableResponse{IsRetrievable: true}),
119+
)
120+
})
121+
}
122+
}
123+
63124
func TestStewardshipInvalidInputs(t *testing.T) {
64125
t.Parallel()
65126

pkg/file/joiner/joiner.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
2020
"github.com/ethersphere/bee/v2/pkg/file/redundancy/getter"
2121
"github.com/ethersphere/bee/v2/pkg/replicas"
22-
storage "github.com/ethersphere/bee/v2/pkg/storage"
22+
"github.com/ethersphere/bee/v2/pkg/storage"
2323
"github.com/ethersphere/bee/v2/pkg/swarm"
2424
"golang.org/x/sync/errgroup"
2525
)
@@ -376,10 +376,6 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
376376
default:
377377
}
378378

379-
eg, ectx := errgroup.WithContext(ctx)
380-
381-
var wg sync.WaitGroup
382-
383379
eSize, err := file.ChunkPayloadSize(data)
384380
if err != nil {
385381
return err
@@ -399,29 +395,31 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
399395
continue
400396
}
401397

402-
wg.Add(1)
403-
eg.Go(func() error {
404-
defer wg.Done()
398+
if j.refLength == encryption.ReferenceSize && i < shardCnt {
399+
addr = swarm.NewAddress(data[cursor : cursor+swarm.HashSize*2])
400+
}
405401

406-
if j.refLength == encryption.ReferenceSize && i < shardCnt {
407-
addr = swarm.NewAddress(data[cursor : cursor+swarm.HashSize*2])
408-
}
409-
ch, err := g.Get(ectx, addr)
410-
if err != nil {
411-
return err
412-
}
402+
// not a shard
403+
if i >= shardCnt {
404+
continue
405+
}
413406

414-
chunkData := ch.Data()[8:]
415-
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
416-
_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)
407+
ch, err := g.Get(ctx, addr)
408+
if err != nil {
409+
return err
410+
}
417411

418-
return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan, parities)
419-
})
412+
chunkData := ch.Data()[8:]
413+
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
414+
_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)
420415

421-
wg.Wait()
416+
err = j.processChunkAddresses(ctx, fn, chunkData, subtrieSpan, parities)
417+
if err != nil {
418+
return err
419+
}
422420
}
423421

424-
return eg.Wait()
422+
return nil
425423
}
426424

427425
func (j *joiner) Size() int64 {

0 commit comments

Comments
 (0)