Skip to content

Commit 471227f

Browse files
Extend mongo.batchCursor with MaxAwaitTime for validation
1 parent ff0d3c2 commit 471227f

File tree

4 files changed

+41
-0
lines changed

4 files changed

+41
-0
lines changed

mongo/batch_cursor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ type batchCursor interface {
5151
// SetComment will set a user-configurable comment that can be used to
5252
// identify the operation in server logs.
5353
SetComment(interface{})
54+
55+
// MaxAwaitTime returns the maximum amount of time the server will allow
56+
// the operations to execute. This is only valid for tailable awaitData
57+
// cursors.
58+
MaxAwaitTime() *time.Duration
5459
}
5560

5661
// changeStreamCursor is the interface implemented by batch cursors that also provide the functionality for retrieving

mongo/cursor.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"time"
1717

1818
"go.mongodb.org/mongo-driver/v2/bson"
19+
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
1920
"go.mongodb.org/mongo-driver/v2/mongo/options"
2021
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
2122
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
@@ -175,6 +176,32 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
175176
if ctx == nil {
176177
ctx = context.Background()
177178
}
179+
180+
// To avoid unnecessary socket timeouts, we attempt to short-circuit tailable
181+
// awaitData "getMore" operations by ensuring that the maxAwaitTimeMS is less
182+
// than the operation timeout.
183+
//
184+
// The specifications assume that drivers iteratively apply the timeout
185+
// provided at the constructor level (e.g., (*collection).Find) for tailable
186+
// awaitData cursors:
187+
//
188+
// If set, drivers MUST apply the timeoutMS option to the initial aggregate
189+
// operation. Drivers MUST also apply the original timeoutMS value to each
190+
// next call on the change stream but MUST NOT use it to derive a maxTimeMS
191+
// field for getMore commands.
192+
//
193+
// The Go Driver might decide to support the above behavior with DRIVERS-2722.
194+
// The principal concern is that it would be unexpected for users to apply an
195+
// operation-level timeout via contexts to a constructor and then that timeout
196+
// later be applied while working with a resulting cursor. Instead, it is more
197+
// idiomatic to apply the timeout to the context passed to Next or TryNext.
198+
maxAwaitTime := c.bc.MaxAwaitTime() //
199+
if maxAwaitTime != nil && !nonBlocking && !mongoutil.TimeoutWithinContext(ctx, *maxAwaitTime) {
200+
c.err = fmt.Errorf("MaxAwaitTime must be less than the operation timeout")
201+
202+
return false
203+
}
204+
178205
val, err := c.batch.Next()
179206
switch {
180207
case err == nil:

mongo/cursor_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ type testBatchCursor struct {
2727
closed bool
2828
}
2929

30+
var _ batchCursor = (*testBatchCursor)(nil)
31+
3032
func newTestBatchCursor(numBatches, batchSize int) *testBatchCursor {
3133
batches := make([]*bsoncore.Iterator, 0, numBatches)
3234

@@ -99,6 +101,7 @@ func (tbc *testBatchCursor) Close(context.Context) error {
99101
func (tbc *testBatchCursor) SetBatchSize(int32) {}
100102
func (tbc *testBatchCursor) SetComment(interface{}) {}
101103
func (tbc *testBatchCursor) SetMaxAwaitTime(time.Duration) {}
104+
func (tbc *testBatchCursor) MaxAwaitTime() *time.Duration { return nil }
102105

103106
func TestCursor(t *testing.T) {
104107
t.Run("TestAll", func(t *testing.T) {

x/mongo/driver/batch_cursor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,12 @@ func (bc *BatchCursor) getOperationDeployment() Deployment {
545545
return SingleServerDeployment{bc.server}
546546
}
547547

548+
// MaxAwaitTime returns the maximum amount of time the server will allow
549+
// the operations to execute. This is only valid for tailable awaitData cursors.
550+
func (bc *BatchCursor) MaxAwaitTime() *time.Duration {
551+
return bc.maxAwaitTime
552+
}
553+
548554
// loadBalancedCursorDeployment is used as a Deployment for getMore and killCursors commands when pinning to a
549555
// connection in load balanced mode. This type also functions as an ErrorProcessor to ensure that SDAM errors are
550556
// handled for these commands in this mode.

0 commit comments

Comments
 (0)