@@ -75,6 +75,29 @@ type clientEntity struct {
75
75
logQueue chan orderedLogMessage
76
76
}
77
77
78
+ // awaitMinimumPoolSize waits for the client's connection pool to reach the
79
+ // specified minimum size. This is a best effort operation that times out after
80
+ // some predefined amount of time to avoid blocking tests indefinitely.
81
+ func awaitMinimumPoolSize (ctx context.Context , entity * clientEntity , minPoolSize uint64 ) error {
82
+ // Don't spend longer than 500ms awaiting minPoolSize.
83
+ awaitCtx , cancel := context .WithTimeout (ctx , 500 * time .Millisecond )
84
+ defer cancel ()
85
+
86
+ ticker := time .NewTicker (100 * time .Millisecond )
87
+ defer ticker .Stop ()
88
+
89
+ for {
90
+ select {
91
+ case <- awaitCtx .Done ():
92
+ return fmt .Errorf ("timed out waiting for client to reach minPoolSize" )
93
+ case <- ticker .C :
94
+ if uint64 (entity .eventsCount [connectionReadyEvent ]) >= minPoolSize {
95
+ return nil
96
+ }
97
+ }
98
+ }
99
+ }
100
+
78
101
func newClientEntity (ctx context.Context , em * EntityMap , entityOptions * entityOptions ) (* clientEntity , error ) {
79
102
// The "configureFailPoint" command should always be ignored.
80
103
ignoredCommands := map [string ]struct {}{
@@ -203,6 +226,12 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
203
226
return nil , fmt .Errorf ("error creating mongo.Client: %w" , err )
204
227
}
205
228
229
+ if entityOptions .AwaitMinPoolSize && clientOpts .MinPoolSize != nil && * clientOpts .MinPoolSize > 0 {
230
+ if err := awaitMinimumPoolSize (ctx , entity , * clientOpts .MinPoolSize ); err != nil {
231
+ return nil , err
232
+ }
233
+ }
234
+
206
235
entity .Client = client
207
236
return entity , nil
208
237
}
0 commit comments