
Commit 0cdb185

GODRIVER-2166 Await server saturation and generalize threaded operations (#943)
1 parent cb28f46 commit 0cdb185

File tree

1 file changed: +98 -51 lines changed


mongo/integration/server_selection_prose_test.go

Lines changed: 98 additions & 51 deletions
@@ -10,6 +10,7 @@ import (
     "context"
     "sync"
     "testing"
+    "time"

     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
@@ -21,9 +22,90 @@ import (
     "go.mongodb.org/mongo-driver/mongo/options"
 )

+type saturatedConnections map[uint64]bool
+
+// saturatedHosts is used to maintain information about events with specific host+pool combinations.
+type saturatedHosts map[string]saturatedConnections
+
+func (set saturatedHosts) add(host string, connectionID uint64) {
+    if set[host] == nil {
+        set[host] = make(saturatedConnections)
+    }
+    set[host][connectionID] = true
+}
+
+// isSaturated returns true when each host on the cluster URI has a tolerable number of ready connections.
+func (set saturatedHosts) isSaturated(tolerance uint64) bool {
+    for _, host := range options.Client().ApplyURI(mtest.ClusterURI()).Hosts {
+        if cxns := set[host]; cxns == nil || uint64(len(cxns)) < tolerance {
+            return false
+        }
+    }
+    return true
+}
+
+// awaitSaturation uses CMAP events to ensure that the client's connection pools for N mongoses have been saturated.
+// The qualification for a host to be "saturated" is for each host on the client to have a tolerable number of ready
+// connections.
+func awaitSaturation(ctx context.Context, mt *mtest.T, monitor *monitor.TestPoolMonitor, tolerance uint64) error {
+    set := make(saturatedHosts)
+    var err error
+    for !set.isSaturated(tolerance) {
+        if err = ctx.Err(); err != nil {
+            break
+        }
+        if err = mt.Coll.FindOne(ctx, bson.D{}).Err(); err != nil {
+            break
+        }
+        monitor.Events(func(evt *event.PoolEvent) bool {
+            // Add the host only when the connection is ready for use.
+            if evt.Type == event.ConnectionReady {
+                set.add(evt.Address, evt.ConnectionID)
+            }
+            return true
+        })
+    }
+    return err
+}
+
+// runsServerSelection runs opCount-many `FindOne` operations in each of threadCount-many goroutines. The purpose of
+// this is to test the reliability of the server selection algorithm, which can be verified with the `counts` map and
+// `event.PoolEvent` slice.
+func runsServerSelection(mt *mtest.T, monitor *monitor.TestPoolMonitor,
+    threadCount, opCount int) (map[string]int, []*event.PoolEvent) {
+    var wg sync.WaitGroup
+    for i := 0; i < threadCount; i++ {
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+
+            for i := 0; i < opCount; i++ {
+                res := mt.Coll.FindOne(context.Background(), bson.D{})
+                assert.NoError(mt.T, res.Err(), "FindOne() error for Collection '%s'", mt.Coll.Name())
+            }
+        }()
+    }
+    wg.Wait()
+
+    // Get all checkOut events and calculate the number of times each server was selected. The prose test spec says to
+    // use command monitoring events, but those don't include the server address, so use checkOut events instead.
+    checkOutEvents := monitor.Events(func(evt *event.PoolEvent) bool {
+        return evt.Type == event.GetStarted
+    })
+    counts := make(map[string]int)
+    for _, evt := range checkOutEvents {
+        counts[evt.Address]++
+    }
+    assert.Equal(mt, 2, len(counts), "expected exactly 2 server addresses")
+    return counts, checkOutEvents
+}
+
 // TestServerSelectionProse implements the Server Selection prose tests:
 // https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection-tests.rst
 func TestServerSelectionProse(t *testing.T) {
+    const maxPoolSize = 10
+    const localThreshold = 30 * time.Second
+
     mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
     defer mt.Close()

@@ -65,6 +147,9 @@ func TestServerSelectionProse(t *testing.T) {
     topologyEvents := make(chan *event.TopologyDescriptionChangedEvent, 10)
     tpm := monitor.NewTestPoolMonitor()
     mt.ResetClient(options.Client().
+        SetLocalThreshold(localThreshold).
+        SetMaxPoolSize(maxPoolSize).
+        SetMinPoolSize(maxPoolSize).
         SetHosts(hosts[:2]).
         SetPoolMonitor(tpm.PoolMonitor).
         SetAppName("loadBalancingTest").
@@ -79,34 +164,14 @@ func TestServerSelectionProse(t *testing.T) {
             break
         }
     }
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()

-    // Start 10 goroutines that each run 10 findOne operations.
-    var wg sync.WaitGroup
-    for i := 0; i < 10; i++ {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-
-            for i := 0; i < 10; i++ {
-                res := mt.Coll.FindOne(context.Background(), bson.D{})
-                assert.NoError(t, res.Err(), "FindOne() error")
-            }
-        }()
+    if err := awaitSaturation(ctx, mt, tpm, maxPoolSize); err != nil {
+        mt.Fatalf("Error awaiting saturation: %v", err.Error())
     }
-    wg.Wait()
-
-    // Get all checkOut events and calculate the number of times each server was selected. The
-    // prose test spec says to use command monitoring events, but those don't include the server
-    // address, so use checkOut events instead.
-    checkOutEvents := tpm.Events(func(evt *event.PoolEvent) bool {
-        return evt.Type == event.GetStarted
-    })
-    counts := make(map[string]int)
-    for _, evt := range checkOutEvents {
-        counts[evt.Address]++
-    }
-    assert.Equal(mt, 2, len(counts), "expected exactly 2 server addresses")

+    counts, checkOutEvents := runsServerSelection(mt, tpm, 10, 10)
     // Calculate the frequency that the server with the failpoint was selected. Assert that it
     // was selected less than 25% of the time.
     frequency := float64(counts[failpointHost]) / float64(len(checkOutEvents))
@@ -132,6 +197,9 @@ func TestServerSelectionProse(t *testing.T) {
     mt.ResetClient(options.Client().
         SetHosts(hosts[:2]).
         SetPoolMonitor(tpm.PoolMonitor).
+        SetLocalThreshold(localThreshold).
+        SetMaxPoolSize(maxPoolSize).
+        SetMinPoolSize(maxPoolSize).
         SetServerMonitor(&event.ServerMonitor{
             TopologyDescriptionChanged: func(evt *event.TopologyDescriptionChangedEvent) {
                 topologyEvents <- evt
@@ -143,35 +211,14 @@ func TestServerSelectionProse(t *testing.T) {
             break
         }
     }
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()

-    // Start 25 goroutines that each run 200 findOne operations. Run more operations than the
-    // prose test specifies to get more samples and reduce intermittent test failures.
-    var wg sync.WaitGroup
-    for i := 0; i < 25; i++ {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-
-            for i := 0; i < 200; i++ {
-                res := mt.Coll.FindOne(context.Background(), bson.D{})
-                assert.NoError(mt, res.Err(), "FindOne() error")
-            }
-        }()
-    }
-    wg.Wait()
-
-    // Get all checkOut events and calculate the number of times each server was selected. The
-    // prose test spec says to use command monitoring events, but those don't include the server
-    // address, so use checkOut events instead.
-    checkOutEvents := tpm.Events(func(evt *event.PoolEvent) bool {
-        return evt.Type == event.GetStarted
-    })
-    counts := make(map[string]int)
-    for _, evt := range checkOutEvents {
-        counts[evt.Address]++
+    if err := awaitSaturation(ctx, mt, tpm, maxPoolSize); err != nil {
+        mt.Fatalf("Error awaiting saturation: %v", err.Error())
     }
-    assert.Equal(mt, 2, len(counts), "expected exactly 2 server addresses")

+    counts, checkOutEvents := runsServerSelection(mt, tpm, 10, 100)
     // Calculate the frequency that each server was selected. Assert that each server was
     // selected 50% (+/- 10%) of the time.
     for addr, count := range counts {
