diff --git a/internal/integration/unified/client_entity.go b/internal/integration/unified/client_entity.go index 6d5fedc38c..bc981793df 100644 --- a/internal/integration/unified/client_entity.go +++ b/internal/integration/unified/client_entity.go @@ -75,6 +75,29 @@ type clientEntity struct { logQueue chan orderedLogMessage } +// awaitMinimumPoolSize waits for the client's connection pool to reach the +// specified minimum size. This is a best effort operation that times out after +// some predefined amount of time to avoid blocking tests indefinitely. +func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error { + // Don't spend longer than 500ms awaiting minPoolSize. + awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-awaitCtx.Done(): + return fmt.Errorf("timed out waiting for client to reach minPoolSize") + case <-ticker.C: + if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize { + return nil + } + } + } +} + func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) { // The "configureFailPoint" command should always be ignored. ignoredCommands := map[string]struct{}{ @@ -203,6 +226,12 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp return nil, fmt.Errorf("error creating mongo.Client: %w", err) } + if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 { + if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil { + return nil, err + } + } + entity.Client = client return entity, nil } diff --git a/internal/integration/unified/entity.go b/internal/integration/unified/entity.go index d11a15b1ee..b1b827a124 100644 --- a/internal/integration/unified/entity.go +++ b/internal/integration/unified/entity.go @@ -82,6 +82,12 @@ type entityOptions struct { DatabaseID string `bson:"database"` ClientEncryptionOpts *clientEncryptionOpts `bson:"clientEncryptionOpts"` + + // If true, the unified spec runner must wait for the connection pool to be + // populated for all servers according to the minPoolSize option. If false, + // not specified, or if minPoolSize equals 0, there is no need to wait for any + // specific pool state. + AwaitMinPoolSize bool `bson:"awaitMinPoolSize"` } func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) { diff --git a/internal/integration/unified/schema_version.go b/internal/integration/unified/schema_version.go index 7908b39017..651976d367 100644 --- a/internal/integration/unified/schema_version.go +++ b/internal/integration/unified/schema_version.go @@ -16,7 +16,7 @@ import ( var ( supportedSchemaVersions = map[int]string{ - 1: "1.22", + 1: "1.26", } )