Skip to content

Commit 3ffbe70

Browse files
committed
JAVA-2020: Add support for maxAwaitTime, which for a tailable cursor specifies the maximum time for the getMore command to wait for more documents before replying
1 parent 942e1bd commit 3ffbe70

21 files changed

+465
-44
lines changed

driver-async/src/main/com/mongodb/async/client/FindIterable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,26 @@ public interface FindIterable<T> extends MongoIterable<T> {
6565
*/
6666
FindIterable<T> maxTime(long maxTime, TimeUnit timeUnit);
6767

68+
/**
69+
* The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor
70+
* query. This only applies to a TAILABLE_AWAIT cursor. When the cursor is not a TAILABLE_AWAIT cursor,
71+
* this option is ignored.
72+
*
73+
* On servers >= 3.2, this option will be specified on the getMore command as "maxTimeMS". The default
74+
* is no value: no "maxTimeMS" is sent to the server with the getMore command.
75+
*
76+
* On servers < 3.2, this option is ignored, and indicates that the driver should respect the server's default value
77+
*
78+
* A zero value will be ignored.
79+
*
80+
* @param maxAwaitTime the max await time
81+
* @param timeUnit the time unit to return the result in
82+
* @return the maximum await execution time in the given time unit
83+
* @mongodb.driver.manual reference/method/cursor.maxTimeMS/#cursor.maxTimeMS Max Time
84+
* @since 3.2
85+
*/
86+
FindIterable<T> maxAwaitTime(long maxAwaitTime, TimeUnit timeUnit);
87+
6888
/**
6989
* Sets the query modifiers to apply to this operation.
7090
*

driver-async/src/main/com/mongodb/async/client/FindIterableImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ public FindIterable<TResult> maxTime(final long maxTime, final TimeUnit timeUnit
8787
return this;
8888
}
8989

90+
@Override
91+
public FindIterable<TResult> maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
92+
notNull("timeUnit", timeUnit);
93+
findOptions.maxAwaitTime(maxAwaitTime, timeUnit);
94+
return this;
95+
}
96+
9097
@Override
9198
public FindIterable<TResult> batchSize(final int batchSize) {
9299
findOptions.batchSize(batchSize);
@@ -175,6 +182,7 @@ private FindOperation<TResult> createQueryOperation() {
175182
.skip(findOptions.getSkip())
176183
.limit(findOptions.getLimit())
177184
.maxTime(findOptions.getMaxTime(MILLISECONDS), MILLISECONDS)
185+
.maxAwaitTime(findOptions.getMaxAwaitTime(MILLISECONDS), MILLISECONDS)
178186
.modifiers(toBsonDocument(findOptions.getModifiers()))
179187
.projection(toBsonDocument(findOptions.getProjection()))
180188
.sort(toBsonDocument(findOptions.getSort()))

driver-async/src/test/unit/com/mongodb/async/client/FindIterableSpecification.groovy

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import spock.lang.Specification
3737
import static com.mongodb.CustomMatchers.isTheSameAs
3838
import static com.mongodb.ReadPreference.secondary
3939
import static java.util.concurrent.TimeUnit.MILLISECONDS
40+
import static java.util.concurrent.TimeUnit.SECONDS
4041
import static org.bson.codecs.configuration.CodecRegistries.fromProviders
4142
import static spock.util.matcher.HamcrestSupport.expect
4243

@@ -58,7 +59,8 @@ class FindIterableSpecification extends Specification {
5859
def findOptions = new FindOptions().sort(new Document('sort', 1))
5960
.modifiers(new Document('modifier', 1))
6061
.projection(new Document('projection', 1))
61-
.maxTime(1000, MILLISECONDS)
62+
.maxTime(10, SECONDS)
63+
.maxAwaitTime(20, SECONDS)
6264
.batchSize(100)
6365
.limit(100)
6466
.skip(10)
@@ -81,7 +83,8 @@ class FindIterableSpecification extends Specification {
8183
.sort(new BsonDocument('sort', new BsonInt32(1)))
8284
.modifiers(new BsonDocument('modifier', new BsonInt32(1)))
8385
.projection(new BsonDocument('projection', new BsonInt32(1)))
84-
.maxTime(1000, MILLISECONDS)
86+
.maxTime(10000, MILLISECONDS)
87+
.maxAwaitTime(20000, MILLISECONDS)
8588
.batchSize(100)
8689
.limit(100)
8790
.skip(10)
@@ -95,7 +98,8 @@ class FindIterableSpecification extends Specification {
9598
.sort(new Document('sort', 2))
9699
.modifiers(new Document('modifier', 2))
97100
.projection(new Document('projection', 2))
98-
.maxTime(999, MILLISECONDS)
101+
.maxTime(9, SECONDS)
102+
.maxAwaitTime(18, SECONDS)
99103
.batchSize(99)
100104
.limit(99)
101105
.skip(9)
@@ -113,7 +117,8 @@ class FindIterableSpecification extends Specification {
113117
.sort(new BsonDocument('sort', new BsonInt32(2)))
114118
.modifiers(new BsonDocument('modifier', new BsonInt32(2)))
115119
.projection(new BsonDocument('projection', new BsonInt32(2)))
116-
.maxTime(999, MILLISECONDS)
120+
.maxTime(9000, MILLISECONDS)
121+
.maxAwaitTime(18000, MILLISECONDS)
117122
.batchSize(99)
118123
.limit(99)
119124
.skip(9)

driver-core/src/main/com/mongodb/client/model/FindOptions.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.concurrent.TimeUnit;
2323

24+
import static com.mongodb.assertions.Assertions.isTrueArgument;
2425
import static com.mongodb.assertions.Assertions.notNull;
2526

2627
/**
@@ -36,6 +37,7 @@ public final class FindOptions {
3637
private Bson modifiers;
3738
private Bson projection;
3839
private long maxTimeMS;
40+
private long maxAwaitTimeMS;
3941
private int skip;
4042
private Bson sort;
4143
private CursorType cursorType = CursorType.NonTailable;
@@ -59,6 +61,7 @@ public FindOptions(final FindOptions from) {
5961
modifiers = from.modifiers;
6062
projection = from.projection;
6163
maxTimeMS = from.maxTimeMS;
64+
maxAwaitTimeMS = from.maxAwaitTimeMS;
6265
skip = from.skip;
6366
sort = from.sort;
6467
cursorType = from.cursorType;
@@ -133,10 +136,50 @@ public long getMaxTime(final TimeUnit timeUnit) {
133136
*/
134137
public FindOptions maxTime(final long maxTime, final TimeUnit timeUnit) {
135138
notNull("timeUnit", timeUnit);
139+
isTrueArgument("maxTime > = 0", maxTime >= 0);
136140
this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
137141
return this;
138142
}
139143

144+
/**
145+
* The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor
146+
* query. This only applies to a TAILABLE_AWAIT cursor. When the cursor is not a TAILABLE_AWAIT cursor,
147+
* this option is ignored.
148+
*
149+
* On servers >= 3.2, this option will be specified on the getMore command as "maxTimeMS". The default
150+
* is no value: no "maxTimeMS" is sent to the server with the getMore command.
151+
*
152+
* On servers < 3.2, this option is ignored, and indicates that the driver should respect the server's default value
153+
*
154+
* A zero value will be ignored.
155+
*
156+
* @param timeUnit the time unit to return the result in
157+
* @return the maximum await execution time in the given time unit
158+
* @since 3.2
159+
* @mongodb.driver.manual reference/method/cursor.maxTimeMS/#cursor.maxTimeMS Max Time
160+
*/
161+
public long getMaxAwaitTime(final TimeUnit timeUnit) {
162+
notNull("timeUnit", timeUnit);
163+
return timeUnit.convert(maxAwaitTimeMS, TimeUnit.MILLISECONDS);
164+
}
165+
166+
/**
167+
* Sets the maximum await execution time on the server for this operation.
168+
*
169+
* @param maxAwaitTime the max await time. A zero value will be ignored, and indicates that the driver should respect the server's
170+
* default value
171+
* @param timeUnit the time unit, which may not be null
172+
* @return this
173+
* @since 3.2
174+
* @mongodb.driver.manual reference/method/cursor.maxTimeMS/#cursor.maxTimeMS Max Time
175+
*/
176+
public FindOptions maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
177+
notNull("timeUnit", timeUnit);
178+
isTrueArgument("maxAwaitTime > = 0", maxAwaitTime >= 0);
179+
this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
180+
return this;
181+
}
182+
140183
/**
141184
* Gets the number of documents to return per batch. Default to 0, which indicates that the server chooses an appropriate batch
142185
* size.

driver-core/src/main/com/mongodb/operation/AggregateOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ private CommandTransformer<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(f
338338
@Override
339339
public AsyncBatchCursor<T> apply(final BsonDocument result, final ServerAddress serverAddress) {
340340
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
341-
return new AsyncQueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, decoder, source, connection);
341+
return new AsyncQueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, 0, decoder, source, connection);
342342
}
343343
};
344344
}

driver-core/src/main/com/mongodb/operation/AsyncQueryBatchCursor.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.List;
3535

3636
import static com.mongodb.assertions.Assertions.isTrue;
37+
import static com.mongodb.assertions.Assertions.isTrueArgument;
3738
import static com.mongodb.assertions.Assertions.notNull;
3839
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
3940
import static com.mongodb.operation.CursorHelper.getNumberToReturn;
@@ -47,6 +48,7 @@ class AsyncQueryBatchCursor<T> implements AsyncBatchCursor<T> {
4748
private final MongoNamespace namespace;
4849
private final int limit;
4950
private final Decoder<T> decoder;
51+
private final long maxTimeMS;
5052
private volatile AsyncConnectionSource connectionSource;
5153
private volatile QueryResult<T> firstBatch;
5254
private volatile int batchSize;
@@ -56,11 +58,13 @@ class AsyncQueryBatchCursor<T> implements AsyncBatchCursor<T> {
5658

5759
AsyncQueryBatchCursor(final QueryResult<T> firstBatch, final int limit, final int batchSize,
5860
final Decoder<T> decoder) {
59-
this(firstBatch, limit, batchSize, decoder, null, null);
61+
this(firstBatch, limit, batchSize, 0, decoder, null, null);
6062
}
6163

62-
AsyncQueryBatchCursor(final QueryResult<T> firstBatch, final int limit, final int batchSize,
64+
AsyncQueryBatchCursor(final QueryResult<T> firstBatch, final int limit, final int batchSize, final long maxTimeMS,
6365
final Decoder<T> decoder, final AsyncConnectionSource connectionSource, final AsyncConnection connection) {
66+
isTrueArgument("maxTimeMS >= 0", maxTimeMS >= 0);
67+
this.maxTimeMS = maxTimeMS;
6468
this.namespace = firstBatch.getNamespace();
6569
this.firstBatch = firstBatch;
6670
this.limit = limit;
@@ -166,7 +170,9 @@ private BsonDocument asGetMoreCommandDocument() {
166170
if (batchSizeForGetMoreCommand != 0) {
167171
document.append("batchSize", new BsonInt32(batchSizeForGetMoreCommand));
168172
}
169-
173+
if (maxTimeMS != 0) {
174+
document.append("maxTimeMS", new BsonInt64(maxTimeMS));
175+
}
170176
return document;
171177
}
172178

driver-core/src/main/com/mongodb/operation/FindOperation.java

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.concurrent.TimeUnit;
5454

5555
import static com.mongodb.ReadPreference.primary;
56+
import static com.mongodb.assertions.Assertions.isTrueArgument;
5657
import static com.mongodb.assertions.Assertions.notNull;
5758
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
5859
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
@@ -84,6 +85,7 @@ public class FindOperation<T> implements AsyncReadOperation<AsyncBatchCursor<T>>
8485
private BsonDocument modifiers;
8586
private BsonDocument projection;
8687
private long maxTimeMS;
88+
private long maxAwaitTimeMS;
8789
private int skip;
8890
private BsonDocument sort;
8991
private CursorType cursorType = CursorType.NonTailable;
@@ -252,10 +254,49 @@ public long getMaxTime(final TimeUnit timeUnit) {
252254
*/
253255
public FindOperation<T> maxTime(final long maxTime, final TimeUnit timeUnit) {
254256
notNull("timeUnit", timeUnit);
257+
isTrueArgument("maxTime >= 0", maxTime >= 0);
255258
this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
256259
return this;
257260
}
258261

262+
/**
263+
* The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor
264+
* query. This only applies to a TAILABLE_AWAIT cursor. When the cursor is not a TAILABLE_AWAIT cursor,
265+
* this option is ignored.
266+
*
267+
* On servers >= 3.2, this option will be specified on the getMore command as "maxTimeMS". The default
268+
* is no value: no "maxTimeMS" is sent to the server with the getMore command.
269+
*
270+
* On servers < 3.2, this option is ignored, and indicates that the driver should respect the server's default value
271+
*
272+
* A zero value will be ignored.
273+
*
274+
* @param timeUnit the time unit to return the result in
275+
* @return the maximum await execution time in the given time unit
276+
* @since 3.2
277+
* @mongodb.driver.manual reference/method/cursor.maxTimeMS/#cursor.maxTimeMS Max Time
278+
*/
279+
public long getMaxAwaitTime(final TimeUnit timeUnit) {
280+
notNull("timeUnit", timeUnit);
281+
return timeUnit.convert(maxAwaitTimeMS, TimeUnit.MILLISECONDS);
282+
}
283+
284+
/**
285+
* Sets the maximum await execution time on the server for this operation.
286+
*
287+
* @param maxAwaitTime the max await time. A zero value will be ignored, and indicates that the driver should respect the server's
288+
* default value
289+
* @param timeUnit the time unit, which may not be null
290+
* @return this
291+
* @since 3.2
292+
* @mongodb.driver.manual reference/method/cursor.maxTimeMS/#cursor.maxTimeMS Max Time
293+
*/
294+
public FindOperation<T> maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
295+
notNull("timeUnit", timeUnit);
296+
isTrueArgument("maxAwaitTime >= 0", maxAwaitTime >= 0);
297+
this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
298+
return this;
299+
}
259300
/**
260301
* Gets the number of documents to skip. The default is 0.
261302
*
@@ -460,7 +501,7 @@ public BatchCursor<T> call(final ConnectionSource source, final Connection conne
460501
isPartial(),
461502
isOplogReplay(),
462503
decoder);
463-
return new QueryBatchCursor<T>(queryResult, limit, batchSize, decoder, source, connection);
504+
return new QueryBatchCursor<T>(queryResult, limit, batchSize, getMaxTimeForCursor(), decoder, source, connection);
464505
}
465506
}
466507
});
@@ -499,6 +540,7 @@ public void onResult(final QueryResult<T> result, final Throwable t) {
499540
wrappedCallback.onResult(null, t);
500541
} else {
501542
wrappedCallback.onResult(new AsyncQueryBatchCursor<T>(result, limit, batchSize,
543+
getMaxTimeForCursor(),
502544
decoder, source, connection), null);
503545
}
504546
}
@@ -771,19 +813,23 @@ private CommandTransformer<BsonDocument, BatchCursor<T>> transformer(final Conne
771813
public BatchCursor<T> apply(final BsonDocument result, final ServerAddress serverAddress) {
772814
QueryResult<T> queryResult = cursorDocumentToQueryResult(result.getDocument("cursor"),
773815
connection.getDescription().getServerAddress());
774-
return new QueryBatchCursor<T>(queryResult, limit, batchSize, decoder, source);
816+
return new QueryBatchCursor<T>(queryResult, limit, batchSize, getMaxTimeForCursor(), decoder, source, connection);
775817
}
776818
};
777819
}
778820

821+
private long getMaxTimeForCursor() {
822+
return cursorType == CursorType.TailableAwait ? maxAwaitTimeMS : 0;
823+
}
824+
779825
private CommandTransformer<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source,
780826
final AsyncConnection connection) {
781827
return new CommandTransformer<BsonDocument, AsyncBatchCursor<T>>() {
782828
@Override
783829
public AsyncBatchCursor<T> apply(final BsonDocument result, final ServerAddress serverAddress) {
784830
QueryResult<T> queryResult = cursorDocumentToQueryResult(result.getDocument("cursor"),
785831
connection.getDescription().getServerAddress());
786-
return new AsyncQueryBatchCursor<T>(queryResult, limit, batchSize, decoder, source, connection);
832+
return new AsyncQueryBatchCursor<T>(queryResult, limit, batchSize, getMaxTimeForCursor(), decoder, source, connection);
787833
}
788834
};
789835
}

driver-core/src/main/com/mongodb/operation/ListCollectionsOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public void onResult(final QueryResult<BsonDocument> result, final Throwable t)
224224
} else {
225225
wrappedCallback.onResult(new ProjectingAsyncBatchCursor(
226226
new AsyncQueryBatchCursor<BsonDocument>(result, 0,
227-
batchSize, new BsonDocumentCodec(), source, connection)
227+
batchSize, 0, new BsonDocumentCodec(), source, connection)
228228
), null);
229229
}
230230
}

driver-core/src/main/com/mongodb/operation/ListDatabasesOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private CommandTransformer<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(f
137137
return new CommandTransformer<BsonDocument, AsyncBatchCursor<T>>() {
138138
@Override
139139
public AsyncBatchCursor<T> apply(final BsonDocument result, final ServerAddress serverAddress) {
140-
return new AsyncQueryBatchCursor<T>(createQueryResult(result, connection.getDescription()), 0, 0, decoder, source,
140+
return new AsyncQueryBatchCursor<T>(createQueryResult(result, connection.getDescription()), 0, 0, 0, decoder, source,
141141
connection);
142142
}
143143
};

driver-core/src/main/com/mongodb/operation/ListIndexesOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void onResult(final QueryResult<T> result, final Throwable t) {
191191
if (t != null) {
192192
wrappedCallback.onResult(null, t);
193193
} else {
194-
wrappedCallback.onResult(new AsyncQueryBatchCursor<T>(result, 0, batchSize, decoder, source,
194+
wrappedCallback.onResult(new AsyncQueryBatchCursor<T>(result, 0, batchSize, 0, decoder, source,
195195
connection),
196196
null);
197197
}

0 commit comments

Comments
 (0)