|
| 1 | +// Copyright (C) MongoDB, Inc. 2017-present. |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 4 | +// not use this file except in compliance with the License. You may obtain |
| 5 | +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 |
| 6 | + |
| 7 | +package integration |
| 8 | + |
| 9 | +import ( |
| 10 | + "context" |
| 11 | + "fmt" |
| 12 | + "os" |
| 13 | + "testing" |
| 14 | + "time" |
| 15 | + |
| 16 | + "github.com/mongodb/mongo-go-driver/bson" |
| 17 | + "github.com/mongodb/mongo-go-driver/core/command" |
| 18 | + "github.com/mongodb/mongo-go-driver/core/description" |
| 19 | + "github.com/mongodb/mongo-go-driver/core/event" |
| 20 | + "github.com/mongodb/mongo-go-driver/core/option" |
| 21 | + "github.com/mongodb/mongo-go-driver/internal/testutil" |
| 22 | + "github.com/stretchr/testify/assert" |
| 23 | +) |
| 24 | + |
| 25 | +func initMonitor() (chan *event.CommandStartedEvent, chan *event.CommandSucceededEvent, chan *event.CommandFailedEvent, *event.CommandMonitor) { |
| 26 | + startedChan := make(chan *event.CommandStartedEvent, 100) |
| 27 | + succeededChan := make(chan *event.CommandSucceededEvent, 100) |
| 28 | + failedChan := make(chan *event.CommandFailedEvent, 100) |
| 29 | + monitor := &event.CommandMonitor{ |
| 30 | + Started: func(ctx context.Context, cse *event.CommandStartedEvent) { |
| 31 | + startedChan <- cse |
| 32 | + }, |
| 33 | + Succeeded: func(ctx context.Context, cse *event.CommandSucceededEvent) { |
| 34 | + succeededChan <- cse |
| 35 | + }, |
| 36 | + Failed: func(ctx context.Context, cfe *event.CommandFailedEvent) { |
| 37 | + failedChan <- cfe |
| 38 | + }, |
| 39 | + } |
| 40 | + |
| 41 | + return startedChan, succeededChan, failedChan, monitor |
| 42 | +} |
| 43 | + |
| 44 | +func TestFindPassesMaxAwaitTimeMSThroughToGetMore(t *testing.T) { |
| 45 | + startedChan, succeededChan, failedChan, monitor := initMonitor() |
| 46 | + |
| 47 | + dbName := fmt.Sprintf("mongo-go-driver-%d-find", os.Getpid()) |
| 48 | + colName := testutil.ColName(t) |
| 49 | + |
| 50 | + server, err := testutil.MonitoredTopology(t, dbName, monitor).SelectServer(context.Background(), description.WriteSelector()) |
| 51 | + noerr(t, err) |
| 52 | + |
| 53 | + // create capped collection |
| 54 | + createCmd := bson.NewDocument( |
| 55 | + bson.EC.String("create", colName), |
| 56 | + bson.EC.Boolean("capped", true), |
| 57 | + bson.EC.Int32("size", 1000)) |
| 58 | + _, err = testutil.RunCommand(t, server.Server, dbName, createCmd) |
| 59 | + noerr(t, err) |
| 60 | + |
| 61 | + // insert some documents |
| 62 | + insertCmd := bson.NewDocument( |
| 63 | + bson.EC.String("insert", colName), |
| 64 | + bson.EC.ArrayFromElements("documents", |
| 65 | + bson.VC.Document(bson.NewDocument(bson.EC.Int32("_id", 1))), |
| 66 | + bson.VC.Document(bson.NewDocument(bson.EC.Int32("_id", 2))), |
| 67 | + bson.VC.Document(bson.NewDocument(bson.EC.Int32("_id", 3))), |
| 68 | + bson.VC.Document(bson.NewDocument(bson.EC.Int32("_id", 4))), |
| 69 | + bson.VC.Document(bson.NewDocument(bson.EC.Int32("_id", 5))))) |
| 70 | + _, err = testutil.RunCommand(t, server.Server, dbName, insertCmd) |
| 71 | + |
| 72 | + conn, err := server.Connection(context.Background()) |
| 73 | + noerr(t, err) |
| 74 | + |
| 75 | + // find those documents, setting cursor type to TAILABLEAWAIT |
| 76 | + cursor, err := (&command.Find{ |
| 77 | + NS: command.Namespace{DB: dbName, Collection: colName}, |
| 78 | + Filter: bson.NewDocument(bson.EC.SubDocument("_id", bson.NewDocument(bson.EC.Int32("$gte", 1)))), |
| 79 | + Opts: []option.FindOptioner{ |
| 80 | + option.OptBatchSize(3), |
| 81 | + option.OptMaxAwaitTime(time.Millisecond * 250), |
| 82 | + option.OptCursorType(option.TailableAwait)}, |
| 83 | + }).RoundTrip(context.Background(), server.SelectedDescription(), server, conn) |
| 84 | + noerr(t, err) |
| 85 | + |
| 86 | + // exhaust the cursor, triggering getMore commands |
| 87 | + for i := 0; i < 4; i++ { |
| 88 | + cursor.Next(context.Background()) |
| 89 | + } |
| 90 | + |
| 91 | + // allow for iteration over range chan |
| 92 | + close(startedChan) |
| 93 | + close(succeededChan) |
| 94 | + close(failedChan) |
| 95 | + |
| 96 | + // no commands should have failed |
| 97 | + if len(failedChan) != 0 { |
| 98 | + t.Errorf("%d command(s) failed", len(failedChan)) |
| 99 | + } |
| 100 | + |
| 101 | + // check that the expected commands were started |
| 102 | + for started := range startedChan { |
| 103 | + switch started.CommandName { |
| 104 | + case "find": |
| 105 | + assert.Equal(t, 3, int(started.Command.Lookup("batchSize").Int32())) |
| 106 | + assert.True(t, started.Command.Lookup("tailable").Boolean()) |
| 107 | + assert.True(t, started.Command.Lookup("awaitData").Boolean()) |
| 108 | + assert.Nil(t, started.Command.Lookup("maxAwaitTimeMS"), |
| 109 | + "Should not have sent maxAwaitTimeMS in find command") |
| 110 | + case "getMore": |
| 111 | + assert.Equal(t, 3, int(started.Command.Lookup("batchSize").Int32())) |
| 112 | + assert.Equal(t, 250, int(started.Command.Lookup("maxTimeMS").Int64()), |
| 113 | + "Should have sent maxTimeMS in getMore command") |
| 114 | + default: |
| 115 | + continue |
| 116 | + } |
| 117 | + } |
| 118 | + |
| 119 | + // to keep track of seen documents |
| 120 | + id := 1 |
| 121 | + |
| 122 | + // check expected commands succeeded |
| 123 | + for succeeded := range succeededChan { |
| 124 | + switch succeeded.CommandName { |
| 125 | + case "find": |
| 126 | + assert.Equal(t, 1, int(succeeded.Reply.Lookup("ok").Double())) |
| 127 | + |
| 128 | + actual := succeeded.Reply.Lookup("cursor", "firstBatch").MutableArray() |
| 129 | + |
| 130 | + for i := 0; i < actual.Len(); i++ { |
| 131 | + v, _ := actual.Lookup(uint(i)) |
| 132 | + assert.Equal(t, id, int(v.MutableDocument().Lookup("_id").Int32())) |
| 133 | + id++ |
| 134 | + } |
| 135 | + case "getMore": |
| 136 | + assert.Equal(t, "getMore", succeeded.CommandName) |
| 137 | + assert.Equal(t, 1, int(succeeded.Reply.Lookup("ok").Double())) |
| 138 | + |
| 139 | + actual := succeeded.Reply.Lookup("cursor", "nextBatch").MutableArray() |
| 140 | + |
| 141 | + for i := 0; i < actual.Len(); i++ { |
| 142 | + v, _ := actual.Lookup(uint(i)) |
| 143 | + assert.Equal(t, id, int(v.MutableDocument().Lookup("_id").Int32())) |
| 144 | + id++ |
| 145 | + } |
| 146 | + default: |
| 147 | + continue |
| 148 | + } |
| 149 | + } |
| 150 | + |
| 151 | + if id <= 5 { |
| 152 | + t.Errorf("not all documents returned; last seen id = %d", id-1) |
| 153 | + } |
| 154 | +} |
0 commit comments