Skip to content

Commit 6da6202

Browse files
committed
fix(lavasession): prevent goroutine leak in ConnectRawClientWithTimeout
The busy-wait goroutine that polls connection state did not check for context cancellation, so it kept running forever whenever a connection timed out before becoming Ready. This was the primary cause of the goroutine leaks observed in production when all providers were blocked during startup.

Changes:
- Add a context.Done() check in the goroutine's polling loop
- Use a buffered channel to prevent the goroutine from blocking on send
- Add a non-blocking send to handle the race between the Ready state and context cancellation
- Add regression tests for goroutine cleanup behavior

The fix ensures that when ConnectRawClientWithTimeout returns due to context timeout/cancellation, the internal goroutine exits promptly instead of running indefinitely.
1 parent e4f7444 commit 6da6202

File tree

2 files changed

+119
-6
lines changed

2 files changed

+119
-6
lines changed

protocol/lavasession/consumer_session_manager_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,3 +1196,106 @@ func TestDeadConnectionCleanup(t *testing.T) {
11961196
require.NotEqual(t, connectivity.Shutdown, endpoint.Connections[0].connection.GetState(), "Remaining connection should not be in Shutdown state")
11971197
require.Equal(t, validConn, endpoint.Connections[0].connection, "Remaining connection should be the valid one")
11981198
}
1199+
1200+
// TestConnectRawClientWithTimeoutGoroutineCleanup is a regression test for the
// goroutine leak in ConnectRawClientWithTimeout, where the busy-wait loop did not
// check its context.
//
// The test does not call ConnectRawClientWithTimeout. It runs a stand-in goroutine
// with the same loop shape (return on ctx.Done(), otherwise sleep and poll again)
// and asserts that this goroutine terminates once the context expires. Per the
// original note, gRPC connections spawn internal goroutines of their own that may
// persist briefly, which is why only the polling goroutine is observed here.
func TestConnectRawClientWithTimeoutGoroutineCleanup(t *testing.T) {
	const (
		ctxLifetime  = 50 * time.Millisecond  // how long the context stays alive
		pollInterval = 10 * time.Millisecond  // simulated delay between state checks
		exitGrace    = 500 * time.Millisecond // how long the goroutine gets to exit
	)

	ctx, cancel := context.WithTimeout(context.Background(), ctxLifetime)
	defer cancel()

	// finished is closed by the polling goroutine on its way out.
	finished := make(chan struct{})

	// pollUntilDone imitates the polling loop: bail out as soon as the context
	// is done, otherwise pretend to inspect the connection state and wait.
	pollUntilDone := func() {
		defer close(finished)
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}
			time.Sleep(pollInterval)
		}
	}
	go pollUntilDone()

	// Block until the context has timed out.
	<-ctx.Done()

	// The goroutine must notice the expired context within the grace period.
	grace := time.NewTimer(exitGrace)
	defer grace.Stop()
	select {
	case <-grace.C:
		t.Fatal("Goroutine did not exit after context cancellation - this indicates a leak")
	case <-finished:
		// Goroutine exited as expected.
	}
}
1240+
1241+
// TestConnectRawClientWithTimeoutReturnsOnContextCancel verifies that
1242+
// ConnectRawClientWithTimeout returns promptly when context is cancelled,
1243+
// rather than hanging until the connection attempt completes.
1244+
func TestConnectRawClientWithTimeoutReturnsOnContextCancel(t *testing.T) {
1245+
// Use an address that will never connect (non-routable IP)
1246+
unreachableAddr := "10.255.255.1:12345"
1247+
csp := &ConsumerSessionsWithProvider{}
1248+
1249+
ctx, cancel := context.WithCancel(context.Background())
1250+
1251+
done := make(chan struct{})
1252+
var conn *grpc.ClientConn
1253+
1254+
go func() {
1255+
defer close(done)
1256+
_, conn, _ = csp.ConnectRawClientWithTimeout(ctx, unreachableAddr)
1257+
}()
1258+
1259+
// Wait a bit for the connection attempt to start
1260+
time.Sleep(50 * time.Millisecond)
1261+
1262+
// Cancel the context
1263+
cancel()
1264+
1265+
// The function should return promptly after cancellation
1266+
select {
1267+
case <-done:
1268+
// Success - function returned after context cancellation
1269+
if conn != nil {
1270+
conn.Close()
1271+
}
1272+
case <-time.After(2 * time.Second):
1273+
t.Fatal("ConnectRawClientWithTimeout did not return promptly after context cancellation")
1274+
}
1275+
}
1276+
1277+
// TestConnectRawClientWithTimeoutSuccessfulConnection verifies that successful
1278+
// connections still work correctly after the fix.
1279+
func TestConnectRawClientWithTimeoutSuccessfulConnection(t *testing.T) {
1280+
// Use the test grpc server that's set up in TestMain
1281+
if grpcListener == "localhost:0" {
1282+
t.Skip("grpcListener not initialized - run full test suite")
1283+
}
1284+
1285+
csp := &ConsumerSessionsWithProvider{}
1286+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1287+
defer cancel()
1288+
1289+
client, conn, err := csp.ConnectRawClientWithTimeout(ctx, grpcListener)
1290+
1291+
require.NoError(t, err)
1292+
require.NotNil(t, client)
1293+
require.NotNil(t, conn)
1294+
1295+
defer conn.Close()
1296+
1297+
// Verify connection is usable
1298+
state := conn.GetState()
1299+
require.True(t, state == connectivity.Ready || state == connectivity.Idle,
1300+
"Connection should be in Ready or Idle state, got: %v", state)
1301+
}

protocol/lavasession/consumer_types.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -386,16 +386,26 @@ func (cswp *ConsumerSessionsWithProvider) ConnectRawClientWithTimeout(ctx contex
386386
if err != nil {
387387
return nil, nil, err
388388
}
389-
ch := make(chan bool)
389+
ch := make(chan bool, 1) // Buffered to prevent goroutine leak if nobody receives
390390
go func() {
391391
for {
392-
// Check if the connection state is not Connecting
393-
if conn.GetState() == connectivity.Ready {
394-
ch <- true
392+
select {
393+
case <-connectCtx.Done():
394+
// Context cancelled or timed out, exit goroutine to prevent leak
395395
return
396+
default:
397+
// Check if the connection state is Ready
398+
if conn.GetState() == connectivity.Ready {
399+
// Non-blocking send in case context was cancelled right after our check
400+
select {
401+
case ch <- true:
402+
default:
403+
}
404+
return
405+
}
406+
// Add some delay to avoid busy-waiting
407+
time.Sleep(20 * time.Millisecond)
396408
}
397-
// Add some delay to avoid busy-waiting
398-
time.Sleep(20 * time.Millisecond)
399409
}
400410
}()
401411
select {

0 commit comments

Comments
 (0)