Skip to content

Commit ea5a923

Browse files
committed
fix(lavasession): add mutex to Endpoint to prevent race conditions
Add a sync.RWMutex to the Endpoint struct to protect concurrent access to the Connections, ConnectionRefusals, and Enabled fields. This fixes a pre-existing race condition that occurred when multiple goroutines (from probeProviders) accessed the same Endpoint object simultaneously.

The race was detected in fetchEndpointConnectionFromConsumerSessionWithProvider, where multiple providers could share the same Endpoint object and concurrently modify its fields without synchronization.

The fix:
- Added a `mu sync.RWMutex` field to the Endpoint struct
- Wrapped modifications to Connections, ConnectionRefusals, and Enabled with Lock/Unlock
- Used RLock/RUnlock for read-only access to the Enabled field
- Carefully released the lock before blocking network calls and re-acquired it afterwards
1 parent d1fdadf commit ea5a923

File tree

1 file changed

+21
-2
lines changed

1 file changed

+21
-2
lines changed

protocol/lavasession/consumer_types.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ type Endpoint struct {
141141
Addons map[string]struct{}
142142
Extensions map[string]struct{}
143143
Geolocation planstypes.Geolocation
144+
mu sync.RWMutex // Protects Connections, ConnectionRefusals, and Enabled fields
144145
}
145146

146147
func (e *Endpoint) CheckSupportForServices(addon string, extensions []string) (supported bool) {
@@ -507,7 +508,10 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
507508
for idx, endpoint := range cswp.Endpoints {
508509
// retryDisabledEndpoints will attempt to reconnect to the provider even though we have disabled the endpoint
509510
// this is used on a routine that tries to reconnect to a provider that has been disabled due to being unable to connect to it.
510-
if !retryDisabledEndpoints && !endpoint.Enabled {
511+
endpoint.mu.RLock()
512+
enabled := endpoint.Enabled
513+
endpoint.mu.RUnlock()
514+
if !retryDisabledEndpoints && !enabled {
511515
continue
512516
}
513517
if retryDisabledEndpoints {
@@ -521,6 +525,10 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
521525
}
522526
// return
523527
connectEndpoint := func(cswp *ConsumerSessionsWithProvider, ctx context.Context, endpoint *Endpoint) (endpointConnection_ *EndpointConnection, connected_ bool) {
528+
// Lock the endpoint to protect concurrent access to Connections, ConnectionRefusals, and Enabled
529+
endpoint.mu.Lock()
530+
defer endpoint.mu.Unlock()
531+
524532
// Clean up dead connections before iterating to prevent accumulation
525533
cleanedConnections := make([]*EndpointConnection, 0, len(endpoint.Connections))
526534
deadConnectionCount := 0
@@ -589,7 +597,13 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
589597
}
590598
}
591599
}
600+
601+
// Release lock before making network call to avoid blocking other goroutines
602+
endpoint.mu.Unlock()
592603
client, conn, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.NetworkAddress)
604+
// Re-acquire lock to update endpoint state
605+
endpoint.mu.Lock()
606+
593607
if err != nil {
594608
endpoint.ConnectionRefusals++
595609
utils.LavaFormatInfo("error connecting to provider",
@@ -621,7 +635,9 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
621635
if !connected_ {
622636
continue
623637
}
638+
endpoint.mu.Lock()
624639
cswp.Endpoints[idx].Enabled = true // return enabled once we successfully reconnect
640+
endpoint.mu.Unlock()
625641
// successful new connection add to endpoints list
626642
endpoints = append(endpoints, &EndpointAndChosenConnection{endpoint: endpoint, chosenEndpointConnection: endpointConnection})
627643
if !getAllEndpoints {
@@ -638,7 +654,10 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
638654
// before verifying all are Disabled.
639655
allDisabled = true
640656
for _, endpoint := range cswp.Endpoints {
641-
if !endpoint.Enabled {
657+
endpoint.mu.RLock()
658+
enabled := endpoint.Enabled
659+
endpoint.mu.RUnlock()
660+
if !enabled {
642661
continue
643662
}
644663
// even one endpoint is enough for us to not purge.

0 commit comments

Comments
 (0)