Skip to content

Commit 7e10b69

Browse files
authored
revert: "feat(pushsync): forward chunk after storing (#5037)" (#5088)
1 parent 455a3d5 commit 7e10b69

File tree

2 files changed

+1
-104
lines changed

2 files changed

+1
-104
lines changed

pkg/pushsync/pushsync.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -288,20 +288,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
288288

289289
if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad {
290290
stored, reason = true, "is within AOR"
291-
err = store(ctx)
292-
if err != nil {
293-
return err
294-
}
295-
296-
_, err = ps.pushToClosest(ctx, chunk, false)
297-
if err != nil {
298-
if !errors.Is(err, topology.ErrNotFound) && !errors.Is(err, topology.ErrWantSelf) {
299-
// Do not error out pushsync if we cannot forward to the closest peer after storing.
300-
// The peer will get it via pullsync.
301-
ps.logger.Error(nil, "failed to forward to closest peer", "chunk_address", chunk.Address(), "error", err)
302-
}
303-
}
304-
return nil
291+
return store(ctx)
305292
}
306293

307294
switch receipt, err := ps.pushToClosest(ctx, chunk, false); {

pkg/pushsync/pushsync_test.go

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -308,96 +308,6 @@ func TestShallowReceiptTolerance(t *testing.T) {
308308
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
309309
}
310310

311-
// TestForwardToClosest checks that the chunk is forwarded to the closest peer after storing it.
312-
// Chunk moves from TriggerPeer -> PivotPeer (store) -> ClosestPeer
313-
func TestForwardToClosest(t *testing.T) {
314-
t.Parallel()
315-
// chunk data to upload
316-
chunk := testingc.FixtureChunk("7000")
317-
318-
// create a pivot node and a mocked closest node
319-
triggerPeer := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
320-
pivotPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
321-
closestPeer := swarm.MustParseHexAddress("7000000000000000000000000000000000000000000000000000000000000000")
322-
323-
// Create the closest peer
324-
psClosestPeer, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), mock.WithClosestPeerErr(topology.ErrWantSelf))
325-
326-
recorder1 := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))
327-
328-
// creating the pivot peer
329-
psPivot, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, recorder1, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer))
330-
recorder2 := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer))
331-
332-
// Creating the trigger peer
333-
psTriggerPeer, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, recorder2, nil, defaultSigner(chunk), mock.WithClosestPeer(pivotPeer))
334-
335-
receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
336-
if err != nil {
337-
t.Fatal(err)
338-
}
339-
340-
if !chunk.Address().Equal(receipt.Address) {
341-
t.Fatal("invalid receipt")
342-
}
343-
344-
// Pivot peer will forward the chunk to its closest peer. Intercept the incoming stream from pivot node and check
345-
// for the correctness of the chunk
346-
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), chunk.Data())
347-
348-
// Similarly intercept the same incoming stream to see if the closest peer is sending a proper receipt
349-
waitOnRecordAndTest(t, closestPeer, recorder1, chunk.Address(), nil)
350-
351-
// In the received stream, check if a receipt is sent from pivot peer and check for its correctness.
352-
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), nil)
353-
354-
// In pivot peer, intercept the incoming delivery chunk from the trigger peer and check for correctness
355-
waitOnRecordAndTest(t, pivotPeer, recorder2, chunk.Address(), chunk.Data())
356-
357-
tests := []struct {
358-
name string
359-
peer swarm.Address
360-
acc accounting.Interface
361-
want int64
362-
}{
363-
{
364-
name: "trigger-pivot",
365-
peer: pivotPeer,
366-
acc: triggerAccounting,
367-
want: -int64(fixedPrice),
368-
},
369-
{
370-
name: "pivot-trigger",
371-
peer: triggerPeer,
372-
acc: pivotAccounting,
373-
want: int64(fixedPrice),
374-
},
375-
{
376-
name: "pivot-closest",
377-
peer: closestPeer,
378-
acc: pivotAccounting,
379-
want: -int64(fixedPrice),
380-
},
381-
{
382-
name: "closest-pivot",
383-
peer: pivotPeer,
384-
acc: closestAccounting,
385-
want: int64(fixedPrice),
386-
},
387-
}
388-
for _, tt := range tests {
389-
t.Run(tt.name, func(t *testing.T) {
390-
balance, err := tt.acc.Balance(tt.peer)
391-
if err != nil {
392-
t.Fatal(err)
393-
}
394-
if balance.Int64() != tt.want {
395-
t.Fatalf("unexpected balance. want %d got %d", tt.want, balance)
396-
}
397-
})
398-
}
399-
}
400-
401311
// PushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective.
402312
// it also checks whether the tags are incremented properly if they are present
403313
func TestPushChunkToClosest(t *testing.T) {

0 commit comments

Comments
 (0)