Skip to content

Commit 95e4d63

Browse files
Filter CMAP/SDAM events during pool initialization for awaitMinPoolSizeMS
1 parent 9bd07db commit 95e4d63

File tree

4 files changed: +367 −42 lines changed

internal/integration/unified/client_entity.go

Lines changed: 101 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,50 @@ var securitySensitiveCommands = []string{
3838
"createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb",
3939
}
4040

41+
// eventSequencer allows for sequence-based event filtering for
42+
// awaitMinPoolSizeMS support.
43+
//
44+
// Per the unified test format spec, when awaitMinPoolSizeMS is specified, any
45+
// CMAP and SDAM events that occur during connection pool initialization
46+
// (before minPoolSize is reached) must be ignored. We track this by
47+
// assigning a monotonically increasing sequence number to each event as it's
48+
// recorded. After pool initialization completes, we set eventCutoffSeq to the
49+
// current sequence number. Event accessors for CMAP and SDAM types then
50+
// filter out any events with sequence <= eventCutoffSeq.
51+
type eventSequencer struct {
52+
counter atomic.Int64
53+
cutoff int64
54+
55+
// pool events are heterogeneous, so we track their sequence separately
56+
poolSeq []int64
57+
seqByEventType map[monitoringEventType][]int64
58+
}
59+
60+
// setCutoff marks the current sequence as the filtering cutoff point.
61+
func (es *eventSequencer) setCutoff() {
62+
es.cutoff = es.counter.Load()
63+
}
64+
65+
// recordEvent stores the sequence number for a given event type.
66+
func (es *eventSequencer) recordEvent(eventType monitoringEventType) {
67+
next := es.counter.Add(1)
68+
es.seqByEventType[eventType] = append(es.seqByEventType[eventType], next)
69+
}
70+
71+
func (es *eventSequencer) recordPooledEvent() {
72+
next := es.counter.Add(1)
73+
es.poolSeq = append(es.poolSeq, next)
74+
}
75+
76+
// shouldFilter returns true if the event at the given index should be filtered.
77+
func (es *eventSequencer) shouldFilter(eventType monitoringEventType, index int) bool {
78+
if es.cutoff == 0 {
79+
return false
80+
}
81+
82+
return es.seqByEventType[eventType][index] <= es.cutoff
83+
}
84+
4185
// clientEntity is a wrapper for a mongo.Client object that also holds additional information required during test
4286
// execution.
4387
type clientEntity struct {
@@ -72,30 +116,8 @@ type clientEntity struct {
72116

73117
entityMap *EntityMap
74118

75-
logQueue chan orderedLogMessage
76-
}
77-
78-
// awaitMinimumPoolSize waits for the client's connection pool to reach the
79-
// specified minimum size. This is a best effort operation that times out after
80-
// some predefined amount of time to avoid blocking tests indefinitely.
81-
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
82-
// Don't spend longer than 500ms awaiting minPoolSize.
83-
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
84-
defer cancel()
85-
86-
ticker := time.NewTicker(100 * time.Millisecond)
87-
defer ticker.Stop()
88-
89-
for {
90-
select {
91-
case <-awaitCtx.Done():
92-
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
93-
case <-ticker.C:
94-
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
95-
return nil
96-
}
97-
}
98-
}
119+
logQueue chan orderedLogMessage
120+
eventSequencer eventSequencer
99121
}
100122

101123
func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
@@ -118,6 +140,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
118140
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
119141
entityMap: em,
120142
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
143+
eventSequencer: eventSequencer{
144+
seqByEventType: make(map[monitoringEventType][]int64),
145+
},
121146
}
122147
entity.setRecordEvents(true)
123148

@@ -226,8 +251,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
226251
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
227252
}
228253

229-
if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
230-
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
254+
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
255+
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
256+
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize, *entityOptions.AwaitMinPoolSizeMS); err != nil {
231257
return nil, err
232258
}
233259
}
@@ -326,8 +352,21 @@ func (c *clientEntity) failedEvents() []*event.CommandFailedEvent {
326352
return events
327353
}
328354

329-
func (c *clientEntity) poolEvents() []*event.PoolEvent {
330-
return c.pooled
355+
// filterEventsBySeq filters events by sequence number using the provided
356+
// sequence slice. See comments on eventSequencer for more details.
357+
func filterEventsBySeq[T any](c *clientEntity, events []T, seqSlice []int64) []T {
358+
if c.eventSequencer.cutoff == 0 {
359+
return events
360+
}
361+
362+
var filtered []T
363+
for i, evt := range events {
364+
if seqSlice[i] > c.eventSequencer.cutoff {
365+
filtered = append(filtered, evt)
366+
}
367+
}
368+
369+
return filtered
331370
}
332371

333372
func (c *clientEntity) numberConnectionsCheckedOut() int32 {
@@ -517,6 +556,7 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
517556
eventType := monitoringEventTypeFromPoolEvent(evt)
518557
if _, ok := c.observedEvents[eventType]; ok {
519558
c.pooled = append(c.pooled, evt)
559+
c.eventSequencer.recordPooledEvent()
520560
}
521561

522562
c.addEventsCount(eventType)
@@ -539,6 +579,7 @@ func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDes
539579

540580
if _, ok := c.observedEvents[serverDescriptionChangedEvent]; ok {
541581
c.serverDescriptionChanged = append(c.serverDescriptionChanged, evt)
582+
c.eventSequencer.recordEvent(serverDescriptionChangedEvent)
542583
}
543584

544585
// Record object-specific unified spec test data on an event.
@@ -558,6 +599,7 @@ func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartb
558599

559600
if _, ok := c.observedEvents[serverHeartbeatFailedEvent]; ok {
560601
c.serverHeartbeatFailedEvent = append(c.serverHeartbeatFailedEvent, evt)
602+
c.eventSequencer.recordEvent(serverHeartbeatFailedEvent)
561603
}
562604

563605
c.addEventsCount(serverHeartbeatFailedEvent)
@@ -573,6 +615,7 @@ func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeart
573615

574616
if _, ok := c.observedEvents[serverHeartbeatStartedEvent]; ok {
575617
c.serverHeartbeatStartedEvent = append(c.serverHeartbeatStartedEvent, evt)
618+
c.eventSequencer.recordEvent(serverHeartbeatStartedEvent)
576619
}
577620

578621
c.addEventsCount(serverHeartbeatStartedEvent)
@@ -588,6 +631,7 @@ func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHea
588631

589632
if _, ok := c.observedEvents[serverHeartbeatSucceededEvent]; ok {
590633
c.serverHeartbeatSucceeded = append(c.serverHeartbeatSucceeded, evt)
634+
c.eventSequencer.recordEvent(serverHeartbeatSucceededEvent)
591635
}
592636

593637
c.addEventsCount(serverHeartbeatSucceededEvent)
@@ -603,6 +647,7 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
603647

604648
if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
605649
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
650+
c.eventSequencer.recordEvent(topologyDescriptionChangedEvent)
606651
}
607652

608653
c.addEventsCount(topologyDescriptionChangedEvent)
@@ -618,6 +663,7 @@ func (c *clientEntity) processTopologyOpeningEvent(evt *event.TopologyOpeningEve
618663

619664
if _, ok := c.observedEvents[topologyOpeningEvent]; ok {
620665
c.topologyOpening = append(c.topologyOpening, evt)
666+
c.eventSequencer.recordEvent(topologyOpeningEvent)
621667
}
622668

623669
c.addEventsCount(topologyOpeningEvent)
@@ -633,6 +679,7 @@ func (c *clientEntity) processTopologyClosedEvent(evt *event.TopologyClosedEvent
633679

634680
if _, ok := c.observedEvents[topologyClosedEvent]; ok {
635681
c.topologyClosed = append(c.topologyClosed, evt)
682+
c.eventSequencer.recordEvent(topologyClosedEvent)
636683
}
637684

638685
c.addEventsCount(topologyClosedEvent)
@@ -724,3 +771,29 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
724771
}
725772
return nil
726773
}
774+
775+
// awaitMinimumPoolSize waits for the client's connection pool to reach the
776+
// specified minimum size, then clears all CMAP and SDAM events that occurred
777+
// during pool initialization.
778+
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64, timeoutMS int) error {
779+
awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond)
780+
defer cancel()
781+
782+
ticker := time.NewTicker(100 * time.Millisecond)
783+
defer ticker.Stop()
784+
785+
for {
786+
select {
787+
case <-awaitCtx.Done():
788+
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
789+
case <-ticker.C:
790+
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
791+
// Clear all CMAP and SDAM events that occurred during pool
792+
// initialization.
793+
entity.eventSequencer.setCutoff()
794+
795+
return nil
796+
}
797+
}
798+
}
799+
}

0 commit comments

Comments
 (0)