Skip to content

Commit 0354d3b

Browse files
committed
revert: "feat(pushsync): forward chunk after storing (#5037)" (#5088)
1 parent 6dfa847 commit 0354d3b

File tree

2 files changed

+1
-104
lines changed

2 files changed

+1
-104
lines changed

pkg/pushsync/pushsync.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -287,20 +287,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
287287

288288
if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad {
289289
stored, reason = true, "is within AOR"
290-
err = store(ctx)
291-
if err != nil {
292-
return err
293-
}
294-
295-
_, err = ps.pushToClosest(ctx, chunk, false)
296-
if err != nil {
297-
if !errors.Is(err, topology.ErrNotFound) && !errors.Is(err, topology.ErrWantSelf) {
298-
// Do not error out pushsync if we cannot forward to the closest peer after storing.
299-
// The peer will get it via pullsync.
300-
ps.logger.Error(nil, "failed to forward to closest peer", "chunk_address", chunk.Address(), "error", err)
301-
}
302-
}
303-
return nil
290+
return store(ctx)
304291
}
305292

306293
switch receipt, err := ps.pushToClosest(ctx, chunk, false); {

pkg/pushsync/pushsync_test.go

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -307,96 +307,6 @@ func TestShallowReceiptTolerance(t *testing.T) {
307307
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
308308
}
309309

310-
// TestForwardToClosest checks that the chunk is forwarded to the closest peer after storing it.
311-
// Chunk moves from TriggerPeer -> PivotPeer (store) -> ClosestPeer
312-
func TestForwardToClosest(t *testing.T) {
313-
t.Parallel()
314-
// chunk data to upload
315-
chunk := testingc.FixtureChunk("7000")
316-
317-
// create a pivot node and a mocked closest node
318-
triggerPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
319-
pivotPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
320-
closestPeer := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
321-
322-
// Create the closest peer
323-
psClosestPeer, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), mock.WithClosestPeerErr(topology.ErrWantSelf))
324-
325-
recorder1 := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))
326-
327-
// creating the pivot peer
328-
psPivot, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, recorder1, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer))
329-
recorder2 := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer))
330-
331-
// Creating the trigger peer
332-
psTriggerPeer, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, recorder2, nil, defaultSigner(chunk), mock.WithClosestPeer(pivotPeer))
333-
334-
receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
335-
if err != nil {
336-
t.Fatal(err)
337-
}
338-
339-
if !chunk.Address().Equal(receipt.Address) {
340-
t.Fatal("invalid receipt")
341-
}
342-
343-
// Pivot peer will forward the chunk to its closest peer. Intercept the incoming stream from pivot node and check
344-
// for the correctness of the chunk
345-
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), chunk.Data())
346-
347-
// Similarly intercept the same incoming stream to see if the closest peer is sending a proper receipt
348-
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), nil)
349-
350-
// In the received stream, check if a receipt is sent from pivot peer and check for its correctness.
351-
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), nil)
352-
353-
// In pivot peer, intercept the incoming delivery chunk from the trigger peer and check for correctness
354-
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), chunk.Data())
355-
356-
tests := []struct {
357-
name string
358-
peer swarm.Address
359-
acc accounting.Interface
360-
want int64
361-
}{
362-
{
363-
name: "trigger-pivot",
364-
peer: pivotPeer,
365-
acc: triggerAccounting,
366-
want: -int64(fixedPrice),
367-
},
368-
{
369-
name: "pivot-trigger",
370-
peer: triggerPeer,
371-
acc: pivotAccounting,
372-
want: int64(fixedPrice),
373-
},
374-
{
375-
name: "pivot-closest",
376-
peer: closestPeer,
377-
acc: pivotAccounting,
378-
want: -int64(fixedPrice),
379-
},
380-
{
381-
name: "closest-pivot",
382-
peer: pivotPeer,
383-
acc: closestAccounting,
384-
want: int64(fixedPrice),
385-
},
386-
}
387-
for _, tt := range tests {
388-
t.Run(tt.name, func(t *testing.T) {
389-
balance, err := tt.acc.Balance(tt.peer)
390-
if err != nil {
391-
t.Fatal(err)
392-
}
393-
if balance.Int64() != tt.want {
394-
t.Fatalf("unexpected balance. want %d got %d", tt.want, balance)
395-
}
396-
})
397-
}
398-
}
399-
400310
// PushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective.
401311
// it also checks whether the tags are incremented properly if they are present
402312
func TestPushChunkToClosest(t *testing.T) {

0 commit comments

Comments
 (0)