@@ -19,9 +19,11 @@ package stream
19
19
import (
20
20
"bytes"
21
21
"context"
22
+ "errors"
22
23
"fmt"
23
24
"os"
24
25
"sync"
26
+ "sync/atomic"
25
27
"testing"
26
28
"time"
27
29
@@ -500,7 +502,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
500
502
501
503
log .Info ("Starting simulation" )
502
504
ctx := context .Background ()
503
- result := sim .Run (ctx , func (ctx context.Context , sim * simulation.Simulation ) error {
505
+ result := sim .Run (ctx , func (ctx context.Context , sim * simulation.Simulation ) ( err error ) {
504
506
nodeIDs := sim .UpNodeIDs ()
505
507
//determine the pivot node to be the first node of the simulation
506
508
pivot := nodeIDs [0 ]
@@ -553,14 +555,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
553
555
}
554
556
pivotFileStore := item .(* storage.FileStore )
555
557
log .Debug ("Starting retrieval routine" )
558
+ retErrC := make (chan error )
556
559
go func () {
557
560
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
558
561
// we must wait for the peer connections to have started before requesting
559
562
n , err := readAll (pivotFileStore , fileHash )
560
563
log .Info (fmt .Sprintf ("retrieved %v" , fileHash ), "read" , n , "err" , err )
561
- if err != nil {
562
- t .Fatalf ("requesting chunks action error: %v" , err )
563
- }
564
+ retErrC <- err
564
565
}()
565
566
566
567
log .Debug ("Watching for disconnections" )
@@ -570,11 +571,19 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
570
571
simulation .NewPeerEventsFilter ().Drop (),
571
572
)
572
573
574
+ var disconnected atomic.Value
573
575
go func () {
574
576
for d := range disconnections {
575
577
if d .Error != nil {
576
578
log .Error ("peer drop" , "node" , d .NodeID , "peer" , d .PeerID )
577
- t .Fatal (d .Error )
579
+ disconnected .Store (true )
580
+ }
581
+ }
582
+ }()
583
+ defer func () {
584
+ if err != nil {
585
+ if yes , ok := disconnected .Load ().(bool ); ok && yes {
586
+ err = errors .New ("disconnect events received" )
578
587
}
579
588
}
580
589
}()
@@ -595,6 +604,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
595
604
if ! success {
596
605
return fmt .Errorf ("Test failed, chunks not available on all nodes" )
597
606
}
607
+ if err := <- retErrC ; err != nil {
608
+ t .Fatalf ("requesting chunks: %v" , err )
609
+ }
598
610
log .Debug ("Test terminated successfully" )
599
611
return nil
600
612
})
@@ -675,7 +687,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
675
687
}
676
688
677
689
ctx := context .Background ()
678
- result := sim .Run (ctx , func (ctx context.Context , sim * simulation.Simulation ) error {
690
+ result := sim .Run (ctx , func (ctx context.Context , sim * simulation.Simulation ) ( err error ) {
679
691
nodeIDs := sim .UpNodeIDs ()
680
692
node := nodeIDs [len (nodeIDs )- 1 ]
681
693
@@ -702,11 +714,19 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
702
714
simulation .NewPeerEventsFilter ().Drop (),
703
715
)
704
716
717
+ var disconnected atomic.Value
705
718
go func () {
706
719
for d := range disconnections {
707
720
if d .Error != nil {
708
721
log .Error ("peer drop" , "node" , d .NodeID , "peer" , d .PeerID )
709
- b .Fatal (d .Error )
722
+ disconnected .Store (true )
723
+ }
724
+ }
725
+ }()
726
+ defer func () {
727
+ if err != nil {
728
+ if yes , ok := disconnected .Load ().(bool ); ok && yes {
729
+ err = errors .New ("disconnect events received" )
710
730
}
711
731
}
712
732
}()
0 commit comments