Skip to content

Commit ff0d3c2

Browse files
Use mongoutil.TimeoutWithinContext over validChangeStreamTimeouts for change streams
1 parent 540e8af commit ff0d3c2

File tree

2 files changed

+26
-127
lines changed

2 files changed

+26
-127
lines changed

mongo/change_stream.go

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"fmt"
1313
"reflect"
1414
"strconv"
15-
"time"
1615

1716
"go.mongodb.org/mongo-driver/v2/bson"
1817
"go.mongodb.org/mongo-driver/v2/internal/csot"
@@ -103,33 +102,6 @@ type changeStreamConfig struct {
103102
crypt driver.Crypt
104103
}
105104

106-
// validChangeStreamTimeouts will return "false" if maxAwaitTimeMS is set,
107-
// timeoutMS is set to a non-zero value, and maxAwaitTimeMS is greater than or
108-
// equal to timeoutMS. Otherwise, the timeouts are valid.
109-
func validChangeStreamTimeouts(ctx context.Context, cs *ChangeStream) bool {
110-
if cs.options == nil || cs.client == nil {
111-
return true
112-
}
113-
114-
maxAwaitTime := cs.options.MaxAwaitTime
115-
timeout := cs.client.timeout
116-
117-
if maxAwaitTime == nil {
118-
return true
119-
}
120-
121-
if deadline, ok := ctx.Deadline(); ok {
122-
ctxTimeout := time.Until(deadline)
123-
timeout = &ctxTimeout
124-
}
125-
126-
if timeout == nil {
127-
return true
128-
}
129-
130-
return *timeout <= 0 || *maxAwaitTime < *timeout
131-
}
132-
133105
func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
134106
opts ...options.Lister[options.ChangeStreamOptions]) (*ChangeStream, error) {
135107
if ctx == nil {
@@ -696,10 +668,33 @@ func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
696668
}
697669

698670
func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
699-
if !validChangeStreamTimeouts(ctx, cs) {
700-
cs.err = fmt.Errorf("MaxAwaitTime must be less than the operation timeout")
671+
// To avoid unnecessary socket timeouts, we attempt to short-circuit tailable
672+
// awaitData "getMore" operations by ensuring that the maxAwaitTimeMS is less
673+
// than the operation timeout.
674+
//
675+
// The specifications assume that drivers iteratively apply the timeout
676+
// provided at the constructor level (e.g., (*collection).Find) for tailable
677+
// awaitData cursors:
678+
//
679+
// If set, drivers MUST apply the timeoutMS option to the initial aggregate
680+
// operation. Drivers MUST also apply the original timeoutMS value to each
681+
// next call on the change stream but MUST NOT use it to derive a maxTimeMS
682+
// field for getMore commands.
683+
//
684+
// The Go Driver might decide to support the above behavior with DRIVERS-2722.
685+
// The principal concern is that it would be unexpected for users to apply an
686+
// operation-level timeout via contexts to a constructor and then that timeout
687+
// later be applied while working with a resulting cursor. Instead, it is more
688+
// idiomatic to apply the timeout to the context passed to Next or TryNext.
689+
if cs.options != nil && !nonBlocking {
690+
maxAwaitTime := cs.cursorOptions.MaxAwaitTime
691+
692+
// If maxAwaitTime is not set, this check is unnecessary.
693+
if maxAwaitTime != nil && !mongoutil.TimeoutWithinContext(ctx, *maxAwaitTime) {
694+
cs.err = fmt.Errorf("MaxAwaitTime must be less than the operation timeout")
701695

702-
return
696+
return
697+
}
703698
}
704699

705700
// Apply the client-level timeout if the operation-level timeout is not set.

mongo/change_stream_test.go

Lines changed: 0 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,9 @@
77
package mongo
88

99
import (
10-
"context"
1110
"testing"
12-
"time"
1311

1412
"go.mongodb.org/mongo-driver/v2/internal/assert"
15-
"go.mongodb.org/mongo-driver/v2/mongo/options"
1613
)
1714

1815
func TestChangeStream(t *testing.T) {
@@ -30,96 +27,3 @@ func TestChangeStream(t *testing.T) {
3027
assert.Nil(t, err, "Close error: %v", err)
3128
})
3229
}
33-
34-
func TestValidChangeStreamTimeouts(t *testing.T) {
35-
t.Parallel()
36-
37-
newDurPtr := func(dur time.Duration) *time.Duration {
38-
return &dur
39-
}
40-
41-
tests := []struct {
42-
name string
43-
parent context.Context
44-
maxAwaitTimeout, timeout *time.Duration
45-
wantTimeout time.Duration
46-
want bool
47-
}{
48-
{
49-
name: "no context deadline and no timeouts",
50-
parent: context.Background(),
51-
maxAwaitTimeout: nil,
52-
timeout: nil,
53-
wantTimeout: 0,
54-
want: true,
55-
},
56-
{
57-
name: "no context deadline and maxAwaitTimeout",
58-
parent: context.Background(),
59-
maxAwaitTimeout: newDurPtr(1),
60-
timeout: nil,
61-
wantTimeout: 0,
62-
want: true,
63-
},
64-
{
65-
name: "no context deadline and timeout",
66-
parent: context.Background(),
67-
maxAwaitTimeout: nil,
68-
timeout: newDurPtr(1),
69-
wantTimeout: 0,
70-
want: true,
71-
},
72-
{
73-
name: "no context deadline and maxAwaitTime gt timeout",
74-
parent: context.Background(),
75-
maxAwaitTimeout: newDurPtr(2),
76-
timeout: newDurPtr(1),
77-
wantTimeout: 0,
78-
want: false,
79-
},
80-
{
81-
name: "no context deadline and maxAwaitTime lt timeout",
82-
parent: context.Background(),
83-
maxAwaitTimeout: newDurPtr(1),
84-
timeout: newDurPtr(2),
85-
wantTimeout: 0,
86-
want: true,
87-
},
88-
{
89-
name: "no context deadline and maxAwaitTime eq timeout",
90-
parent: context.Background(),
91-
maxAwaitTimeout: newDurPtr(1),
92-
timeout: newDurPtr(1),
93-
wantTimeout: 0,
94-
want: false,
95-
},
96-
{
97-
name: "no context deadline and maxAwaitTime with negative timeout",
98-
parent: context.Background(),
99-
maxAwaitTimeout: newDurPtr(1),
100-
timeout: newDurPtr(-1),
101-
wantTimeout: 0,
102-
want: true,
103-
},
104-
}
105-
106-
for _, test := range tests {
107-
test := test // Capture the range variable
108-
109-
t.Run(test.name, func(t *testing.T) {
110-
t.Parallel()
111-
112-
cs := &ChangeStream{
113-
options: &options.ChangeStreamOptions{
114-
MaxAwaitTime: test.maxAwaitTimeout,
115-
},
116-
client: &Client{
117-
timeout: test.timeout,
118-
},
119-
}
120-
121-
got := validChangeStreamTimeouts(test.parent, cs)
122-
assert.Equal(t, test.want, got)
123-
})
124-
}
125-
}

0 commit comments

Comments
 (0)