Skip to content

Commit e93677b

Browse files
authored
feat(pushsync): forward chunk after storing (#5037)
1 parent aa6868b commit e93677b

File tree

2 files changed

+105
-2
lines changed

2 files changed

+105
-2
lines changed

pkg/pushsync/pushsync.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,20 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
287287

288288
if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad {
289289
stored, reason = true, "is within AOR"
290-
return store(ctx)
290+
err = store(ctx)
291+
if err != nil {
292+
return err
293+
}
294+
295+
_, err = ps.pushToClosest(ctx, chunk, false)
296+
if err != nil {
297+
if !errors.Is(err, topology.ErrNotFound) && !errors.Is(err, topology.ErrWantSelf) {
298+
// Do not error out pushsync if we cannot forward to the closest peer after storing.
299+
// The peer will get it via pullsync.
300+
ps.logger.Error(nil, "failed to forward to closest peer", "chunk_address", chunk.Address(), "error", err)
301+
}
302+
}
303+
return nil
291304
}
292305

293306
switch receipt, err := ps.pushToClosest(ctx, chunk, false); {

pkg/pushsync/pushsync_test.go

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/ethersphere/bee/v2/pkg/pushsync"
2727
"github.com/ethersphere/bee/v2/pkg/pushsync/pb"
2828
"github.com/ethersphere/bee/v2/pkg/soc"
29-
storage "github.com/ethersphere/bee/v2/pkg/storage"
29+
"github.com/ethersphere/bee/v2/pkg/storage"
3030
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
3131
"github.com/ethersphere/bee/v2/pkg/swarm"
3232
"github.com/ethersphere/bee/v2/pkg/topology"
@@ -307,6 +307,96 @@ func TestShallowReceiptTolerance(t *testing.T) {
307307
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
308308
}
309309

310+
// TestForwardToClosest checks that the chunk is forwarded to the closest peer after storing it.
311+
// Chunk moves from TriggerPeer -> PivotPeer (store) -> ClosestPeer
312+
func TestForwardToClosest(t *testing.T) {
313+
t.Parallel()
314+
// chunk data to upload
315+
chunk := testingc.FixtureChunk("7000")
316+
317+
// create a pivot node and a mocked closest node
318+
triggerPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
319+
pivotPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
320+
closestPeer := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
321+
322+
// Create the closest peer
323+
psClosestPeer, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), mock.WithClosestPeerErr(topology.ErrWantSelf))
324+
325+
recorder1 := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))
326+
327+
// creating the pivot peer
328+
psPivot, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, recorder1, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer))
329+
recorder2 := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer))
330+
331+
// Creating the trigger peer
332+
psTriggerPeer, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, recorder2, nil, defaultSigner(chunk), mock.WithClosestPeer(pivotPeer))
333+
334+
receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
335+
if err != nil {
336+
t.Fatal(err)
337+
}
338+
339+
if !chunk.Address().Equal(receipt.Address) {
340+
t.Fatal("invalid receipt")
341+
}
342+
343+
// Pivot peer will forward the chunk to its closest peer. Intercept the incoming stream from pivot node and check
344+
// for the correctness of the chunk
345+
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), chunk.Data())
346+
347+
// Similarly intercept the same incoming stream to see if the closest peer is sending a proper receipt
348+
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), nil)
349+
350+
// In the received stream, check if a receipt is sent from pivot peer and check for its correctness.
351+
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), nil)
352+
353+
// In pivot peer, intercept the incoming delivery chunk from the trigger peer and check for correctness
354+
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), chunk.Data())
355+
356+
tests := []struct {
357+
name string
358+
peer swarm.Address
359+
acc accounting.Interface
360+
want int64
361+
}{
362+
{
363+
name: "trigger-pivot",
364+
peer: pivotPeer,
365+
acc: triggerAccounting,
366+
want: -int64(fixedPrice),
367+
},
368+
{
369+
name: "pivot-trigger",
370+
peer: triggerPeer,
371+
acc: pivotAccounting,
372+
want: int64(fixedPrice),
373+
},
374+
{
375+
name: "pivot-closest",
376+
peer: closestPeer,
377+
acc: pivotAccounting,
378+
want: -int64(fixedPrice),
379+
},
380+
{
381+
name: "closest-pivot",
382+
peer: pivotPeer,
383+
acc: closestAccounting,
384+
want: int64(fixedPrice),
385+
},
386+
}
387+
for _, tt := range tests {
388+
t.Run(tt.name, func(t *testing.T) {
389+
balance, err := tt.acc.Balance(tt.peer)
390+
if err != nil {
391+
t.Fatal(err)
392+
}
393+
if balance.Int64() != tt.want {
394+
t.Fatalf("unexpected balance. want %d got %d", tt.want, balance)
395+
}
396+
})
397+
}
398+
}
399+
310400
// PushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective.
311401
// it also checks whether the tags are incremented properly if they are present
312402
func TestPushChunkToClosest(t *testing.T) {

0 commit comments

Comments
 (0)