Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,20 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)

if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad {
stored, reason = true, "is within AOR"
return store(ctx)
err = store(ctx)
if err != nil {
return err
}

_, err = ps.pushToClosest(ctx, chunk, false)
if err != nil {
if !errors.Is(err, topology.ErrNotFound) && !errors.Is(err, topology.ErrWantSelf) {
// Do not error out pushsync if we cannot forward to the closest peer after storing.
// The peer will get it via pullsync.
ps.logger.Error(nil, "failed to forward to closest peer", "chunk_address", chunk.Address(), "error", err)
}
}
return nil
}

switch receipt, err := ps.pushToClosest(ctx, chunk, false); {
Expand Down
92 changes: 91 additions & 1 deletion pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/pushsync"
"github.com/ethersphere/bee/v2/pkg/pushsync/pb"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/topology"
Expand Down Expand Up @@ -307,6 +307,96 @@ func TestShallowReceiptTolerance(t *testing.T) {
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
}

// TestForwardToClosest checks that the chunk is forwarded to the closest peer after storing it.
// Chunk moves from TriggerPeer -> PivotPeer (store) -> ClosestPeer
func TestForwardToClosest(t *testing.T) {
t.Parallel()
// chunk data to upload
chunk := testingc.FixtureChunk("7000")

// create a pivot node and a mocked closest node
triggerPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
pivotPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")

// Create the closest peer
psClosestPeer, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), mock.WithClosestPeerErr(topology.ErrWantSelf))

recorder1 := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))

// creating the pivot peer
psPivot, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, recorder1, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer))
recorder2 := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer))

// Creating the trigger peer
psTriggerPeer, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, recorder2, nil, defaultSigner(chunk), mock.WithClosestPeer(pivotPeer))

receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}

if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}

// Pivot peer will forward the chunk to its closest peer. Intercept the incoming stream from pivot node and check
// for the correctness of the chunk
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), chunk.Data())

// Similarly intercept the same incoming stream to see if the closest peer is sending a proper receipt
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), nil)

// In the received stream, check if a receipt is sent from pivot peer and check for its correctness.
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), nil)

// In pivot peer, intercept the incoming delivery chunk from the trigger peer and check for correctness
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), chunk.Data())

tests := []struct {
name string
peer swarm.Address
acc accounting.Interface
want int64
}{
{
name: "trigger-pivot",
peer: pivotPeer,
acc: triggerAccounting,
want: -int64(fixedPrice),
},
{
name: "pivot-trigger",
peer: triggerPeer,
acc: pivotAccounting,
want: int64(fixedPrice),
},
{
name: "pivot-closest",
peer: closestPeer,
acc: pivotAccounting,
want: -int64(fixedPrice),
},
{
name: "closest-pivot",
peer: pivotPeer,
acc: closestAccounting,
want: int64(fixedPrice),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
balance, err := tt.acc.Balance(tt.peer)
if err != nil {
t.Fatal(err)
}
if balance.Int64() != tt.want {
t.Fatalf("unexpected balance. want %d got %d", tt.want, balance)
}
})
}
}

// PushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective.
// it also checks whether the tags are incremented properly if they are present
func TestPushChunkToClosest(t *testing.T) {
Expand Down
Loading