Skip to content

Commit bf49d4a

Browse files
author
Divjot Arora
committed
GODRIVER-1858 Use mock deployment for slow BulkWrite test (#584)
1 parent 3dabd1e commit bf49d4a

File tree

4 files changed

+51
-21
lines changed

4 files changed

+51
-21
lines changed

mongo/integration/change_stream_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
236236
Code: errorHostUnreachable,
237237
Name: "foo",
238238
Message: "bar",
239+
Labels: []string{"ResumableChangeStreamError"},
239240
})
240241
killCursorsRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
241242
Code: errorInterrupted,

mongo/integration/collection_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1354,9 +1354,10 @@ func TestCollection(t *testing.T) {
13541354
mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
13551355
}
13561356
})
1357-
mt.Run("insert and delete with batches", func(mt *mtest.T) {
1357+
mt.RunOpts("insert and delete with batches", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
13581358
// grouped together because delete requires the documents to be inserted
1359-
numDocs := 100050
1359+
maxBatchCount := int(mtest.MockDescription.MaxBatchCount)
1360+
numDocs := maxBatchCount + 50
13601361
var insertModels []mongo.WriteModel
13611362
var deleteModels []mongo.WriteModel
13621363
for i := 0; i < numDocs; i++ {
@@ -1368,6 +1369,21 @@ func TestCollection(t *testing.T) {
13681369
insertModels = append(insertModels, mongo.NewInsertOneModel().SetDocument(d))
13691370
deleteModels = append(deleteModels, mongo.NewDeleteOneModel().SetFilter(bson.D{}))
13701371
}
1372+
1373+
// Seed mock responses. Both insert and delete respones look like {ok: 1, n: <inserted/deleted count>}.
1374+
// This loop only creates one set of responses, but the sets for insert and delete should be equivalent,
1375+
// so we can duplicate the generated set before calling mt.AddMockResponses().
1376+
var responses []bson.D
1377+
for i := numDocs; i > 0; i -= maxBatchCount {
1378+
count := maxBatchCount
1379+
if i < maxBatchCount {
1380+
count = i
1381+
}
1382+
res := mtest.CreateSuccessResponse(bson.E{"n", count})
1383+
responses = append(responses, res)
1384+
}
1385+
mt.AddMockResponses(append(responses, responses...)...)
1386+
13711387
mt.ClearEvents()
13721388
res, err := mt.Coll.BulkWrite(mtest.Background, insertModels)
13731389
assert.Nil(mt, err, "BulkWrite error: %v", err)

mongo/integration/mtest/opmsg_deployment.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.mongodb.org/mongo-driver/x/mongo/driver"
1616
"go.mongodb.org/mongo-driver/x/mongo/driver/address"
1717
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
18+
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
1819
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
1920
)
2021

@@ -24,7 +25,22 @@ const (
2425
maxMessageSize uint32 = 48000000
2526
maxBatchCount uint32 = 100000
2627
sessionTimeoutMinutes uint32 = 30
27-
maxWireVersion int32 = 8
28+
)
29+
30+
var (
31+
// MockDescription is the server description used for the mock deployment. Each mocked connection returns this
32+
// value from its Description method.
33+
MockDescription = description.Server{
34+
CanonicalAddr: serverAddress,
35+
MaxDocumentSize: maxDocumentSize,
36+
MaxMessageSize: maxMessageSize,
37+
MaxBatchCount: maxBatchCount,
38+
SessionTimeoutMinutes: sessionTimeoutMinutes,
39+
Kind: description.RSPrimary,
40+
WireVersion: &description.VersionRange{
41+
Max: topology.SupportedWireVersions.Max,
42+
},
43+
}
2844
)
2945

3046
// connection implements the driver.Connection interface and responds to wire messages with pre-configured responses.
@@ -59,17 +75,7 @@ func (c *connection) ReadWireMessage(_ context.Context, dst []byte) ([]byte, err
5975

6076
// Description returns a fixed server description for the connection.
6177
func (c *connection) Description() description.Server {
62-
return description.Server{
63-
CanonicalAddr: serverAddress,
64-
MaxDocumentSize: maxDocumentSize,
65-
MaxMessageSize: maxMessageSize,
66-
MaxBatchCount: maxBatchCount,
67-
SessionTimeoutMinutes: sessionTimeoutMinutes,
68-
Kind: description.RSPrimary,
69-
WireVersion: &description.VersionRange{
70-
Max: maxWireVersion,
71-
},
72-
}
78+
return MockDescription
7379
}
7480

7581
// Close is a no-op operation.

x/mongo/driver/topology/fsm.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,15 @@ import (
1616
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
1717
)
1818

19-
var supportedWireVersions = description.NewVersionRange(2, 9)
20-
var minSupportedMongoDBVersion = "2.6"
19+
var (
20+
// SupportedWireVersions is the range of wire versions supported by the driver.
21+
SupportedWireVersions = description.NewVersionRange(2, 9)
22+
)
23+
24+
const (
25+
// MinSupportedMongoDBVersion is the version string for the lowest MongoDB version supported by the driver.
26+
MinSupportedMongoDBVersion = "2.6"
27+
)
2128

2229
type fsm struct {
2330
description.Topology
@@ -89,26 +96,26 @@ func (f *fsm) apply(s description.Server) (description.Topology, description.Ser
8996

9097
for _, server := range f.Servers {
9198
if server.WireVersion != nil {
92-
if server.WireVersion.Max < supportedWireVersions.Min {
99+
if server.WireVersion.Max < SupportedWireVersions.Min {
93100
f.compatible.Store(false)
94101
f.compatibilityErr = fmt.Errorf(
95102
"server at %s reports wire version %d, but this version of the Go driver requires "+
96103
"at least %d (MongoDB %s)",
97104
server.Addr.String(),
98105
server.WireVersion.Max,
99-
supportedWireVersions.Min,
100-
minSupportedMongoDBVersion,
106+
SupportedWireVersions.Min,
107+
MinSupportedMongoDBVersion,
101108
)
102109
return description.Topology{}, s, f.compatibilityErr
103110
}
104111

105-
if server.WireVersion.Min > supportedWireVersions.Max {
112+
if server.WireVersion.Min > SupportedWireVersions.Max {
106113
f.compatible.Store(false)
107114
f.compatibilityErr = fmt.Errorf(
108115
"server at %s requires wire version %d, but this version of the Go driver only supports up to %d",
109116
server.Addr.String(),
110117
server.WireVersion.Min,
111-
supportedWireVersions.Max,
118+
SupportedWireVersions.Max,
112119
)
113120
return description.Topology{}, s, f.compatibilityErr
114121
}

0 commit comments

Comments
 (0)