Skip to content

Commit 4597b33

Browse files
author
Divjot Arora
committed
Call Next() on the underlying cursor after a resumeable change stream error.
GODRIVER-746 Change-Id: I4b62daffa4443a3af1775f51bdb63487dd9f571a
1 parent 80e6fbd commit 4597b33

File tree

2 files changed

+101
-83
lines changed

2 files changed

+101
-83
lines changed

mongo/change_stream.go

Lines changed: 62 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -205,42 +205,13 @@ func (cs *changeStream) runCommand(ctx context.Context, replaceOptions bool) err
205205
}
206206
cs.cursor = cursor
207207

208-
// can get resume token from initial aggregate command if non-empty batch
209-
// operationTime from aggregate saved in the session
210208
cursorValue, err := rdr.LookupErr("cursor")
211209
if err != nil {
212210
return err
213211
}
214212
cursorDoc := cursorValue.Document()
215-
216213
cs.ns = command.ParseNamespace(cursorDoc.Lookup("ns").StringValue())
217214

218-
batchVal := cursorDoc.Lookup("firstBatch")
219-
if err != nil {
220-
return err
221-
}
222-
223-
batch := batchVal.Array()
224-
elements, err := batch.Elements()
225-
if err != nil {
226-
return err
227-
}
228-
229-
if len(elements) == 0 {
230-
return nil // no resume token
231-
}
232-
233-
firstElem, err := batch.IndexErr(0)
234-
if err != nil {
235-
return err
236-
}
237-
238-
tokenDoc, err := bsonx.ReadDoc(firstElem.Value().Document().Lookup("_id").Document())
239-
if err != nil {
240-
return err
241-
}
242-
243-
cs.resumeToken = tokenDoc
244215
return nil
245216
}
246217

@@ -398,6 +369,34 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
398369
return cs, nil
399370
}
400371

372+
func (cs *changeStream) storeResumeToken() error {
373+
br, err := cs.cursor.DecodeBytes()
374+
if err != nil {
375+
return err
376+
}
377+
378+
idVal, err := br.LookupErr("_id")
379+
if err != nil {
380+
_ = cs.Close(context.Background())
381+
return ErrMissingResumeToken
382+
}
383+
384+
var idDoc bson.Raw
385+
idDoc, ok := idVal.DocumentOK()
386+
if !ok {
387+
_ = cs.Close(context.Background())
388+
return ErrMissingResumeToken
389+
}
390+
tokenDoc, err := bsonx.ReadDoc(idDoc)
391+
if err != nil {
392+
_ = cs.Close(context.Background())
393+
return ErrMissingResumeToken
394+
}
395+
396+
cs.resumeToken = tokenDoc
397+
return nil
398+
}
399+
401400
func (cs *changeStream) ID() int64 {
402401
if cs.cursor == nil {
403402
return 0
@@ -407,38 +406,45 @@ func (cs *changeStream) ID() int64 {
407406
}
408407

409408
func (cs *changeStream) Next(ctx context.Context) bool {
410-
if cs.cursor == nil {
411-
return false
412-
}
409+
// execute in a loop to retry resume-able errors and advance the underlying cursor
410+
for {
411+
if cs.cursor == nil {
412+
return false
413+
}
413414

414-
if cs.cursor.Next(ctx) {
415-
return true
416-
}
415+
if cs.cursor.Next(ctx) {
416+
err := cs.storeResumeToken()
417+
if err != nil {
418+
cs.err = err
419+
return false
420+
}
417421

418-
err := cs.cursor.Err()
419-
if err == nil {
420-
return false
421-
}
422+
return true
423+
}
422424

423-
switch t := err.(type) {
424-
case command.Error:
425-
if t.Code == errorInterrupted || t.Code == errorCappedPositionLost || t.Code == errorCursorKilled {
425+
err := cs.cursor.Err()
426+
if err == nil {
426427
return false
427428
}
428-
}
429429

430-
killCursors := command.KillCursors{
431-
NS: cs.ns,
432-
IDs: []int64{cs.ID()},
433-
}
430+
switch t := err.(type) {
431+
case command.Error:
432+
if t.Code == errorInterrupted || t.Code == errorCappedPositionLost || t.Code == errorCursorKilled {
433+
return false
434+
}
435+
}
434436

435-
_, _ = driver.KillCursors(ctx, killCursors, cs.client.topology, cs.db.writeSelector)
436-
cs.err = cs.runCommand(ctx, true)
437-
if cs.err != nil {
438-
return false
439-
}
437+
killCursors := command.KillCursors{
438+
NS: cs.ns,
439+
IDs: []int64{cs.ID()},
440+
}
440441

441-
return true
442+
_, _ = driver.KillCursors(ctx, killCursors, cs.client.topology, cs.db.writeSelector)
443+
cs.err = cs.runCommand(ctx, true)
444+
if cs.err != nil {
445+
return false
446+
}
447+
}
442448
}
443449

444450
func (cs *changeStream) Decode(out interface{}) error {
@@ -458,32 +464,11 @@ func (cs *changeStream) DecodeBytes() (bson.Raw, error) {
458464
if cs.cursor == nil {
459465
return nil, ErrNilCursor
460466
}
461-
462-
br, err := cs.cursor.DecodeBytes()
463-
if err != nil {
464-
return nil, err
465-
}
466-
467-
idVal, err := br.LookupErr("_id")
468-
if err != nil {
469-
_ = cs.Close(context.Background())
470-
return nil, ErrMissingResumeToken
471-
}
472-
473-
var idDoc bson.Raw
474-
idDoc, ok := idVal.DocumentOK()
475-
if !ok {
476-
_ = cs.Close(context.Background())
477-
return nil, ErrMissingResumeToken
478-
}
479-
tokenDoc, err := bsonx.ReadDoc(idDoc)
480-
if err != nil {
481-
_ = cs.Close(context.Background())
482-
return nil, ErrMissingResumeToken
467+
if cs.err != nil {
468+
return nil, cs.err
483469
}
484470

485-
cs.resumeToken = tokenDoc
486-
return br, nil
471+
return cs.cursor.DecodeBytes()
487472
}
488473

489474
func (cs *changeStream) Err() error {

mongo/change_stream_test.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -336,13 +336,14 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
336336
coll.writeConcern = wcMajority
337337
_, err := coll.InsertOne(ctx, doc1)
338338
testhelpers.RequireNil(t, err, "error running insertOne: %s", err)
339-
if !stream.Next(ctx) {
340-
t.Fatal("no change found")
341-
}
342339

343-
_, err = stream.DecodeBytes()
344-
if err == nil || err != ErrMissingResumeToken {
345-
t.Fatalf("expected ErrMissingResumeToken, got %s", err)
340+
// Next should set the change stream error and return false if a document is missing the resume token
341+
if stream.Next(ctx) {
342+
t.Fatal("Next returned true, expected false")
343+
}
344+
err = stream.Err()
345+
if err != ErrMissingResumeToken {
346+
t.Fatalf("expected ErrMissingResumeToken, got %v", err)
346347
}
347348
})
348349

@@ -355,6 +356,8 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
355356
startCmd := (<-startedChan).Command
356357
startPipeline := startCmd.Lookup("pipeline").Array()
357358

359+
// make sure resume token is recorded by the change stream because the resume process will hang otherwise
360+
ensureResumeToken(t, coll, stream)
358361
cs := stream.(*changeStream)
359362

360363
kc := command.KillCursors{
@@ -674,4 +677,34 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
674677
testhelpers.RequireNil(t, stream.Err(), "error while reading stream: %v", err)
675678
})
676679
})
680+
681+
t.Run("ResumeErrorCallsNext", func(t *testing.T) {
682+
// Test that the underlying cursor is advanced after a resumeable error occurs.
683+
684+
coll, stream := createCollectionStream(t, "ResumeNextDB", "ResumeNextColl", nil)
685+
defer closeCursor(stream)
686+
ensureResumeToken(t, coll, stream)
687+
688+
// kill the stream's underlying cursor to force a resumeable error
689+
cs := stream.(*changeStream)
690+
kc := command.KillCursors{
691+
NS: cs.ns,
692+
IDs: []int64{cs.ID()},
693+
}
694+
695+
_, err := driver.KillCursors(ctx, kc, cs.client.topology, cs.db.writeSelector)
696+
testhelpers.RequireNil(t, err, "error running killCursors cmd: %s", err)
697+
698+
ensureResumeToken(t, coll, stream)
699+
})
700+
}
701+
702+
// ensure that a resume token has been recorded by a change stream
703+
func ensureResumeToken(t *testing.T, coll *Collection, cs Cursor) {
704+
_, err := coll.InsertOne(ctx, bsonx.Doc{{"ensureResumeToken", bsonx.Int32(1)}})
705+
testhelpers.RequireNil(t, err, "error inserting doc: %v", err)
706+
707+
if !cs.Next(ctx) {
708+
t.Fatal("Next returned false, expected true")
709+
}
677710
}

0 commit comments

Comments
 (0)