53
53
import static com .mongodb .internal .operation .OperationHelper .getMoreCursorDocumentToQueryResult ;
54
54
import static com .mongodb .internal .operation .QueryHelper .translateCommandException ;
55
55
import static com .mongodb .internal .operation .ServerVersionHelper .serverIsAtLeastVersionThreeDotTwo ;
56
- import static java .lang .String .format ;
57
56
import static java .util .Collections .singletonList ;
58
57
59
58
class AsyncQueryBatchCursor <T > implements AsyncAggregateResponseBatchCursor <T > {
@@ -152,12 +151,7 @@ public void close() {
152
151
153
152
@ Override
154
153
public void next (final SingleResultCallback <List <T >> callback ) {
155
- next (callback , false );
156
- }
157
-
158
- @ Override
159
- public void tryNext (final SingleResultCallback <List <T >> callback ) {
160
- next (callback , true );
154
+ internalNext (callback );
161
155
}
162
156
163
157
@ Override
@@ -199,16 +193,12 @@ public int getMaxWireVersion() {
199
193
return maxWireVersion ;
200
194
}
201
195
202
- private void next (final SingleResultCallback <List <T >> callback , final boolean tryNext ) {
196
+ private void internalNext (final SingleResultCallback <List <T >> callback ) {
203
197
if (isClosed ()) {
204
- callback .onResult (null , new MongoException (format ("%s called after the cursor was closed." ,
205
- tryNext ? "tryNext()" : "next()" )));
206
- } else if (firstBatch != null && (tryNext || !firstBatch .getResults ().isEmpty ())) {
198
+ callback .onResult (null , new MongoException ("next() called after the cursor was closed." ));
199
+ } else if (firstBatch != null && (!firstBatch .getResults ().isEmpty ())) {
207
200
// May be empty for a tailable cursor
208
201
List <T > results = firstBatch .getResults ();
209
- if (tryNext && results .isEmpty ()) {
210
- results = null ;
211
- }
212
202
firstBatch = null ;
213
203
if (getServerCursor () == null ) {
214
204
close ();
@@ -222,13 +212,12 @@ private void next(final SingleResultCallback<List<T>> callback, final boolean tr
222
212
} else {
223
213
synchronized (this ) {
224
214
if (isClosed ()) {
225
- callback .onResult (null , new MongoException (format ("%s called after the cursor was closed." ,
226
- tryNext ? "tryNext()" : "next()" )));
215
+ callback .onResult (null , new MongoException ("next() called after the cursor was closed." ));
227
216
return ;
228
217
}
229
218
isOperationInProgress = true ;
230
219
}
231
- getMore (localCursor , callback , tryNext );
220
+ getMore (localCursor , callback );
232
221
}
233
222
}
234
223
}
@@ -237,9 +226,9 @@ private boolean limitReached() {
237
226
return Math .abs (limit ) != 0 && count .get () >= Math .abs (limit );
238
227
}
239
228
240
- private void getMore (final ServerCursor cursor , final SingleResultCallback <List <T >> callback , final boolean tryNext ) {
229
+ private void getMore (final ServerCursor cursor , final SingleResultCallback <List <T >> callback ) {
241
230
if (pinnedConnection != null ) {
242
- getMore (pinnedConnection .retain (), cursor , callback , tryNext );
231
+ getMore (pinnedConnection .retain (), cursor , callback );
243
232
} else {
244
233
connectionSource .getConnection (new SingleResultCallback <AsyncConnection >() {
245
234
@ Override
@@ -248,24 +237,23 @@ public void onResult(final AsyncConnection connection, final Throwable t) {
248
237
endOperationInProgress ();
249
238
callback .onResult (null , t );
250
239
} else {
251
- getMore (connection , cursor , callback , tryNext );
240
+ getMore (connection , cursor , callback );
252
241
}
253
242
}
254
243
});
255
244
}
256
245
}
257
246
258
- private void getMore (final AsyncConnection connection , final ServerCursor cursor , final SingleResultCallback <List <T >> callback ,
259
- final boolean tryNext ) {
247
+ private void getMore (final AsyncConnection connection , final ServerCursor cursor , final SingleResultCallback <List <T >> callback ) {
260
248
if (serverIsAtLeastVersionThreeDotTwo (connection .getDescription ())) {
261
249
connection .commandAsync (namespace .getDatabaseName (), asGetMoreCommandDocument (cursor .getId ()), NO_OP_FIELD_NAME_VALIDATOR ,
262
250
ReadPreference .primary (), CommandResultDocumentCodec .create (decoder , "nextBatch" ),
263
251
connectionSource .getSessionContext (), connectionSource .getServerApi (),
264
- new CommandResultSingleResultCallback (connection , cursor , callback , tryNext ));
252
+ new CommandResultSingleResultCallback (connection , cursor , callback ));
265
253
266
254
} else {
267
255
connection .getMoreAsync (namespace , cursor .getId (), getNumberToReturn (limit , batchSize , count .get ()),
268
- decoder , new QueryResultSingleResultCallback (connection , callback , tryNext ));
256
+ decoder , new QueryResultSingleResultCallback (connection , callback ));
269
257
}
270
258
}
271
259
@@ -342,7 +330,7 @@ private BsonDocument asKillCursorsCommandDocument(final ServerCursor localCursor
342
330
}
343
331
344
332
private void endOperationInProgress () {
345
- boolean closePending = false ;
333
+ boolean closePending ;
346
334
synchronized (this ) {
347
335
isOperationInProgress = false ;
348
336
closePending = this .isClosePending ;
@@ -353,7 +341,7 @@ private void endOperationInProgress() {
353
341
}
354
342
355
343
private void handleGetMoreQueryResult (final AsyncConnection connection , final SingleResultCallback <List <T >> callback ,
356
- final QueryResult <T > result , final boolean tryNext ) {
344
+ final QueryResult <T > result ) {
357
345
cursor .set (result .getCursor ());
358
346
if (isClosePending ) {
359
347
try {
@@ -365,8 +353,8 @@ private void handleGetMoreQueryResult(final AsyncConnection connection, final Si
365
353
} finally {
366
354
callback .onResult (null , null );
367
355
}
368
- } else if (! tryNext && result .getResults ().isEmpty () && result .getCursor () != null ) {
369
- getMore (connection , result .getCursor (), callback , false );
356
+ } else if (result .getResults ().isEmpty () && result .getCursor () != null ) {
357
+ getMore (connection , result .getCursor (), callback );
370
358
} else {
371
359
count .addAndGet (result .getResults ().size ());
372
360
if (limitReached ()) {
@@ -392,14 +380,12 @@ private class CommandResultSingleResultCallback implements SingleResultCallback<
392
380
private final AsyncConnection connection ;
393
381
private final ServerCursor cursor ;
394
382
private final SingleResultCallback <List <T >> callback ;
395
- private final boolean tryNext ;
396
383
397
384
CommandResultSingleResultCallback (final AsyncConnection connection , final ServerCursor cursor ,
398
- final SingleResultCallback <List <T >> callback , final boolean tryNext ) {
385
+ final SingleResultCallback <List <T >> callback ) {
399
386
this .connection = connection ;
400
387
this .cursor = cursor ;
401
388
this .callback = errorHandlingCallback (callback , LOGGER );
402
- this .tryNext = tryNext ;
403
389
}
404
390
405
391
@ Override
@@ -415,21 +401,18 @@ public void onResult(final BsonDocument result, final Throwable t) {
415
401
QueryResult <T > queryResult = getMoreCursorDocumentToQueryResult (result .getDocument (CURSOR ),
416
402
connection .getDescription ().getServerAddress ());
417
403
postBatchResumeToken = getPostBatchResumeTokenFromResponse (result );
418
- handleGetMoreQueryResult (connection , callback , queryResult , tryNext );
404
+ handleGetMoreQueryResult (connection , callback , queryResult );
419
405
}
420
406
}
421
407
}
422
408
423
409
private class QueryResultSingleResultCallback implements SingleResultCallback <QueryResult <T >> {
424
410
private final AsyncConnection connection ;
425
411
private final SingleResultCallback <List <T >> callback ;
426
- private final boolean tryNext ;
427
412
428
- QueryResultSingleResultCallback (final AsyncConnection connection , final SingleResultCallback <List <T >> callback ,
429
- final boolean tryNext ) {
413
+ QueryResultSingleResultCallback (final AsyncConnection connection , final SingleResultCallback <List <T >> callback ) {
430
414
this .connection = connection ;
431
415
this .callback = errorHandlingCallback (callback , LOGGER );
432
- this .tryNext = tryNext ;
433
416
}
434
417
435
418
@ Override
@@ -439,7 +422,7 @@ public void onResult(final QueryResult<T> result, final Throwable t) {
439
422
endOperationInProgress ();
440
423
callback .onResult (null , t );
441
424
} else {
442
- handleGetMoreQueryResult (connection , callback , result , tryNext );
425
+ handleGetMoreQueryResult (connection , callback , result );
443
426
}
444
427
}
445
428
}
0 commit comments