Skip to content

Commit b24864a

Browse files
authored
Only sleep when necessary in SyncMongoCursor construction or close (#721)
JAVA-4176
1 parent 105cf7a commit b24864a

File tree

3 files changed

+94
-6
lines changed

3 files changed

+94
-6
lines changed

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,55 @@
3434
import static java.util.Objects.requireNonNull;
3535

3636
public class SyncMongoClient implements MongoClient {
37+
38+
private static long sleepAfterCursorOpenMS;
39+
40+
private static long sleepAfterCursorCloseMS;
41+
42+
/**
43+
* Unfortunately this is the only way to wait for a query to be initiated, since Reactive Streams is asynchronous
44+
* and we have no way of knowing. Tests which require cursor initiation to complete before execution of the next operation
45+
* can set this to a positive value. A value of 256 ms has been shown to work well. The default value is 0.
46+
*/
47+
public static void enableSleepAfterCursorOpen(final long sleepMS) {
48+
if (sleepAfterCursorOpenMS != 0) {
49+
throw new IllegalStateException("Already enabled");
50+
}
51+
if (sleepMS <= 0) {
52+
throw new IllegalArgumentException("sleepMS must be a postive value");
53+
}
54+
sleepAfterCursorOpenMS = sleepMS;
55+
}
56+
57+
/**
58+
* Unfortunately this is the only way to wait for close to complete, since it's asynchronous.
59+
* This is inherently racy but there are not any other good options. Tests which require cursor cancellation to complete before
60+
* execution of the next operation can set this to a positive value. A value of 256 ms has been shown to work well. The default
61+
* value is 0.
62+
*/
63+
public static void enableSleepAfterCursorClose(final long sleepMS) {
64+
if (sleepAfterCursorCloseMS != 0) {
65+
throw new IllegalStateException("Already enabled");
66+
}
67+
if (sleepMS <= 0) {
68+
throw new IllegalArgumentException("sleepMS must be a postive value");
69+
}
70+
sleepAfterCursorCloseMS = sleepMS;
71+
}
72+
73+
public static void disableCursorSleep() {
74+
sleepAfterCursorOpenMS = 0;
75+
sleepAfterCursorCloseMS = 0;
76+
}
77+
78+
public static long getSleepAfterCursorOpen() {
79+
return sleepAfterCursorOpenMS;
80+
}
81+
82+
public static long getSleepAfterCursorClose() {
83+
return sleepAfterCursorCloseMS;
84+
}
85+
3786
private final com.mongodb.reactivestreams.client.MongoClient wrapped;
3887

3988
public SyncMongoClient(final com.mongodb.reactivestreams.client.MongoClient wrapped) {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.concurrent.TimeUnit;
3434

3535
import static com.mongodb.ClusterFixture.TIMEOUT;
36+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorClose;
37+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorOpen;
3638

3739
class SyncMongoCursor<T> implements MongoCursor<T> {
3840
private static final Object COMPLETED = new Object();
@@ -79,9 +81,7 @@ public void onComplete() {
7981
if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) {
8082
throw new MongoTimeoutException("Timeout waiting for subscription");
8183
}
82-
// Unfortunately this is the only way to wait for the query to be initiated, since its asynchronous
83-
// and we have no way of knowing
84-
Thread.sleep(250);
84+
sleep(getSleepAfterCursorOpen());
8585
} catch (InterruptedException e) {
8686
throw new MongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e);
8787
}
@@ -90,10 +90,12 @@ public void onComplete() {
9090
@Override
9191
public void close() {
9292
subscription.cancel();
93-
// Unfortunately this is the only way to wait for cancellation to complete, since it's asynchronous.
94-
// This is inherently racy but there are not any other good options.
93+
sleep(getSleepAfterCursorClose());
94+
}
95+
96+
private static void sleep(final long millis) {
9597
try {
96-
Thread.sleep(250);
98+
Thread.sleep(millis);
9799
} catch (InterruptedException e) {
98100
throw new MongoInterruptedException("Interrupted from nap", e);
99101
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/LoadBalancerTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,39 @@
2424
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
2525
import org.bson.BsonArray;
2626
import org.bson.BsonDocument;
27+
import org.junit.After;
2728
import org.junit.runners.Parameterized;
2829

2930
import java.io.IOException;
3031
import java.net.URISyntaxException;
32+
import java.util.Arrays;
3133
import java.util.Collection;
34+
import java.util.List;
3235

36+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableCursorSleep;
37+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableSleepAfterCursorClose;
38+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableSleepAfterCursorOpen;
3339
import static org.junit.Assume.assumeFalse;
3440

3541
public class LoadBalancerTest extends UnifiedTest {
3642

43+
private static final List<String> CURSOR_OPEN_TIMING_SENSITIVE_TESTS =
44+
Arrays.asList(
45+
"pinned connections are returned when the cursor is drained",
46+
"only connections for a specific serviceId are closed when pools are cleared",
47+
"pinned connections are returned to the pool when the cursor is closed",
48+
"no connection is pinned if all documents are returned in the initial batch",
49+
"stale errors are ignored",
50+
"a connection can be shared by a transaction and a cursor",
51+
"wait queue timeout errors include cursor statistics");
52+
53+
private static final List<String> CURSOR_CLOSE_TIMING_SENSITIVE_TESTS =
54+
Arrays.asList(
55+
"pinned connections are returned to the pool when the cursor is closed",
56+
"only connections for a specific serviceId are closed when pools are cleared",
57+
"pinned connections are returned after a network error during a killCursors request",
58+
"a connection can be shared by a transaction and a cursor");
59+
3760
public LoadBalancerTest(@SuppressWarnings("unused") final String fileDescription,
3861
final String testDescription,
3962
final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities,
@@ -46,6 +69,20 @@ public LoadBalancerTest(@SuppressWarnings("unused") final String fileDescription
4669
// Reactive streams driver can't implement this test because there is no way to tell that a change stream cursor
4770
// that has not yet received any results has even initiated the change stream
4871
assumeFalse(testDescription.equals("change streams pin to a connection"));
72+
73+
if (CURSOR_OPEN_TIMING_SENSITIVE_TESTS.contains(testDescription)) {
74+
enableSleepAfterCursorOpen(256);
75+
}
76+
77+
if (CURSOR_CLOSE_TIMING_SENSITIVE_TESTS.contains(testDescription)) {
78+
enableSleepAfterCursorClose(256);
79+
}
80+
}
81+
82+
@After
83+
public void cleanUp() {
84+
super.cleanUp();
85+
disableCursorSleep();
4986
}
5087

5188
@Override

0 commit comments

Comments
 (0)