Skip to content

Commit c563b65

Browse files
cleanup
1 parent e33eaa9 commit c563b65

File tree

6 files changed

+55
-37
lines changed

6 files changed

+55
-37
lines changed

internal/integration/cursor_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,6 @@ func TestCursor_tailableAwaitData(t *testing.T) {
349349
// Create a find cursor with a 100ms maxAwaitTimeMS and a tailable awaitData
350350
// cursor type.
351351
opts := options.Find().
352-
SetBatchSize(1).
353352
SetMaxAwaitTime(100 * time.Millisecond).
354353
SetCursorType(options.TailableAwait)
355354

@@ -367,10 +366,10 @@ func TestCursor_tailableAwaitData(t *testing.T) {
367366
defer cancel()
368367

369368
// Iterate twice to force a getMore
370-
cursor.Next(ctx)
369+
assert.True(mt, cursor.Next(ctx), "expected Next to return true, got false")
371370

372371
mt.ClearEvents()
373-
cursor.Next(ctx)
372+
assert.False(mt, cursor.Next(ctx), "expected Next to return true, got false")
374373

375374
require.Error(mt, cursor.Err(), "expected error from cursor.Next")
376375
assert.ErrorIs(mt, cursor.Err(), context.DeadlineExceeded, "expected context deadline exceeded error")
@@ -385,6 +384,8 @@ func TestCursor_tailableAwaitData(t *testing.T) {
385384
}
386385
}
387386

387+
require.Len(mt, getMoreStartedEvents, 2, "expected 2 getMore commands, got %d", len(getMoreStartedEvents))
388+
388389
// The first getMore should have a maxTimeMS of <= 100ms.
389390
assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[0]), int64(100))
390391

@@ -431,14 +432,9 @@ func TestCursor_tailableAwaitData(t *testing.T) {
431432
// Iterate twice to force a getMore
432433
cursor.Next(context.Background())
433434

434-
// We expect 2 calls to getMore. Since batchSize=1 the first call will
435+
// We expect 2 calls to getMore.
435436
mt.ClearEvents()
436-
cursor.Next(context.Background())
437-
438-
m := make(map[string]any)
439-
440-
err = cursor.Decode(&m)
441-
require.NoError(t, err, "expected to decode a document, got error: %v", err)
437+
assert.False(mt, cursor.Next(context.Background()), "expected Next to return false, got true")
442438

443439
require.Error(mt, cursor.Err(), "expected error from cursor.Next")
444440
assert.ErrorIs(mt, cursor.Err(), context.DeadlineExceeded, "expected context deadline exceeded error")
@@ -453,6 +449,8 @@ func TestCursor_tailableAwaitData(t *testing.T) {
453449
}
454450
}
455451

452+
require.Len(mt, getMoreStartedEvents, 2, "expected 2 getMore commands, got %d", len(getMoreStartedEvents))
453+
456454
// The first getMore should have a maxTimeMS of <= 100ms.
457455
assert.LessOrEqual(mt, parseMaxAwaitTime(mt, getMoreStartedEvents[0]), int64(100))
458456

mongo/client_bulk_write.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,11 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
476476
return err
477477
}
478478
var cursor *Cursor
479-
cursor, err = newCursor(bCursor, mb.client.bsonOpts, mb.client.registry)
479+
cursor, err = newCursor(bCursor, mb.client.bsonOpts, mb.client.registry,
480+
481+
// This op doesn't return a cursor to the user, so setting the client
482+
// timeout should be a no-op.
483+
withCursorOptionClientTimeout(mb.client.timeout))
480484
if err != nil {
481485
return err
482486
}

mongo/collection.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1092,7 +1092,8 @@ func aggregate(a aggregateParams, opts ...options.Lister[options.AggregateOption
10921092
if err != nil {
10931093
return nil, wrapErrors(err)
10941094
}
1095-
cursor, err := newCursorWithSession(bc, a.client.bsonOpts, a.registry, sess)
1095+
cursor, err := newCursorWithSession(bc, a.client.bsonOpts, a.registry, sess,
1096+
withCursorOptionClientTimeout(a.client.timeout))
10961097
return cursor, wrapErrors(err)
10971098
}
10981099

mongo/cursor.go

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,30 @@ type Cursor struct {
3131
// to Next or TryNext. If continued access is required, a copy must be made.
3232
Current bson.Raw
3333

34-
bc batchCursor
35-
batch *bsoncore.Iterator
36-
batchLength int
37-
bsonOpts *options.BSONOptions
38-
registry *bson.Registry
39-
clientSession *session.Client
40-
clientTimeout *time.Duration
34+
bc batchCursor
35+
batch *bsoncore.Iterator
36+
batchLength int
37+
bsonOpts *options.BSONOptions
38+
registry *bson.Registry
39+
clientSession *session.Client
40+
clientTimeout time.Duration
41+
hasClientTimeout bool
4142

4243
err error
4344
}
4445

4546
type cursorOptions struct {
46-
clientTimeout *time.Duration
47+
clientTimeout time.Duration
48+
hasClientTimeout bool
4749
}
4850

4951
type cursorOption func(*cursorOptions)
5052

5153
func withCursorOptionClientTimeout(dur *time.Duration) cursorOption {
5254
return func(opts *cursorOptions) {
53-
if dur != nil {
54-
opts.clientTimeout = dur
55+
if dur != nil && *dur > 0 {
56+
opts.clientTimeout = *dur
57+
opts.hasClientTimeout = true
5558
}
5659
}
5760
}
@@ -60,8 +63,9 @@ func newCursor(
6063
bc batchCursor,
6164
bsonOpts *options.BSONOptions,
6265
registry *bson.Registry,
66+
opts ...cursorOption,
6367
) (*Cursor, error) {
64-
return newCursorWithSession(bc, bsonOpts, registry, nil)
68+
return newCursorWithSession(bc, bsonOpts, registry, nil, opts...)
6569
}
6670

6771
func newCursorWithSession(
@@ -84,11 +88,12 @@ func newCursorWithSession(
8488
}
8589

8690
c := &Cursor{
87-
bc: bc,
88-
bsonOpts: bsonOpts,
89-
registry: registry,
90-
clientSession: clientSession,
91-
clientTimeout: cursorOpts.clientTimeout,
91+
bc: bc,
92+
bsonOpts: bsonOpts,
93+
registry: registry,
94+
clientSession: clientSession,
95+
clientTimeout: cursorOpts.clientTimeout,
96+
hasClientTimeout: cursorOpts.hasClientTimeout,
9297
}
9398
if bc.ID() == 0 {
9499
c.closeImplicitSession()
@@ -163,11 +168,17 @@ func NewCursorFromDocuments(documents []any, preloadedErr error, registry *bson.
163168
// ID returns the ID of this cursor, or 0 if the cursor has been closed or exhausted.
164169
func (c *Cursor) ID() int64 { return c.bc.ID() }
165170

166-
// Next gets the next document for this cursor. It returns true if there were no errors and the cursor has not been
167-
// exhausted.
171+
// Next gets the next document for this cursor. It returns true if there were no
172+
// errors and the cursor has not been exhausted.
173+
//
174+
// Next blocks until a document is available or an error occurs. If the context
175+
// expires, the cursor's error will be set to ctx.Err(). In case of an error,
176+
// Next will return false.
168177
//
169-
// Next blocks until a document is available or an error occurs. If the context expires, the cursor's error will
170-
// be set to ctx.Err(). In case of an error, Next will return false.
178+
// If MaxAwaitTime is set, the operation will be bound by the Context's
179+
// deadline. If the context does not have a deadline, the operation will be
180+
// bound by the client-level timeout, if one is set. If MaxAwaitTime is greater
181+
// than the user-provided timeout, Next will return false.
171182
//
172183
// If Next returns false, subsequent calls will also return false.
173184
func (c *Cursor) Next(ctx context.Context) bool {
@@ -202,9 +213,9 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
202213

203214
// If the context does not have a deadline we defer to a client-level timeout,
204215
// if one is set.
205-
if _, ok := ctx.Deadline(); !ok && c.clientTimeout != nil {
216+
if _, ok := ctx.Deadline(); !ok && c.hasClientTimeout {
206217
var cancel context.CancelFunc
207-
ctx, cancel = context.WithTimeout(context.Background(), *c.clientTimeout)
218+
ctx, cancel = context.WithTimeout(ctx, c.clientTimeout)
208219

209220
defer cancel()
210221
}

mongo/database.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ func (db *Database) RunCommandCursor(
306306
closeImplicitSession(sess)
307307
return nil, wrapErrors(err)
308308
}
309-
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
309+
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess,
310+
withCursorOptionClientTimeout(db.client.timeout))
310311
return cursor, wrapErrors(err)
311312
}
312313

@@ -511,7 +512,8 @@ func (db *Database) ListCollections(
511512
closeImplicitSession(sess)
512513
return nil, wrapErrors(err)
513514
}
514-
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
515+
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess,
516+
withCursorOptionClientTimeout(db.client.timeout))
515517
return cursor, wrapErrors(err)
516518
}
517519

mongo/index_view.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ func (iv IndexView) List(ctx context.Context, opts ...options.Lister[options.Lis
131131
closeImplicitSession(sess)
132132
return nil, wrapErrors(err)
133133
}
134-
cursor, err := newCursorWithSession(bc, iv.coll.bsonOpts, iv.coll.registry, sess)
134+
cursor, err := newCursorWithSession(bc, iv.coll.bsonOpts, iv.coll.registry, sess,
135+
withCursorOptionClientTimeout(iv.coll.client.timeout))
136+
135137
return cursor, wrapErrors(err)
136138
}
137139

0 commit comments

Comments
 (0)