Skip to content

Commit 5590b96

Browse files
fixup! Update existing TAD tests; add new ones for short circuiting
1 parent 5006d50 commit 5590b96

File tree

2 files changed

+110
-115
lines changed

2 files changed

+110
-115
lines changed

internal/integration/collection_test.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2028,17 +2028,6 @@ func TestCollection(t *testing.T) {
20282028
})
20292029
}
20302030

2031-
func newCappedCollection(mt *mtest.T, name string) *mongo.Collection {
2032-
// Create a capped collection to test with a tailable awaitData cursor.
2033-
cappedOpts := options.CreateCollection().SetCapped(true).SetSizeInBytes(1024 * 64)
2034-
cappedColl := mt.CreateCollection(mtest.Collection{
2035-
Name: name,
2036-
CreateOpts: cappedOpts,
2037-
}, true)
2038-
2039-
return cappedColl
2040-
}
2041-
20422031
func initCollection(tb testing.TB, coll *mongo.Collection) {
20432032
tb.Helper()
20442033

internal/integration/cursor_test.go

Lines changed: 110 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -322,13 +322,16 @@ func parseMaxAwaitTime(mt *mtest.T, evt *event.CommandStartedEvent) int64 {
322322
func TestCursor_tailableAwaitData(t *testing.T) {
323323
mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
324324

325+
cappedOpts := options.CreateCollection().SetCapped(true).
326+
SetSizeInBytes(1024 * 64)
327+
325328
// TODO(SERVER-96344): mongos doesn't honor a failpoint's full blockTimeMS.
326329
mtOpts := mtest.NewOptions().MinServerVersion("4.4").
327-
Topologies(mtest.ReplicaSet, mtest.LoadBalanced, mtest.Single)
330+
Topologies(mtest.ReplicaSet, mtest.LoadBalanced, mtest.Single).
331+
CollectionCreateOptions(cappedOpts)
328332

329333
mt.RunOpts("apply remaining timeoutMS if less than maxAwaitTimeMS", mtOpts, func(mt *mtest.T) {
330-
cappedColl := newCappedCollection(mt, "tailable_awaitData_capped")
331-
initCollection(mt, cappedColl)
334+
initCollection(mt, mt.Coll)
332335

333336
// Create a 30ms failpoint for getMore.
334337
mt.SetFailPoint(failpoint.FailPoint{
@@ -350,7 +353,7 @@ func TestCursor_tailableAwaitData(t *testing.T) {
350353
SetMaxAwaitTime(100 * time.Millisecond).
351354
SetCursorType(options.TailableAwait)
352355

353-
cursor, err := cappedColl.Find(context.Background(), bson.D{{"x", 2}}, opts)
356+
cursor, err := mt.Coll.Find(context.Background(), bson.D{{"x", 2}}, opts)
354357
require.NoError(mt, err)
355358

356359
defer cursor.Close(context.Background())
@@ -424,121 +427,124 @@ func TestCursor_tailableAwaitData(t *testing.T) {
424427

425428
assert.LessOrEqual(mt, got, int64(50))
426429
})
430+
}
427431

428-
mt.Run("short-circuiting getMore", func(mt *mtest.T) {
429-
tests := []struct {
430-
name string
431-
deadline time.Duration
432-
maxAwaitTime time.Duration
433-
wantShortCircuit bool
434-
}{
435-
{
436-
name: "maxAwaitTime less than operation timeout",
437-
deadline: 200 * time.Millisecond,
438-
maxAwaitTime: 100 * time.Millisecond,
439-
wantShortCircuit: false,
440-
},
441-
{
442-
name: "maxAwaitTime equal to operation timeout",
443-
deadline: 200 * time.Millisecond,
444-
maxAwaitTime: 200 * time.Millisecond,
445-
wantShortCircuit: true,
446-
},
447-
{
448-
name: "maxAwaitTime greater than operation timeout",
449-
deadline: 200 * time.Millisecond,
450-
maxAwaitTime: 300 * time.Millisecond,
451-
wantShortCircuit: true,
452-
},
453-
}
432+
func TestCursor_tailableAwaitData_ShortCircuitingGetMore(t *testing.T) {
433+
mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
434+
435+
cappedOpts := options.CreateCollection().SetCapped(true).
436+
SetSizeInBytes(1024 * 64)
437+
438+
mtOpts := mtest.NewOptions().CollectionCreateOptions(cappedOpts)
439+
tests := []struct {
440+
name string
441+
deadline time.Duration
442+
maxAwaitTime time.Duration
443+
wantShortCircuit bool
444+
}{
445+
{
446+
name: "maxAwaitTime less than operation timeout",
447+
deadline: 200 * time.Millisecond,
448+
maxAwaitTime: 100 * time.Millisecond,
449+
wantShortCircuit: false,
450+
},
451+
{
452+
name: "maxAwaitTime equal to operation timeout",
453+
deadline: 200 * time.Millisecond,
454+
maxAwaitTime: 200 * time.Millisecond,
455+
wantShortCircuit: true,
456+
},
457+
{
458+
name: "maxAwaitTime greater than operation timeout",
459+
deadline: 200 * time.Millisecond,
460+
maxAwaitTime: 300 * time.Millisecond,
461+
wantShortCircuit: true,
462+
},
463+
}
464+
465+
for _, tt := range tests {
466+
mt.Run(tt.name, func(mt *mtest.T) {
467+
mt.RunOpts("find", mtOpts, func(mt *mtest.T) {
468+
initCollection(mt, mt.Coll)
469+
470+
// Create a find cursor
471+
opts := options.Find().
472+
SetBatchSize(1).
473+
SetMaxAwaitTime(tt.maxAwaitTime).
474+
SetCursorType(options.TailableAwait)
475+
476+
ctx, cancel := context.WithTimeout(context.Background(), tt.deadline)
477+
defer cancel()
478+
479+
cur, err := mt.Coll.Find(ctx, bson.D{{Key: "x", Value: 3}}, opts)
480+
require.NoError(mt, err, "Find error: %v", err)
481+
482+
// Close to return the session to the pool.
483+
defer cur.Close(context.Background())
484+
485+
ok := cur.Next(ctx)
486+
if tt.wantShortCircuit {
487+
assert.False(mt, ok, "expected Next to return false, got true")
488+
assert.EqualError(t, cur.Err(), "MaxAwaitTime must be less than the operation timeout")
489+
} else {
490+
assert.True(mt, ok, "expected Next to return true, got false")
491+
assert.NoError(mt, cur.Err(), "expected no error, got %v", cur.Err())
492+
}
493+
})
454494

455-
for _, tt := range tests {
456-
mt.Run(tt.name, func(mt *mtest.T) {
457-
mt.Run("find", func(mt *mtest.T) {
458-
cappedColl := newCappedCollection(mt, "xtailable_awaitData_capped")
459-
initCollection(mt, cappedColl)
495+
mt.RunOpts("aggregate", mtOpts, func(mt *mtest.T) {
496+
initCollection(mt, mt.Coll)
460497

461-
// Create a find cursor
462-
opts := options.Find().
463-
SetBatchSize(1).
464-
SetMaxAwaitTime(tt.maxAwaitTime).
465-
SetCursorType(options.TailableAwait)
498+
// Create a find cursor
499+
opts := options.Aggregate().
500+
SetBatchSize(1).
501+
SetMaxAwaitTime(tt.maxAwaitTime)
466502

467-
ctx, cancel := context.WithTimeout(context.Background(), tt.deadline)
468-
defer cancel()
503+
ctx, cancel := context.WithTimeout(context.Background(), tt.deadline)
504+
defer cancel()
469505

470-
cur, err := cappedColl.Find(ctx, bson.D{{Key: "x", Value: 3}}, opts)
471-
require.NoError(mt, err, "Find error: %v", err)
506+
cur, err := mt.Coll.Aggregate(ctx, []bson.D{}, opts)
507+
require.NoError(mt, err, "Aggregate error: %v", err)
472508

473-
// Close to return the session to the pool.
474-
defer cur.Close(context.Background())
509+
// Close to return the session to the pool.
510+
defer cur.Close(context.Background())
475511

476-
ok := cur.Next(ctx)
477-
if tt.wantShortCircuit {
478-
assert.False(mt, ok, "expected Next to return false, got true")
479-
assert.EqualError(t, cur.Err(), "MaxAwaitTime must be less than the operation timeout")
480-
} else {
481-
assert.True(mt, ok, "expected Next to return true, got false")
482-
assert.NoError(mt, cur.Err(), "expected no error, got %v", cur.Err())
483-
}
484-
})
512+
ok := cur.Next(ctx)
513+
if tt.wantShortCircuit {
514+
assert.False(mt, ok, "expected Next to return false, got true")
515+
assert.EqualError(t, cur.Err(), "MaxAwaitTime must be less than the operation timeout")
516+
} else {
517+
assert.True(mt, ok, "expected Next to return true, got false")
518+
assert.NoError(mt, cur.Err(), "expected no error, got %v", cur.Err())
519+
}
520+
})
485521

486-
mt.Run("aggregate", func(mt *mtest.T) {
487-
cappedColl := newCappedCollection(mt, "xtailable_awaitData_capped")
488-
initCollection(mt, cappedColl)
522+
// The $changeStream stage is only supported on replica sets.
523+
watchOpts := mtOpts.Topologies(mtest.ReplicaSet, mtest.Sharded)
524+
mt.RunOpts("watch", watchOpts, func(mt *mtest.T) {
525+
initCollection(mt, mt.Coll)
489526

490-
// Create a find cursor
491-
opts := options.Aggregate().
492-
SetBatchSize(1).
493-
SetMaxAwaitTime(tt.maxAwaitTime)
527+
// Create a find cursor
528+
opts := options.ChangeStream().SetMaxAwaitTime(tt.maxAwaitTime)
494529

495-
ctx, cancel := context.WithTimeout(context.Background(), tt.deadline)
496-
defer cancel()
530+
ctx, cancel := context.WithTimeout(context.Background(), tt.deadline)
531+
defer cancel()
497532

498-
cur, err := cappedColl.Aggregate(ctx, []bson.D{}, opts)
499-
require.NoError(mt, err, "Aggregate error: %v", err)
533+
cur, err := mt.Coll.Watch(ctx, []bson.D{}, opts)
534+
require.NoError(mt, err, "Watch error: %v", err)
500535

501-
// Close to return the session to the pool.
502-
defer cur.Close(context.Background())
536+
// Close to return the session to the pool.
537+
defer cur.Close(context.Background())
503538

539+
if tt.wantShortCircuit {
504540
ok := cur.Next(ctx)
505-
if tt.wantShortCircuit {
506-
assert.False(mt, ok, "expected Next to return false, got true")
507-
assert.EqualError(t, cur.Err(), "MaxAwaitTime must be less than the operation timeout")
508-
} else {
509-
assert.True(mt, ok, "expected Next to return true, got false")
510-
assert.NoError(mt, cur.Err(), "expected no error, got %v", cur.Err())
511-
}
512-
})
513-
514-
// The $changeStream stage is only supported on replica sets.
515-
watchOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet, mtest.Sharded)
516-
mt.RunOpts("watch", watchOpts, func(mt *mtest.T) {
517-
cappedColl := newCappedCollection(mt, "xtailable_awaitData_capped")
518-
initCollection(mt, cappedColl)
519-
520-
// Create a find cursor
521-
opts := options.ChangeStream().SetMaxAwaitTime(tt.maxAwaitTime)
522-
523-
ctx, cancel := context.WithTimeout(context.Background(), tt.deadline)
524-
defer cancel()
525-
526-
cur, err := cappedColl.Watch(ctx, []bson.D{}, opts)
527-
require.NoError(mt, err, "Watch error: %v", err)
528-
529-
// Close to return the session to the pool.
530-
defer cur.Close(context.Background())
531-
532-
if tt.wantShortCircuit {
533-
ok := cur.Next(ctx)
534-
535-
assert.False(mt, ok, "expected Next to return false, got true")
536-
assert.EqualError(mt, cur.Err(), "MaxAwaitTime must be less than the operation timeout")
537-
}
538-
})
541+
542+
assert.False(mt, ok, "expected Next to return false, got true")
543+
assert.EqualError(mt, cur.Err(), "MaxAwaitTime must be less than the operation timeout")
544+
}
539545
})
540-
}
541-
})
546+
})
547+
}
542548
}
543549

544550
type tryNextCursor interface {

0 commit comments

Comments
 (0)