Skip to content

Commit c3587ec

Browse files
author
Divjot Arora
committed
GODRIVER-1858 Use mock deployment for slow BulkWrite test (#584)
1 parent 9d9808f commit c3587ec

File tree

6 files changed

+55
-25
lines changed

6 files changed

+55
-25
lines changed

mongo/integration/change_stream_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
236236
Code: errorHostUnreachable,
237237
Name: "foo",
238238
Message: "bar",
239+
Labels: []string{"ResumableChangeStreamError"},
239240
})
240241
killCursorsRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
241242
Code: errorInterrupted,

mongo/integration/collection_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1661,9 +1661,10 @@ func TestCollection(t *testing.T) {
16611661
mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
16621662
}
16631663
})
1664-
mt.Run("insert and delete with batches", func(mt *mtest.T) {
1664+
mt.RunOpts("insert and delete with batches", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
16651665
// grouped together because delete requires the documents to be inserted
1666-
numDocs := 100050
1666+
maxBatchCount := int(mtest.MockDescription.MaxBatchCount)
1667+
numDocs := maxBatchCount + 50
16671668
var insertModels []mongo.WriteModel
16681669
var deleteModels []mongo.WriteModel
16691670
for i := 0; i < numDocs; i++ {
@@ -1675,6 +1676,21 @@ func TestCollection(t *testing.T) {
16751676
insertModels = append(insertModels, mongo.NewInsertOneModel().SetDocument(d))
16761677
deleteModels = append(deleteModels, mongo.NewDeleteOneModel().SetFilter(bson.D{}))
16771678
}
1679+
1680+
// Seed mock responses. Both insert and delete respones look like {ok: 1, n: <inserted/deleted count>}.
1681+
// This loop only creates one set of responses, but the sets for insert and delete should be equivalent,
1682+
// so we can duplicate the generated set before calling mt.AddMockResponses().
1683+
var responses []bson.D
1684+
for i := numDocs; i > 0; i -= maxBatchCount {
1685+
count := maxBatchCount
1686+
if i < maxBatchCount {
1687+
count = i
1688+
}
1689+
res := mtest.CreateSuccessResponse(bson.E{"n", count})
1690+
responses = append(responses, res)
1691+
}
1692+
mt.AddMockResponses(append(responses, responses...)...)
1693+
16781694
mt.ClearEvents()
16791695
res, err := mt.Coll.BulkWrite(mtest.Background, insertModels)
16801696
assert.Nil(mt, err, "BulkWrite error: %v", err)

mongo/integration/mtest/opmsg_deployment.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.mongodb.org/mongo-driver/mongo/description"
1616
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
1717
"go.mongodb.org/mongo-driver/x/mongo/driver"
18+
"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
1819
"go.mongodb.org/mongo-driver/x/mongo/driver/wiremessage"
1920
)
2021

@@ -24,7 +25,22 @@ const (
2425
maxMessageSize uint32 = 48000000
2526
maxBatchCount uint32 = 100000
2627
sessionTimeoutMinutes uint32 = 30
27-
maxWireVersion int32 = 8
28+
)
29+
30+
var (
31+
// MockDescription is the server description used for the mock deployment. Each mocked connection returns this
32+
// value from its Description method.
33+
MockDescription = description.Server{
34+
CanonicalAddr: serverAddress,
35+
MaxDocumentSize: maxDocumentSize,
36+
MaxMessageSize: maxMessageSize,
37+
MaxBatchCount: maxBatchCount,
38+
SessionTimeoutMinutes: sessionTimeoutMinutes,
39+
Kind: description.RSPrimary,
40+
WireVersion: &description.VersionRange{
41+
Max: topology.SupportedWireVersions.Max,
42+
},
43+
}
2844
)
2945

3046
// connection implements the driver.Connection interface and responds to wire messages with pre-configured responses.
@@ -59,17 +75,7 @@ func (c *connection) ReadWireMessage(_ context.Context, dst []byte) ([]byte, err
5975

6076
// Description returns a fixed server description for the connection.
6177
func (c *connection) Description() description.Server {
62-
return description.Server{
63-
CanonicalAddr: serverAddress,
64-
MaxDocumentSize: maxDocumentSize,
65-
MaxMessageSize: maxMessageSize,
66-
MaxBatchCount: maxBatchCount,
67-
SessionTimeoutMinutes: sessionTimeoutMinutes,
68-
Kind: description.RSPrimary,
69-
WireVersion: &description.VersionRange{
70-
Max: maxWireVersion,
71-
},
72-
}
78+
return MockDescription
7379
}
7480

7581
// Close is a no-op operation.

x/mongo/driver/topology/fsm.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,15 @@ import (
1616
"go.mongodb.org/mongo-driver/mongo/description"
1717
)
1818

19-
var supportedWireVersions = description.NewVersionRange(2, 9)
20-
var minSupportedMongoDBVersion = "2.6"
19+
var (
20+
// SupportedWireVersions is the range of wire versions supported by the driver.
21+
SupportedWireVersions = description.NewVersionRange(2, 9)
22+
)
23+
24+
const (
25+
// MinSupportedMongoDBVersion is the version string for the lowest MongoDB version supported by the driver.
26+
MinSupportedMongoDBVersion = "2.6"
27+
)
2128

2229
type fsm struct {
2330
description.Topology
@@ -89,27 +96,27 @@ func (f *fsm) apply(s description.Server) (description.Topology, description.Ser
8996

9097
for _, server := range f.Servers {
9198
if server.WireVersion != nil {
92-
if server.WireVersion.Max < supportedWireVersions.Min {
99+
if server.WireVersion.Max < SupportedWireVersions.Min {
93100
f.compatible.Store(false)
94101
f.compatibilityErr = fmt.Errorf(
95102
"server at %s reports wire version %d, but this version of the Go driver requires "+
96103
"at least %d (MongoDB %s)",
97104
server.Addr.String(),
98105
server.WireVersion.Max,
99-
supportedWireVersions.Min,
100-
minSupportedMongoDBVersion,
106+
SupportedWireVersions.Min,
107+
MinSupportedMongoDBVersion,
101108
)
102109
f.Topology.CompatibilityErr = f.compatibilityErr
103110
return f.Topology, s, nil
104111
}
105112

106-
if server.WireVersion.Min > supportedWireVersions.Max {
113+
if server.WireVersion.Min > SupportedWireVersions.Max {
107114
f.compatible.Store(false)
108115
f.compatibilityErr = fmt.Errorf(
109116
"server at %s requires wire version %d, but this version of the Go driver only supports up to %d",
110117
server.Addr.String(),
111118
server.WireVersion.Min,
112-
supportedWireVersions.Max,
119+
SupportedWireVersions.Max,
113120
)
114121
f.Topology.CompatibilityErr = f.compatibilityErr
115122
return f.Topology, s, nil

x/mongo/driver/topology/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ func (p *processErrorTestConn) Stale() bool {
576576
func (p *processErrorTestConn) Description() description.Server {
577577
return description.Server{
578578
WireVersion: &description.VersionRange{
579-
Max: supportedWireVersions.Max,
579+
Max: SupportedWireVersions.Max,
580580
},
581581
TopologyVersion: p.tv,
582582
}

x/mongo/driver/topology/topology_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestServerSelection(t *testing.T) {
100100
"server at %s requires wire version %d, but this version of the Go driver only supports up to %d",
101101
desc.Servers[0].Addr.String(),
102102
desc.Servers[0].WireVersion.Min,
103-
supportedWireVersions.Max,
103+
SupportedWireVersions.Max,
104104
)
105105
desc.CompatibilityErr = want
106106
atomic.StoreInt32(&topo.connectionstate, connected)
@@ -124,8 +124,8 @@ func TestServerSelection(t *testing.T) {
124124
"at least %d (MongoDB %s)",
125125
desc.Servers[0].Addr.String(),
126126
desc.Servers[0].WireVersion.Max,
127-
supportedWireVersions.Min,
128-
minSupportedMongoDBVersion,
127+
SupportedWireVersions.Min,
128+
MinSupportedMongoDBVersion,
129129
)
130130
desc.CompatibilityErr = want
131131
atomic.StoreInt32(&topo.connectionstate, connected)

0 commit comments

Comments
 (0)