@@ -15,6 +15,7 @@ import (
15
15
"go.mongodb.org/mongo-driver/bson/primitive"
16
16
"go.mongodb.org/mongo-driver/event"
17
17
"go.mongodb.org/mongo-driver/internal/testutil/assert"
18
+ "go.mongodb.org/mongo-driver/internal/testutil/monitor"
18
19
"go.mongodb.org/mongo-driver/mongo"
19
20
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
20
21
"go.mongodb.org/mongo-driver/mongo/options"
@@ -503,23 +504,23 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
503
504
})
504
505
})
505
506
506
- customDeploymentClientOpts := options .Client ().
507
- SetPoolMonitor (poolMonitor ).
508
- SetWriteConcern (mtest .MajorityWc ).
509
- SetReadConcern (mtest .MajorityRc ).
510
- SetRetryReads (false )
511
507
customDeploymentOpts := mtest .NewOptions ().
512
508
Topologies (mtest .ReplicaSet ). // Avoid complexity of sharded fail points.
513
509
MinServerVersion ("4.0" ). // 4.0 is needed to use replica set fail points.
514
- ClientOptions (customDeploymentClientOpts ).
515
510
CreateClient (false )
516
511
mt .RunOpts ("custom deployment" , customDeploymentOpts , func (mt * mtest.T ) {
517
512
// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
518
513
// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
519
514
// by ChangeStream when executing an aggregate.
520
515
521
516
mt .Run ("errors are processed for SDAM on initial aggregate" , func (mt * mtest.T ) {
522
- clearPoolChan ()
517
+ tpm := monitor .NewTestPoolMonitor ()
518
+ mt .ResetClient (options .Client ().
519
+ SetPoolMonitor (tpm .PoolMonitor ).
520
+ SetWriteConcern (mtest .MajorityWc ).
521
+ SetReadConcern (mtest .MajorityRc ).
522
+ SetRetryReads (false ))
523
+
523
524
mt .SetFailPoint (mtest.FailPoint {
524
525
ConfigureFailPoint : "failCommand" ,
525
526
Mode : mtest.FailPointMode {
@@ -533,10 +534,16 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
533
534
534
535
_ , err := mt .Coll .Watch (context .Background (), mongo.Pipeline {})
535
536
assert .NotNil (mt , err , "expected Watch error, got nil" )
536
- assert .True (mt , isPoolCleared (), "expected pool to be cleared after non-timeout network error but was not" )
537
+ assert .True (mt , tpm . IsPoolCleared (), "expected pool to be cleared after non-timeout network error but was not" )
537
538
})
538
539
mt .Run ("errors are processed for SDAM on getMore" , func (mt * mtest.T ) {
539
- clearPoolChan ()
540
+ tpm := monitor .NewTestPoolMonitor ()
541
+ mt .ResetClient (options .Client ().
542
+ SetPoolMonitor (tpm .PoolMonitor ).
543
+ SetWriteConcern (mtest .MajorityWc ).
544
+ SetReadConcern (mtest .MajorityRc ).
545
+ SetRetryReads (false ))
546
+
540
547
mt .SetFailPoint (mtest.FailPoint {
541
548
ConfigureFailPoint : "failCommand" ,
542
549
Mode : mtest.FailPointMode {
@@ -557,12 +564,13 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
557
564
558
565
assert .True (mt , cs .Next (context .Background ()), "expected Next to return true, got false (iteration error %v)" ,
559
566
cs .Err ())
560
- assert .True (mt , isPoolCleared (), "expected pool to be cleared after non-timeout network error but was not" )
567
+ assert .True (mt , tpm . IsPoolCleared (), "expected pool to be cleared after non-timeout network error but was not" )
561
568
})
562
- retryAggClientOpts := options .Client ().SetRetryReads (true ).SetPoolMonitor (poolMonitor )
563
- retryAggMtOpts := mtest .NewOptions ().ClientOptions (retryAggClientOpts )
564
- mt .RunOpts ("errors are processed for SDAM on retried aggregate" , retryAggMtOpts , func (mt * mtest.T ) {
565
- clearPoolChan ()
569
+ mt .Run ("errors are processed for SDAM on retried aggregate" , func (mt * mtest.T ) {
570
+ tpm := monitor .NewTestPoolMonitor ()
571
+ mt .ResetClient (options .Client ().
572
+ SetPoolMonitor (tpm .PoolMonitor ).
573
+ SetRetryReads (true ))
566
574
567
575
mt .SetFailPoint (mtest.FailPoint {
568
576
ConfigureFailPoint : "failCommand" ,
@@ -578,14 +586,10 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
578
586
_ , err := mt .Coll .Watch (context .Background (), mongo.Pipeline {})
579
587
assert .NotNil (mt , err , "expected Watch error, got nil" )
580
588
581
- var numClearedEvents int
582
- for len (poolChan ) > 0 {
583
- curr := <- poolChan
584
- if curr .Type == event .PoolCleared {
585
- numClearedEvents ++
586
- }
587
- }
588
- assert .Equal (mt , 2 , numClearedEvents , "expected two PoolCleared events, got %d" , numClearedEvents )
589
+ clearedEvents := tpm .Events (func (evt * event.PoolEvent ) bool {
590
+ return evt .Type == event .PoolCleared
591
+ })
592
+ assert .Equal (mt , 2 , len (clearedEvents ), "expected two PoolCleared events, got %d" , len (clearedEvents ))
589
593
})
590
594
})
591
595
// Setting min server version as 4.0 since v3.6 does not send a "dropEvent"
0 commit comments