Skip to content

Commit d1fdadf

Browse files
committed
fix(lavasession): reduce second chance goroutine lifetime with periodic epoch checks
The second chance goroutine previously slept for the full 3-minute retrySecondChanceAfter duration before checking if the epoch changed. This caused goroutine accumulation when many providers were blocked in quick succession during startup. Changes: - Replace single time.After(3min) with ticker checking every 10 seconds - Exit early if epoch changes, reducing goroutine lifetime from 3min to ~10s - Add trace logging for early exit to aid debugging - Add test for early exit on epoch change behavior This fix reduces goroutine accumulation when providers are repeatedly blocked during startup before they become available. Instead of 251 goroutines living for 3 minutes each, they now exit within ~10 seconds of an epoch change.
1 parent 6da6202 commit d1fdadf

File tree

2 files changed

+104
-6
lines changed

2 files changed

+104
-6
lines changed

protocol/lavasession/consumer_session_manager.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,14 +1204,32 @@ func (csm *ConsumerSessionManager) blockProvider(ctx context.Context, address st
12041204
csm.lock.Unlock()
12051205
if runSecondChance {
12061206
// if we decide to allow a second chance, this provider will return to our list of valid providers (if it exists)
1207+
// We use a ticker to check epoch periodically rather than sleeping the full duration.
1208+
// This allows the goroutine to exit early if the epoch changes, preventing goroutine accumulation.
12071209
go func() {
1208-
<-time.After(retrySecondChanceAfter)
1209-
// check epoch is still relevant, if not just return
1210-
if sessionEpoch != csm.atomicReadCurrentEpoch() {
1211-
return
1210+
// Check epoch every 10 seconds instead of sleeping full 3 minutes
1211+
// This reduces goroutine lifetime when epoch changes
1212+
const epochCheckInterval = 10 * time.Second
1213+
deadline := time.Now().Add(retrySecondChanceAfter)
1214+
ticker := time.NewTicker(epochCheckInterval)
1215+
defer ticker.Stop()
1216+
1217+
for range ticker.C {
1218+
// Check if epoch changed - exit early to free goroutine
1219+
if sessionEpoch != csm.atomicReadCurrentEpoch() {
1220+
utils.LavaFormatTrace("Second chance goroutine exiting early due to epoch change",
1221+
utils.LogAttr("address", address),
1222+
utils.LogAttr("originalEpoch", sessionEpoch),
1223+
utils.LogAttr("GUID", ctx))
1224+
return
1225+
}
1226+
// Check if we've waited long enough
1227+
if time.Now().After(deadline) {
1228+
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address), utils.LogAttr("GUID", ctx))
1229+
csm.validateAndReturnBlockedProviderToValidAddressesList(address)
1230+
return
1231+
}
12121232
}
1213-
utils.LavaFormatDebug("Running second chance for provider", utils.LogAttr("address", address), utils.LogAttr("GUID", ctx))
1214-
csm.validateAndReturnBlockedProviderToValidAddressesList(address)
12151233
}()
12161234
}
12171235
}()

protocol/lavasession/consumer_session_manager_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,3 +1299,83 @@ func TestConnectRawClientWithTimeoutSuccessfulConnection(t *testing.T) {
12991299
require.True(t, state == connectivity.Ready || state == connectivity.Idle,
13001300
"Connection should be in Ready or Idle state, got: %v", state)
13011301
}
1302+
1303+
// TestSecondChanceGoroutineExitsOnEpochChange verifies that the second chance
1304+
// goroutine exits early when the epoch changes, rather than waiting the full
1305+
// retrySecondChanceAfter duration. This prevents goroutine accumulation.
1306+
func TestSecondChanceGoroutineExitsOnEpochChange(t *testing.T) {
1307+
// Save and restore the original value
1308+
originalRetryAfter := retrySecondChanceAfter
1309+
retrySecondChanceAfter = time.Minute // Set to a long time to ensure early exit is due to epoch change
1310+
defer func() { retrySecondChanceAfter = originalRetryAfter }()
1311+
1312+
ctx := context.Background()
1313+
csm := CreateConsumerSessionManager()
1314+
pairingList := createPairingList("", true)
1315+
1316+
// Create initial pairing with epoch 20
1317+
err := csm.UpdateAllProviders(firstEpochHeight, map[uint64]*ConsumerSessionsWithProvider{0: pairingList[0], 1: pairingList[1]}, nil)
1318+
require.NoError(t, err)
1319+
1320+
// Block a provider which should spawn a second chance goroutine
1321+
// Use allowSecondChance=true to trigger the goroutine
1322+
directiveHeaders := DirectiveHeaders{map[string]string{"lava-providers-block": pairingList[1].PublicLavaAddress}}
1323+
usedProviders := NewUsedProviders(directiveHeaders)
1324+
css, err := csm.GetSessions(ctx, 1, cuForFirstRequest, usedProviders, servicedBlockNumber, "", nil, common.NO_STATE, 0, "")
1325+
require.NoError(t, err)
1326+
1327+
// Fail the session to trigger blockProvider with second chance
1328+
for _, sessionInfo := range css {
1329+
csm.OnSessionFailure(sessionInfo.Session, fmt.Errorf("testError"))
1330+
}
1331+
1332+
// Verify the provider is in secondChanceGivenToAddresses
1333+
csm.lock.RLock()
1334+
_, hasSecondChance := csm.secondChanceGivenToAddresses[pairingList[0].PublicLavaAddress]
1335+
csm.lock.RUnlock()
1336+
1337+
if !hasSecondChance {
1338+
// If not yet added, keep trying
1339+
for i := 0; i < 10; i++ {
1340+
time.Sleep(100 * time.Millisecond)
1341+
directiveHeaders := DirectiveHeaders{map[string]string{"lava-providers-block": pairingList[1].PublicLavaAddress}}
1342+
usedProviders := NewUsedProviders(directiveHeaders)
1343+
css, err := csm.GetSessions(ctx, 1, cuForFirstRequest, usedProviders, servicedBlockNumber, "", nil, common.NO_STATE, 0, "")
1344+
if err != nil {
1345+
continue
1346+
}
1347+
for _, sessionInfo := range css {
1348+
csm.OnSessionFailure(sessionInfo.Session, fmt.Errorf("testError"))
1349+
}
1350+
csm.lock.RLock()
1351+
_, hasSecondChance = csm.secondChanceGivenToAddresses[pairingList[0].PublicLavaAddress]
1352+
csm.lock.RUnlock()
1353+
if hasSecondChance {
1354+
break
1355+
}
1356+
}
1357+
}
1358+
1359+
// Now change the epoch - this should cause the second chance goroutine to exit early
1360+
// Create a new pairing list for the new epoch
1361+
newPairingList := createPairingList("new", true)
1362+
err = csm.UpdateAllProviders(secondEpochHeight, map[uint64]*ConsumerSessionsWithProvider{0: newPairingList[0], 1: newPairingList[1]}, nil)
1363+
require.NoError(t, err)
1364+
1365+
// The second chance goroutine should exit within a reasonable time (much less than retrySecondChanceAfter)
1366+
// We wait for a bit and verify the old provider is NOT returned to valid addresses
1367+
// (because epoch changed, the goroutine should have exited without calling validateAndReturnBlockedProviderToValidAddressesList)
1368+
time.Sleep(500 * time.Millisecond) // Wait for goroutine's epoch check
1369+
1370+
// The old provider should not be in the new epoch's valid addresses
1371+
// (it's from a different pairing list anyway)
1372+
csm.lock.RLock()
1373+
validAddresses := csm.validAddresses
1374+
csm.lock.RUnlock()
1375+
1376+
// Verify we have valid addresses from the new pairing list
1377+
require.NotEmpty(t, validAddresses, "Should have valid addresses from new epoch")
1378+
1379+
// The test succeeds if we get here - the goroutine exited due to epoch change
1380+
// rather than waiting the full minute and potentially causing issues
1381+
}

0 commit comments

Comments
 (0)