Skip to content

Commit cc41e29

Browse files
Add RC test
1 parent c563b65 commit cc41e29

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed

internal/integration/cursor_test.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,179 @@ func parseMaxAwaitTime(mt *mtest.T, evt *event.CommandStartedEvent) int64 {
319319
return got
320320
}
321321

322+
func TestCursor_ClientTimeout_tailableAwaitData(t *testing.T) {
323+
mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
324+
325+
// Helper to set a long block to force client timeout to fire.
326+
setGetMoreBlock := func(mt *mtest.T, times, ms int32) {
327+
mt.Helper()
328+
mt.SetFailPoint(failpoint.FailPoint{
329+
ConfigureFailPoint: "failCommand",
330+
Mode: failpoint.Mode{Times: times},
331+
Data: failpoint.Data{
332+
FailCommands: []string{"getMore"},
333+
BlockConnection: true,
334+
BlockTimeMS: ms,
335+
},
336+
})
337+
}
338+
339+
cappedOpts := options.CreateCollection().SetCapped(true).
340+
SetSizeInBytes(1024 * 64)
341+
342+
// TODO(SERVER-96344): mongos doesn't honor a failpoint's full blockTimeMS.
343+
mtOpts := mtest.NewOptions().
344+
ClientOptions(options.Client().SetTimeout(200*time.Millisecond)).
345+
CollectionCreateOptions(cappedOpts).
346+
Topologies(mtest.ReplicaSet, mtest.LoadBalanced, mtest.Single)
347+
348+
mt.RunOpts("find", mtOpts, func(mt *mtest.T) {
349+
initCollection(mt, mt.Coll)
350+
setGetMoreBlock(mt, 1, 30)
351+
352+
opts := options.Find().
353+
SetMaxAwaitTime(100 * time.Millisecond).
354+
SetCursorType(options.TailableAwait)
355+
356+
cursor, err := mt.Coll.Find(context.Background(), bson.D{{"x", 1}}, opts)
357+
require.NoError(mt, err, "Find error: %v", err)
358+
359+
defer func() {
360+
err := cursor.Close(context.Background())
361+
assert.NoError(mt, err, "Close error: %v", err)
362+
}()
363+
364+
// Iterate twice to force a getMore
365+
assert.True(mt, cursor.Next(context.Background()))
366+
367+
mt.ClearEvents()
368+
369+
assert.False(mt, cursor.Next(context.Background()))
370+
371+
require.Error(mt, cursor.Err(), "expected error from cursor.Next")
372+
assert.ErrorIs(mt, cursor.Err(), context.DeadlineExceeded, "expected context deadline exceeded error")
373+
374+
// Collect all started events to find the getMore commands.
375+
startedEvents := mt.GetAllStartedEvents()
376+
377+
var getMoreStartedEvents []*event.CommandStartedEvent
378+
for _, evt := range startedEvents {
379+
if evt.CommandName == "getMore" {
380+
getMoreStartedEvents = append(getMoreStartedEvents, evt)
381+
}
382+
}
383+
384+
require.Len(mt, getMoreStartedEvents, 2, "expected 2 getMore commands, got %d", len(getMoreStartedEvents))
385+
386+
// The first getMore should have a maxTimeMS of <= 100ms.
387+
assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[0]), int64(100))
388+
389+
// The second getMore should have a maxTimeMS of <=71, indicating that we
390+
// are using the time remaining in the context rather than the
391+
// maxAwaitTimeMS.
392+
assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[1]), int64(71))
393+
})
394+
395+
mt.RunOpts("aggregate with $changeStream", mtOpts, func(mt *mtest.T) {
396+
initCollection(mt, mt.Coll)
397+
setGetMoreBlock(mt, 1, 30)
398+
399+
opts := options.Aggregate().SetMaxAwaitTime(100 * time.Millisecond)
400+
pipe := mongo.Pipeline{{{"$changeStream", bson.D{}}}}
401+
402+
cursor, err := mt.Coll.Aggregate(context.Background(), pipe, opts)
403+
require.NoError(mt, err, "Aggregate error: %v", err)
404+
405+
defer func() {
406+
err := cursor.Close(context.Background())
407+
assert.NoError(mt, err, "Close error: %v", err)
408+
}()
409+
410+
assert.False(mt, cursor.Next(context.Background()))
411+
412+
require.Error(mt, cursor.Err(), "expected error from cursor.Next")
413+
assert.ErrorIs(mt, cursor.Err(), context.DeadlineExceeded, "expected context deadline exceeded error")
414+
415+
// Collect all started events to find the getMore commands.
416+
startedEvents := mt.GetAllStartedEvents()
417+
418+
var getMoreStartedEvents []*event.CommandStartedEvent
419+
for _, evt := range startedEvents {
420+
if evt.CommandName == "getMore" {
421+
getMoreStartedEvents = append(getMoreStartedEvents, evt)
422+
}
423+
}
424+
425+
require.Len(mt, getMoreStartedEvents, 2, "expected 2 getMore commands, got %d", len(getMoreStartedEvents))
426+
427+
// The first getMore should have a maxTimeMS of <= 100ms.
428+
assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[0]), int64(100))
429+
430+
// The second getMore should have a maxTimeMS of <=71, indicating that we
431+
// are using the time remaining in the context rather than the
432+
// maxAwaitTimeMS.
433+
assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[1]), int64(71))
434+
})
435+
436+
mt.RunOpts("runCommandCursor", mtOpts, func(mt *mtest.T) {
437+
initCollection(mt, mt.Coll)
438+
mt.SetFailPoint(failpoint.FailPoint{
439+
ConfigureFailPoint: "failCommand",
440+
Mode: failpoint.Mode{Times: 1},
441+
Data: failpoint.Data{
442+
FailCommands: []string{"find"},
443+
BlockConnection: true,
444+
BlockTimeMS: 30,
445+
},
446+
})
447+
448+
cur, err := mt.DB.RunCommandCursor(context.Background(), bson.D{
449+
{"find", mt.Coll.Name()},
450+
{"filter", bson.D{{"x", 1}}},
451+
{"tailable", true},
452+
{"awaitData", true},
453+
{"batchSize", int32(1)},
454+
})
455+
456+
require.NoError(mt, err, "RunCommandCursor error: %v", err)
457+
458+
defer func() {
459+
err := cur.Close(context.Background())
460+
assert.NoError(mt, err, "Close error: %v", err)
461+
}()
462+
463+
// Iterate twice to force a getMore
464+
assert.True(mt, cur.Next(context.Background()))
465+
466+
mt.ClearEvents()
467+
468+
assert.False(mt, cur.Next(context.Background()))
469+
470+
require.Error(mt, cur.Err(), "expected error from cursor.Next")
471+
assert.ErrorIs(mt, cur.Err(), context.DeadlineExceeded, "expected context deadline exceeded error")
472+
473+
// Collect all started events to find the getMore commands.
474+
startedEvents := mt.GetAllStartedEvents()
475+
476+
var getMoreStartedEvents []*event.CommandStartedEvent
477+
for _, evt := range startedEvents {
478+
if evt.CommandName == "getMore" {
479+
getMoreStartedEvents = append(getMoreStartedEvents, evt)
480+
}
481+
}
482+
483+
require.Len(mt, getMoreStartedEvents, 2, "expected 2 getMore commands, got %d", len(getMoreStartedEvents))
484+
485+
//// The first getMore should have a maxTimeMS of <= 100ms.
486+
//assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[0]), int64(100))
487+
488+
//// The second getMore should have a maxTimeMS of <=71, indicating that we
489+
//// are using the time remaining in the context rather than the
490+
//// maxAwaitTimeMS.
491+
//assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[1]), int64(71))
492+
})
493+
}
494+
322495
func TestCursor_tailableAwaitData(t *testing.T) {
323496
mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
324497

mongo/collection.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,11 @@ func aggregate(a aggregateParams, opts ...options.Lister[options.AggregateOption
10931093
return nil, wrapErrors(err)
10941094
}
10951095
cursor, err := newCursorWithSession(bc, a.client.bsonOpts, a.registry, sess,
1096+
1097+
// The only way the server will return a tailable/awaitData cursor for an
1098+
// aggregate operation is for the first stage in the pipeline to
1099+
// be $changeStream, this is the only time maxAwaitTimeMS should be applied.
1100+
// For this reason, we pass the client timeout to the cursor.
10961101
withCursorOptionClientTimeout(a.client.timeout))
10971102
return cursor, wrapErrors(err)
10981103
}

0 commit comments

Comments
 (0)