Skip to content

Commit 630989b

Browse files
author
Divjot Arora
committed
GODRIVER-1615 Track wire version on ChangeStream
This was needed to fix test errors. Because the deployment was changed and the new deployment closes the connection right after the aggregate, any code that access the wire version after the aggregate finishes would panic. To fix that, we store the wire version of the last used connection on ChangeStream.
1 parent 012750a commit 630989b

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

mongo/change_stream.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type ChangeStream struct {
5959
options *options.ChangeStreamOptions
6060
selector description.ServerSelector
6161
operationTime *primitive.Timestamp
62+
wireVersion *description.VersionRange
6263
}
6364

6465
type changeStreamConfig struct {
@@ -183,11 +184,12 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
183184
}
184185

185186
defer conn.Close()
187+
cs.wireVersion = conn.Description().WireVersion
186188

187189
cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
188190

189191
if resuming {
190-
cs.replaceOptions(ctx, conn.Description().WireVersion) // pass wire version
192+
cs.replaceOptions(ctx, cs.wireVersion) // pass wire version
191193

192194
csOptDoc := cs.createPipelineOptionsDoc()
193195
pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
@@ -205,8 +207,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
205207
}
206208

207209
if original := cs.aggregate.Execute(ctx); original != nil {
208-
wireVersion := conn.Description().WireVersion
209-
retryableRead := cs.client.retryReads && wireVersion != nil && wireVersion.Max >= 6
210+
retryableRead := cs.client.retryReads && cs.wireVersion != nil && cs.wireVersion.Max >= 6
210211
if !retryableRead {
211212
cs.err = replaceErrors(original)
212213
return cs.err
@@ -230,9 +231,9 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
230231
break
231232
}
232233
defer conn.Close()
234+
cs.wireVersion = conn.Description().WireVersion
233235

234-
wireVersion := conn.Description().WireVersion
235-
if wireVersion == nil || wireVersion.Max < 6 {
236+
if cs.wireVersion == nil || cs.wireVersion.Max < 6 {
236237
break
237238
}
238239

@@ -258,7 +259,7 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
258259

259260
cs.updatePbrtFromCommand()
260261
if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
261-
cs.options.StartAfter == nil && conn.Description().WireVersion.Max >= 7 &&
262+
cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 &&
262263
cs.emptyBatch() && cs.resumeToken == nil {
263264
cs.operationTime = cs.sess.OperationTime
264265
}

0 commit comments

Comments
 (0)