@@ -319,116 +319,181 @@ func parseMaxAwaitTime(mt *mtest.T, evt *event.CommandStartedEvent) int64 {
319319 return got
320320}
321321
322- func TestCursor_tailableAwaitData ( t * testing. T ) {
323- mt := mtest . New ( t , mtest . NewOptions (). CreateClient ( false ) )
322+ func tadcFindFactory ( ctx context. Context , mt * mtest. T ) ( * mongo. Cursor , func () error ) {
323+ mt . Helper ( )
324324
325- cappedOpts := options .CreateCollection ().SetCapped (true ).
326- SetSizeInBytes (1024 * 64 )
325+ initCollection (mt , mt .Coll )
326+ cur , err := mt .Coll .Find (ctx , bson.D {{"x" , 1 }},
327+ options .Find ().SetBatchSize (1 ).SetCursorType (options .TailableAwait ))
328+ require .NoError (mt , err , "Find error: %v" , err )
327329
328- // TODO(SERVER-96344): mongos doesn't honor a failpoint's full blockTimeMS.
329- mtOpts := mtest .NewOptions ().MinServerVersion ("4.4" ).
330- Topologies (mtest .ReplicaSet , mtest .LoadBalanced , mtest .Single ).
331- CollectionCreateOptions (cappedOpts )
330+ return cur , func () error { return cur .Close (context .Background ()) }
331+ }
332332
333- mt . RunOpts ( "apply remaining timeoutMS if less than maxAwaitTimeMS" , mtOpts , func ( mt * mtest.T ) {
334- initCollection ( mt , mt . Coll )
333+ func tadcAggregateFactory ( ctx context. Context , mt * mtest.T ) ( * mongo. Cursor , func () error ) {
334+ mt . Helper ( )
335335
336- // Create a 30ms failpoint for getMore.
337- mt .SetFailPoint (failpoint.FailPoint {
338- ConfigureFailPoint : "failCommand" ,
339- Mode : failpoint.Mode {
340- Times : 1 ,
341- },
342- Data : failpoint.Data {
343- FailCommands : []string {"getMore" },
344- BlockConnection : true ,
345- BlockTimeMS : 30 ,
346- },
347- })
336+ initCollection (mt , mt .Coll )
348337
349- // Create a find cursor with a 100ms maxAwaitTimeMS and a tailable awaitData
350- // cursor type.
351- opts := options .Find ().
352- SetBatchSize (1 ).
353- SetMaxAwaitTime (100 * time .Millisecond ).
354- SetCursorType (options .TailableAwait )
338+ opts := options .Aggregate ().SetMaxAwaitTime (100 * time .Millisecond )
339+ pipe := mongo.Pipeline {{{"$changeStream" , bson.D {}}}}
355340
356- cursor , err := mt .Coll .Find ( context . Background (), bson. D {{ "x" , 2 }} , opts )
357- require .NoError (mt , err )
341+ cursor , err := mt .Coll .Aggregate ( ctx , pipe , opts )
342+ require .NoError (mt , err , "Aggregate error: %v" , err )
358343
359- defer cursor .Close (context .Background ())
344+ return cursor , func () error { return cursor .Close (context .Background ()) }
345+ }
360346
361- // Use a 200ms timeout that caps the lifetime of cursor.Next. The underlying
362- // getMore loop should run at least two times: the first getMore will block
363- // for 30ms on the getMore and then an additional 100ms for the
364- // maxAwaitTimeMS. The second getMore will then use the remaining ~70ms
365- // left on the timeout.
366- ctx , cancel := context .WithTimeout (context .Background (), 200 * time .Millisecond )
367- defer cancel ()
347+ func tadcRunCommandCursorFactory (ctx context.Context , mt * mtest.T ) (* mongo.Cursor , func () error ) {
348+ mt .Helper ()
368349
369- // Iterate twice to force a getMore
370- cursor .Next (ctx )
350+ initCollection (mt , mt .Coll )
371351
372- mt .ClearEvents ()
373- cursor .Next (ctx )
352+ cur , err := mt .DB .RunCommandCursor (ctx , bson.D {
353+ {"find" , mt .Coll .Name ()},
354+ {"filter" , bson.D {{"x" , 1 }}},
355+ {"tailable" , true },
356+ {"awaitData" , true },
357+ {"batchSize" , int32 (1 )},
358+ })
359+ require .NoError (mt , err , "RunCommandCursor error: %v" , err )
374360
375- require . Error ( mt , cursor . Err (), "expected error from cursor.Next" )
376- assert . ErrorIs ( mt , cursor . Err (), context . DeadlineExceeded , "expected context deadline exceeded error" )
361+ return cur , func () error { return cur . Close ( context . Background ()) }
362+ }
377363
378- // Collect all started events to find the getMore commands.
379- startedEvents := mt .GetAllStartedEvents ()
364+ // For tailable awaitData cursors, the maxTimeMS for a getMore should be
365+ // min(maxAwaitTimeMS, remaining timeoutMS - minRoundTripTime) to allow the
366+ // server more opportunities to respond with an empty batch before a
367+ // client-side timeout.
368+ func TestCursor_tailableAwaitData_applyRemainingTimeout (t * testing.T ) {
369+ const timeout = 200 * time .Millisecond
380370
381- var getMoreStartedEvents []* event.CommandStartedEvent
382- for _ , evt := range startedEvents {
383- if evt .CommandName == "getMore" {
384- getMoreStartedEvents = append (getMoreStartedEvents , evt )
385- }
386- }
371+ // Setup mtest instance.
372+ mt := mtest .New (t , mtest .NewOptions ().CreateClient (false ))
387373
388- // The first getMore should have a maxTimeMS of <= 100ms .
389- assert . LessOrEqual ( mt , parseMaxAwaitTime ( mt , getMoreStartedEvents [ 0 ]), int64 ( 100 ) )
374+ cappedOpts := options . CreateCollection (). SetCapped ( true ) .
375+ SetSizeInBytes ( 1024 * 64 )
390376
391- // The second getMore should have a maxTimeMS of <=71, indicating that we
392- // are using the time remaining in the context rather than the
393- // maxAwaitTimeMS.
394- assert .LessOrEqual (mt , parseMaxAwaitTime (mt , getMoreStartedEvents [1 ]), int64 (71 ))
395- })
377+ type testCase struct {
378+ name string
379+ factory func (ctx context.Context , mt * mtest.T ) (* mongo.Cursor , func () error )
380+ opTimeout bool
396381
397- mtOpts .Topologies (mtest .ReplicaSet , mtest .Sharded , mtest .LoadBalanced , mtest .Single )
382+ // Operations that insert a document into the collection will require that
383+ // an initial batch be consumed to ensure that the getMore is sent in
384+ // subsequent Next calls.
385+ consumeFirstBatch bool
386+ }
398387
399- mt .RunOpts ("apply maxAwaitTimeMS if less than remaining timeout" , mtOpts , func (mt * mtest.T ) {
400- initCollection (mt , mt .Coll )
401- mt .ClearEvents ()
388+ cases := []testCase {
389+ {
390+ name : "find client-level timeout" ,
391+ factory : tadcFindFactory ,
392+ opTimeout : false ,
393+ consumeFirstBatch : true ,
394+ },
395+ {
396+ name : "find operation-level timeout" ,
397+ factory : tadcFindFactory ,
398+ opTimeout : true ,
399+ consumeFirstBatch : true ,
400+ },
401+ {
402+ name : "aggregate with $changeStream client-level timeout" ,
403+ factory : tadcAggregateFactory ,
404+ opTimeout : false ,
405+ consumeFirstBatch : false ,
406+ },
407+ {
408+ name : "aggregate with $changeStream operation-level timeout" ,
409+ factory : tadcAggregateFactory ,
410+ opTimeout : true ,
411+ consumeFirstBatch : false ,
412+ },
413+ {
414+ name : "runCommandCursor client-level timeout" ,
415+ factory : tadcRunCommandCursorFactory ,
416+ opTimeout : false ,
417+ consumeFirstBatch : true ,
418+ },
419+ {
420+ name : "runCommandCursor operation-level timeout" ,
421+ factory : tadcRunCommandCursorFactory ,
422+ opTimeout : true ,
423+ consumeFirstBatch : true ,
424+ },
425+ }
426+
427+ // TODO(SERVER-96344): mongos doesn't honor a failpoint's full blockTimeMS.
428+ mtOpts := mtest .NewOptions ().
429+ CollectionCreateOptions (cappedOpts ).
430+ Topologies (mtest .ReplicaSet , mtest .LoadBalanced , mtest .Single )
431+
432+ for _ , tc := range cases {
433+ caseOpts := mtOpts
434+ if ! tc .opTimeout {
435+ caseOpts = mtOpts .ClientOptions (options .Client ().SetTimeout (timeout ))
436+ }
437+
438+ mt .RunOpts (tc .name , caseOpts , func (mt * mtest.T ) {
439+ mt .SetFailPoint (failpoint.FailPoint {
440+ ConfigureFailPoint : "failCommand" ,
441+ Mode : failpoint.Mode {Times : 1 },
442+ Data : failpoint.Data {
443+ FailCommands : []string {"getMore" },
444+ BlockConnection : true ,
445+ BlockTimeMS : 30 ,
446+ },
447+ })
402448
403- // Create a find cursor
404- opts := options .Find ().SetBatchSize (1 ).SetMaxAwaitTime (50 * time .Millisecond )
449+ ctx := context .Background ()
405450
406- cursor , err := mt .Coll .Find (context .Background (), bson.D {}, opts )
407- require .NoError (mt , err )
451+ var cancel context.CancelFunc
452+ if tc .opTimeout {
453+ ctx , cancel = context .WithTimeout (ctx , timeout )
454+ defer cancel ()
455+ }
408456
409- _ = mt .GetStartedEvent () // Empty find from started list.
457+ cur , cleanup := tc .factory (ctx , mt )
458+ defer func () { assert .NoError (mt , cleanup ()) }()
410459
411- defer cursor .Close (context .Background ())
460+ require .NoError (mt , cur .Err ())
461+
462+ cur .SetMaxAwaitTime (100 * time .Millisecond )
463+
464+ if tc .consumeFirstBatch {
465+ assert .True (mt , cur .Next (ctx )) // consume first batch item
466+ }
412467
413- ctx , cancel := context . WithTimeout ( context . Background (), 100 * time . Millisecond )
414- defer cancel ( )
468+ mt . ClearEvents ( )
469+ assert . False ( mt , cur . Next ( ctx ) )
415470
416- // Iterate twice to force a getMore
417- cursor .Next (ctx )
418- cursor .Next (ctx )
471+ require .Error (mt , cur .Err (), "expected error from cursor.Next" )
472+ assert .ErrorIs (mt , cur .Err (), context .DeadlineExceeded , "expected context deadline exceeded error" )
419473
420- cmd := mt .GetStartedEvent ().Command
474+ getMoreEvts := []* event.CommandStartedEvent {}
475+ for _ , evt := range mt .GetAllStartedEvents () {
476+ if evt .CommandName == "getMore" {
477+ getMoreEvts = append (getMoreEvts , evt )
478+ }
479+ }
421480
422- maxTimeMSRaw , err := cmd .LookupErr ("maxTimeMS" )
423- require .NoError (mt , err )
481+ require .Len (mt , getMoreEvts , 2 )
424482
425- got , ok := maxTimeMSRaw .AsInt64OK ()
426- require .True (mt , ok )
483+ // The first getMore should have a maxTimeMS of <= 100ms but greater
484+ // than 71ms, indicating that the maxAwaitTimeMS was used.
485+ assert .LessOrEqual (mt , parseMaxAwaitTime (mt , getMoreEvts [0 ]), int64 (100 ))
486+ assert .Greater (mt , parseMaxAwaitTime (mt , getMoreEvts [0 ]), int64 (71 ))
427487
428- assert .LessOrEqual (mt , got , int64 (50 ))
429- })
488+ // The second getMore should have a maxTimeMS of <=71, indicating that we
489+ // are using the time remaining in the context rather than the
490+ // maxAwaitTimeMS.
491+ assert .LessOrEqual (mt , parseMaxAwaitTime (mt , getMoreEvts [1 ]), int64 (71 ))
492+ })
493+ }
430494}
431495
496+ // For tailable awaitData cursors, the maxTimeMS for a getMore should be
432497func TestCursor_tailableAwaitData_ShortCircuitingGetMore (t * testing.T ) {
433498 mt := mtest .New (t , mtest .NewOptions ().CreateClient (false ))
434499
0 commit comments