2222import com .mongodb .client .test .CollectionHelper ;
2323import com .mongodb .event .CommandFailedEvent ;
2424import com .mongodb .event .CommandSucceededEvent ;
25+ import com .mongodb .internal .connection .TestClusterListener ;
2526import com .mongodb .internal .connection .TestCommandListener ;
2627import org .bson .BsonDocument ;
2728import org .bson .Document ;
2829import org .junit .jupiter .api .AfterEach ;
2930import org .junit .jupiter .api .Test ;
3031
32+ import java .time .Duration ;
3133import java .util .List ;
34+ import java .util .concurrent .TimeoutException ;
3235
3336import static com .mongodb .ClusterFixture .isDiscoverableReplicaSet ;
3437import static com .mongodb .ClusterFixture .serverVersionAtLeast ;
@@ -53,11 +56,17 @@ public abstract class AbstractRetryableReadsProseTest {
5356
5457 private static final String COLLECTION_NAME = "test" ;
5558
59+ private final TestCommandListener commandListener =
60+ new TestCommandListener (asList ("commandFailedEvent" , "commandSucceededEvent" ), emptyList ());
61+ private final TestClusterListener clusterListener = new TestClusterListener ();
62+
5663 protected abstract MongoClient createClient (MongoClientSettings settings );
5764
5865 @ AfterEach
5966 void afterEach () {
6067 CollectionHelper .dropDatabase (getDefaultDatabaseName ());
68+ commandListener .reset ();
69+ clusterListener .clearClusterDescriptionChangedEvents ();
6170 }
6271
6372 /**
@@ -104,12 +113,10 @@ void retriesOnSameMongosWhenAnotherNotAvailable() {
104113 */
105114 //TODO-BACKPRESSURE Slav Babanin JAVA-6167 add overloadRetargeting into tests.
106115 @ Test
107- void overloadErrorRetriedOnDifferentReplicaSetServer () throws InterruptedException {
116+ void overloadErrorRetriedOnDifferentReplicaSetServer () throws InterruptedException , TimeoutException {
108117 //given
109118 assumeTrue (serverVersionAtLeast (4 , 4 ));
110119 assumeTrue (isDiscoverableReplicaSet ());
111- ServerAddress primaryServerAddress = getPrimary ();
112- TestCommandListener commandListener = new TestCommandListener (asList ("commandFailedEvent" , "commandSucceededEvent" ), emptyList ());
113120 BsonDocument configureFailPoint = BsonDocument .parse (
114121 "{\n "
115122 + " configureFailPoint: \" failCommand\" ,\n "
@@ -121,13 +128,16 @@ void overloadErrorRetriedOnDifferentReplicaSetServer() throws InterruptedExcepti
121128 + " }\n "
122129 + "}\n " );
123130
124- try (FailPoint ignored = FailPoint .enable (configureFailPoint , primaryServerAddress );
131+ try (FailPoint ignored = FailPoint .enable (configureFailPoint , getPrimary () );
125132 MongoClient client = createClient (getMongoClientSettingsBuilder ()
126133 .retryReads (true )
127134 .readPreference (ReadPreference .primaryPreferred ())
128135 .addCommandListener (commandListener )
136+ .applyToClusterSettings (builder -> builder .addClusterListener (clusterListener ))
129137 .build ())) {
130138
139+ waitForPrimaryDiscovery ();
140+
131141 MongoCollection <Document > collection = client .getDatabase (getDefaultDatabaseName ())
132142 .getCollection (COLLECTION_NAME );
133143 commandListener .reset ();
@@ -155,12 +165,10 @@ void overloadErrorRetriedOnDifferentReplicaSetServer() throws InterruptedExcepti
155165 */
156166 //TODO-BACKPRESSURE Slav Babanin JAVA-6167 add overloadRetargeting into tests.
157167 @ Test
158- void nonOverloadErrorRetriedOnSameReplicaSetServer () throws InterruptedException {
168+ void nonOverloadErrorRetriedOnSameReplicaSetServer () throws InterruptedException , TimeoutException {
159169 //given
160170 assumeTrue (serverVersionAtLeast (4 , 4 ));
161171 assumeTrue (isDiscoverableReplicaSet ());
162- ServerAddress primaryServerAddress = getPrimary ();
163- TestCommandListener commandListener = new TestCommandListener (asList ("commandFailedEvent" , "commandSucceededEvent" ), emptyList ());
164172 BsonDocument configureFailPoint = BsonDocument .parse (
165173 "{\n "
166174 + " configureFailPoint: \" failCommand\" ,\n "
@@ -172,13 +180,16 @@ void nonOverloadErrorRetriedOnSameReplicaSetServer() throws InterruptedException
172180 + " }\n "
173181 + "}\n " );
174182
175- try (FailPoint ignored = FailPoint .enable (configureFailPoint , primaryServerAddress );
183+ try (FailPoint ignored = FailPoint .enable (configureFailPoint , getPrimary () );
176184 MongoClient client = createClient (getMongoClientSettingsBuilder ()
177185 .retryReads (true )
178186 .readPreference (ReadPreference .primaryPreferred ())
179187 .addCommandListener (commandListener )
188+ .applyToClusterSettings (builder -> builder .addClusterListener (clusterListener ))
180189 .build ())) {
181190
191+ waitForPrimaryDiscovery ();
192+
182193 MongoCollection <Document > collection = client .getDatabase (getDefaultDatabaseName ())
183194 .getCollection (COLLECTION_NAME );
184195 commandListener .reset ();
@@ -199,4 +210,10 @@ void nonOverloadErrorRetriedOnSameReplicaSetServer() throws InterruptedException
199210 format ("Expected retry on same server but got %s and %s" , failedServer , succeededServer ));
200211 }
201212 }
213+
214+ private void waitForPrimaryDiscovery () throws InterruptedException , TimeoutException {
215+ clusterListener .waitForClusterDescriptionChangedEvents (
216+ event -> event .getNewDescription ().hasReadableServer (ReadPreference .primary ()),
217+ 1 , Duration .ofSeconds (10 ));
218+ }
202219}
0 commit comments