Skip to content

Commit 9b9788a

Browse files
committed
Test improvements to reduce race conditions.
Added a sleep after killing a cursor for the ChangeStreamOperationSpecification. Updated timeouts for the Sync Adapters to 30 secs to help cope with replicaset changes. Added a tryMultipleTimes method to allow multiple testing of a condition before failing. JAVA-3535
1 parent 5233ff8 commit 9b9788a

File tree

6 files changed

+45
-13
lines changed

6 files changed

+45
-13
lines changed

driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationSpecification.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,8 +488,13 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
488488
expected = insertDocuments(helper, [5, 6])
489489
helper.killCursor(helper.getNamespace(), cursor.getWrapped().getServerCursor())
490490

491+
def results = nextAndClean(cursor, async)
492+
if (results.size() < expected.size()) {
493+
results.addAll(nextAndClean(cursor, async))
494+
}
495+
491496
then:
492-
nextAndClean(cursor, async) == expected
497+
results == expected
493498

494499
cleanup:
495500
cursor?.close()

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/TestSubscriber.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.function.Supplier;
2526

2627
public class TestSubscriber<T> implements Subscriber<T> {
2728

@@ -165,7 +166,7 @@ public Subscription getSubscription() {
165166
* @throws AssertionError if the sequence of items observed does not exactly match {@code items}
166167
*/
167168
public void assertReceivedOnNext(final List<T> items) {
168-
if (getOnNextEvents().size() != items.size()) {
169+
if (tryMultipleTimes(2, () -> getOnNextEvents().size() != items.size())) {
169170
throw new AssertionError("Number of items does not match. Provided: " + items.size() + " Actual: " + getOnNextEvents().size());
170171
}
171172

@@ -184,6 +185,22 @@ public void assertReceivedOnNext(final List<T> items) {
184185
}
185186
}
186187

188+
private boolean tryMultipleTimes(final int noTimes, final Supplier<Boolean> test) {
189+
int counter = noTimes;
190+
while (counter > 0){
191+
if (test.get()) {
192+
return true;
193+
}
194+
counter--;
195+
try {
196+
Thread.sleep(500);
197+
} catch (InterruptedException e) {
198+
return false;
199+
}
200+
}
201+
return false;
202+
}
203+
187204
/**
188205
* Assert that a single terminal event occurred, either {@link #onComplete} or {@link #onError}.
189206
*

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/gridfs/GridFSPublisherSpecification.groovy

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import java.nio.ByteBuffer
3939
import java.nio.channels.Channels
4040
import java.nio.channels.WritableByteChannel
4141
import java.security.SecureRandom
42+
4243
import static com.mongodb.client.model.Filters.eq
4344
import static com.mongodb.client.model.Updates.unset
4445
import static com.mongodb.reactivestreams.client.Fixture.ObservableSubscriber
@@ -47,6 +48,7 @@ import static com.mongodb.reactivestreams.client.Fixture.getMongoClient
4748
import static com.mongodb.reactivestreams.client.MongoClients.getDefaultCodecRegistry
4849
import static com.mongodb.reactivestreams.client.internal.Publishers.publishAndFlatten
4950
import static java.util.Arrays.asList
51+
import static java.util.concurrent.TimeUnit.MILLISECONDS
5052
import static java.util.concurrent.TimeUnit.MINUTES
5153
import static java.util.concurrent.TimeUnit.SECONDS
5254
import static org.bson.codecs.configuration.CodecRegistries.fromCodecs
@@ -120,7 +122,7 @@ class GridFSPublisherSpecification extends FunctionalSpecification {
120122
def options = new GridFSUploadOptions().chunkSizeBytes(chunkSize)
121123

122124
when:
123-
def fileId = run(gridFSBucket.&uploadFromPublisher, 'myFile', createPublisher(ByteBuffer.wrap(contentBytes)), options)
125+
def fileId = run(MINUTES.toMillis(5), gridFSBucket.&uploadFromPublisher, 'myFile', createPublisher(ByteBuffer.wrap(contentBytes)), options)
124126

125127
then:
126128
run(filesCollection.&countDocuments) == 1
@@ -488,15 +490,23 @@ class GridFSPublisherSpecification extends FunctionalSpecification {
488490
}
489491
}
490492

491-
def run(operation, ... args) {
492-
def result = runAndCollect(operation, args)
493+
def run(Closure<?> operation, ... args) {
494+
run(MINUTES.toMillis(1), operation, *args)
495+
}
496+
497+
def runAndCollect(Closure<?> operation, ... args) {
498+
runAndCollect(MINUTES.toMillis(1), operation, *args)
499+
}
500+
501+
def run(long timeout, Closure<?> operation, ... args) {
502+
def result = runAndCollect(timeout, operation, args)
493503
result != null && !result.isEmpty() ? result.get(0) : result
494504
}
495505

496-
def runAndCollect(operation, ... args) {
506+
def runAndCollect(long timeout, Closure<?> operation, ... args) {
497507
def subscriber = new ObservableSubscriber()
498508
operation.call(args).subscribe(subscriber)
499-
subscriber.get(1, MINUTES)
509+
subscriber.get(timeout, MILLISECONDS)
500510
}
501511

502512
byte[] concatByteBuffers(List<ByteBuffer> buffers) {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SingleResultSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class SingleResultSubscriber<T> implements Subscriber<T> {
3333
@Nullable
3434
T get() {
3535
try {
36-
if (!latch.await(5, TimeUnit.SECONDS)) {
36+
if (!latch.await(30, TimeUnit.SECONDS)) {
3737
throw new MongoTimeoutException("Timeout waiting for single result");
3838
}
3939
if (exception != null) {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void onComplete() {
6464
}
6565
});
6666
try {
67-
if (!latch.await(5, TimeUnit.SECONDS)) {
67+
if (!latch.await(30, TimeUnit.SECONDS)) {
6868
throw new MongoTimeoutException("Timeout waiting for subscription");
6969
}
7070
} catch (InterruptedException e) {
@@ -84,9 +84,9 @@ public boolean hasNext() {
8484
return true;
8585
}
8686
try {
87-
Object first = results.pollFirst(5, TimeUnit.SECONDS);
87+
Object first = results.pollFirst(30, TimeUnit.SECONDS);
8888
if (first == null) {
89-
throw new MongoTimeoutException("Time out!!!");
89+
throw new MongoTimeoutException("Time out waiting for result from cursor");
9090
} else if (first instanceof Throwable) {
9191
throw translateError((Throwable) first);
9292
} else if (first == COMPLETED) {

driver-scala/src/it/scala/org/mongodb/scala/syncadapter/SyncMongoCursor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ case class SyncMongoCursor[T](val observable: Observable[T]) extends MongoCursor
5151
}
5252
})
5353
try {
54-
if (!latch.await(5, TimeUnit.SECONDS)) {
54+
if (!latch.await(30, TimeUnit.SECONDS)) {
5555
throw new MongoTimeoutException("Timeout waiting for subscription")
5656
}
5757
} catch {
@@ -68,7 +68,7 @@ case class SyncMongoCursor[T](val observable: Observable[T]) extends MongoCursor
6868
if (nextResult.isDefined) {
6969
return true
7070
}
71-
val first = results.pollFirst(5, TimeUnit.SECONDS)
71+
val first = results.pollFirst(30, TimeUnit.SECONDS)
7272
first match {
7373
case n if n == null => throw new MongoTimeoutException("Time out!!!")
7474
case t if t.isInstanceOf[Throwable] => throw translateError(t.asInstanceOf[Throwable])

0 commit comments

Comments
 (0)