From a6fcb7e0af72182edfe4d14b74587ea257597c37 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Thu, 20 Nov 2025 22:15:13 +0300 Subject: [PATCH 1/8] rls: only reset backoff on recovery from TRANSIENT_FAILURE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix control channel connectivity monitoring to track TRANSIENT_FAILURE state explicitly. Only reset backoff timers when transitioning from TRANSIENT_FAILURE to READY, not for benign state changes like READY → IDLE → READY. Fixes #8693 --- balancer/rls/control_channel.go | 21 ++++++- balancer/rls/control_channel_test.go | 93 ++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 2 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 60e6a021d133..5834e56b8237 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -187,6 +187,11 @@ func (cc *controlChannel) monitorConnectivityState() { cc.connectivityStateCh.Load() cc.logger.Infof("Connectivity state is READY") + // Track whether we've seen TRANSIENT_FAILURE since the last READY state. + // We only want to reset backoff when recovering from an actual failure, + // not when transitioning through benign states like IDLE. + seenTransientFailure := false + for { s, ok := <-cc.connectivityStateCh.Get() if !ok { @@ -197,9 +202,21 @@ func (cc *controlChannel) monitorConnectivityState() { if s == connectivity.Shutdown { return } + + // Track if we've entered TRANSIENT_FAILURE state + if s == connectivity.TransientFailure { + seenTransientFailure = true + } + + // Only reset backoff if we're returning to READY after a failure if s == connectivity.Ready { - cc.logger.Infof("Control channel back to READY") - cc.backToReadyFunc() + if seenTransientFailure { + cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") + cc.backToReadyFunc() + seenTransientFailure = false + } else { + cc.logger.Infof("Control channel back to READY (no prior failure)") + } } cc.logger.Infof("Connectivity state is %s", s) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 5a30820c3b47..1933868b7952 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -26,6 +26,7 @@ import ( "fmt" "os" "regexp" + "sync" "testing" "time" @@ -33,6 +34,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" @@ -463,3 +465,94 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { t.Fatal("newControlChannel succeeded when expected to fail") } } + +// TestControlChannelConnectivityStateTransitions verifies that the control +// channel only resets backoff when recovering from TRANSIENT_FAILURE, not +// when going through benign state changes like READY → IDLE → READY. +func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { + tests := []struct { + name string + states []connectivity.State + wantCallbackCount int + }{ + { + name: "READY → TRANSIENT_FAILURE → READY triggers callback", + states: []connectivity.State{ + connectivity.TransientFailure, + connectivity.Ready, + }, + wantCallbackCount: 1, + }, + { + name: "READY → IDLE → READY does not trigger callback", + states: []connectivity.State{ + connectivity.Idle, + connectivity.Ready, + }, + wantCallbackCount: 0, + }, + { + name: "Multiple failures trigger callback each time", + states: []connectivity.State{ + connectivity.TransientFailure, + connectivity.Ready, + connectivity.TransientFailure, + connectivity.Ready, + }, + wantCallbackCount: 2, + }, + { + name: "IDLE between failures doesn't affect callback", + states: []connectivity.State{ + connectivity.TransientFailure, + connectivity.Idle, + connectivity.Ready, + }, + wantCallbackCount: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Start an RLS server + rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) + + // Setup callback to count invocations + callbackCount := 0 + var mu sync.Mutex + callback := func() { + mu.Lock() + callbackCount++ + mu.Unlock() + } + + // Create control channel + ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) + if err != nil { + t.Fatalf("Failed to create control channel: %v", err) + } + defer ctrlCh.close() + + // Give the channel time to reach initial READY state + time.Sleep(100 * time.Millisecond) + + // Inject the test state sequence + for _, state := range tt.states { + ctrlCh.OnMessage(state) + // Give time for the monitoring goroutine to process the state + time.Sleep(50 * time.Millisecond) + } + + // Give extra time for any pending callbacks + time.Sleep(100 * time.Millisecond) + + mu.Lock() + gotCallbackCount := callbackCount + mu.Unlock() + + if gotCallbackCount != tt.wantCallbackCount { + t.Errorf("Got %d callback invocations, want %d", gotCallbackCount, tt.wantCallbackCount) + } + }) + } +} From ca49ae7c502d2c4b71588030bd4509117bcffa19 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 12:13:00 +0300 Subject: [PATCH 2/8] address reviews --- balancer/rls/control_channel.go | 4 ++- balancer/rls/control_channel_test.go | 43 +++++++++++++++++++++++----- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 5834e56b8237..3f39ad74eacd 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -208,7 +208,9 @@ func (cc *controlChannel) monitorConnectivityState() { seenTransientFailure = true } - // Only reset backoff if we're returning to READY after a failure + // Only reset backoff when recovering from TRANSIENT_FAILURE to READY. + // This prevents unnecessary backoff resets for benign state transitions + // like READY → IDLE → READY, which don't represent actual failures. if s == connectivity.Ready { if seenTransientFailure { cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 1933868b7952..6e0d39cd1b75 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -517,13 +517,15 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { // Start an RLS server rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) - // Setup callback to count invocations + // Setup callback to count invocations with synchronization callbackCount := 0 var mu sync.Mutex + callbackInvoked := make(chan struct{}, 10) callback := func() { mu.Lock() callbackCount++ mu.Unlock() + callbackInvoked <- struct{}{} } // Create control channel @@ -533,18 +535,45 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Give the channel time to reach initial READY state - time.Sleep(100 * time.Millisecond) + // Wait for initial READY state by checking connectivity state buffer + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + initialReady := false + for !initialReady { + select { + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial READY state") + default: + if ctrlCh.cc.GetState() == connectivity.Ready { + initialReady = true + } else { + time.Sleep(10 * time.Millisecond) + } + } + } // Inject the test state sequence for _, state := range tt.states { ctrlCh.OnMessage(state) - // Give time for the monitoring goroutine to process the state - time.Sleep(50 * time.Millisecond) } - // Give extra time for any pending callbacks - time.Sleep(100 * time.Millisecond) + // Wait for expected callbacks to be invoked + for i := 0; i < tt.wantCallbackCount; i++ { + select { + case <-callbackInvoked: + // Callback received as expected + case <-time.After(defaultTestTimeout): + t.Fatalf("Timeout waiting for callback %d/%d", i+1, tt.wantCallbackCount) + } + } + + // Ensure no extra callbacks are invoked + select { + case <-callbackInvoked: + t.Fatal("Received more callbacks than expected") + case <-time.After(100 * time.Millisecond): + // Expected: no more callbacks + } mu.Lock() gotCallbackCount := callbackCount From c7eb61886ca60b7ab4afd548f97c603c8314e6c3 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 12:44:27 +0300 Subject: [PATCH 3/8] address reviews --- balancer/rls/control_channel.go | 16 ++++- balancer/rls/control_channel_test.go | 93 ++++++++++++++++++---------- 2 files changed, 75 insertions(+), 34 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 3f39ad74eacd..6e9304aea97d 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -209,12 +209,22 @@ func (cc *controlChannel) monitorConnectivityState() { } // Only reset backoff when recovering from TRANSIENT_FAILURE to READY. - // This prevents unnecessary backoff resets for benign state transitions - // like READY → IDLE → READY, which don't represent actual failures. + // When the control channel enters TRANSIENT_FAILURE, it indicates the RLS + // server is unreachable or experiencing issues. When it then transitions to + // READY, we reset the backoff state in all cache entries to allow pending + // RPCs to proceed immediately, rather than waiting for their individual + // backoff timers to expire. + // + // We skip resetting backoff for benign state transitions like READY → IDLE + // → READY (which occur during normal operation due to connection idleness) + // because these don't represent actual failures that would justify clearing + // the backoff state. if s == connectivity.Ready { if seenTransientFailure { cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") - cc.backToReadyFunc() + if cc.backToReadyFunc != nil { + cc.backToReadyFunc() + } seenTransientFailure = false } else { cc.logger.Infof("Control channel back to READY (no prior failure)") diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 6e0d39cd1b75..90a3a124af2a 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -517,15 +517,20 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { // Start an RLS server rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) - // Setup callback to count invocations with synchronization - callbackCount := 0 + // Setup callback to count invocations var mu sync.Mutex - callbackInvoked := make(chan struct{}, 10) + var callbackCount int + // Buffered channel to collect callback invocations without blocking + callbackInvoked := make(chan struct{}, tt.wantCallbackCount+5) callback := func() { mu.Lock() callbackCount++ mu.Unlock() - callbackInvoked <- struct{}{} + // Non-blocking send - if channel is full, we still counted it + select { + case callbackInvoked <- struct{}{}: + default: + } } // Create control channel @@ -535,46 +540,61 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Wait for initial READY state by checking connectivity state buffer + // Wait for initial READY state using state change notifications ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - initialReady := false - for !initialReady { - select { - case <-ctx.Done(): - t.Fatal("Timeout waiting for initial READY state") - default: - if ctrlCh.cc.GetState() == connectivity.Ready { - initialReady = true - } else { - time.Sleep(10 * time.Millisecond) + + readyCh := make(chan struct{}) + go func() { + for { + state := ctrlCh.cc.GetState() + if state == connectivity.Ready { + close(readyCh) + return + } + if !ctrlCh.cc.WaitForStateChange(ctx, state) { + return } } + }() + + select { + case <-readyCh: + // Initial READY state achieved + case <-ctx.Done(): + t.Fatal("Timeout waiting for initial READY state") } - // Inject the test state sequence + // Process states sequentially, waiting for callbacks when expected + seenTransientFailure := false + expectedCallbacks := 0 + for _, state := range tt.states { + // Inject the state ctrlCh.OnMessage(state) - } - // Wait for expected callbacks to be invoked - for i := 0; i < tt.wantCallbackCount; i++ { - select { - case <-callbackInvoked: - // Callback received as expected - case <-time.After(defaultTestTimeout): - t.Fatalf("Timeout waiting for callback %d/%d", i+1, tt.wantCallbackCount) + // Track if we're in a failure state + if state == connectivity.TransientFailure { + seenTransientFailure = true } - } - // Ensure no extra callbacks are invoked - select { - case <-callbackInvoked: - t.Fatal("Received more callbacks than expected") - case <-time.After(100 * time.Millisecond): - // Expected: no more callbacks + // If transitioning to READY after a failure, wait for callback + if state == connectivity.Ready && seenTransientFailure { + expectedCallbacks++ + select { + case <-callbackInvoked: + // Callback received as expected + seenTransientFailure = false + case <-time.After(defaultTestTimeout): + mu.Lock() + got := callbackCount + mu.Unlock() + t.Fatalf("Timeout waiting for callback %d/%d after TRANSIENT_FAILURE→READY (got %d callbacks so far)", expectedCallbacks, tt.wantCallbackCount, got) + } + } } + // Verify final callback count matches expected mu.Lock() gotCallbackCount := callbackCount mu.Unlock() @@ -582,6 +602,17 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { if gotCallbackCount != tt.wantCallbackCount { t.Errorf("Got %d callback invocations, want %d", gotCallbackCount, tt.wantCallbackCount) } + + // Ensure no extra callbacks are invoked + select { + case <-callbackInvoked: + mu.Lock() + final := callbackCount + mu.Unlock() + t.Fatalf("Received more callbacks than expected: got %d, want %d", final, tt.wantCallbackCount) + case <-time.After(50 * time.Millisecond): + // Expected: no more callbacks + } }) } } From 943240dc873dc2cca5c8da8bbc722dc40964ea42 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 12:47:48 +0300 Subject: [PATCH 4/8] Shorter comment --- balancer/rls/control_channel.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index 6e9304aea97d..fc33cfd1a31e 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -208,17 +208,11 @@ func (cc *controlChannel) monitorConnectivityState() { seenTransientFailure = true } - // Only reset backoff when recovering from TRANSIENT_FAILURE to READY. - // When the control channel enters TRANSIENT_FAILURE, it indicates the RLS - // server is unreachable or experiencing issues. When it then transitions to - // READY, we reset the backoff state in all cache entries to allow pending - // RPCs to proceed immediately, rather than waiting for their individual - // backoff timers to expire. - // - // We skip resetting backoff for benign state transitions like READY → IDLE - // → READY (which occur during normal operation due to connection idleness) - // because these don't represent actual failures that would justify clearing - // the backoff state. + // Only reset backoff when transitioning from TRANSIENT_FAILURE to READY. + // This indicates the RLS server has recovered from being unreachable, so + // we reset backoff state in all cache entries to allow pending RPCs to + // proceed immediately. We skip benign transitions like READY → IDLE → READY + // since those don't represent actual failures. if s == connectivity.Ready { if seenTransientFailure { cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") From ed5ab2c9ae37915866566770ed50f5548ca66913 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Fri, 21 Nov 2025 13:19:18 +0300 Subject: [PATCH 5/8] Fix test synchronization and remove time.Sleep dependencies - Add testOnlyInitialReadyDone channel for proper test synchronization - Signal when monitoring goroutine processes initial READY state - Tests wait for this signal instead of using time.Sleep - All synchronization now uses channels/callbacks - no arbitrary sleeps - Tests pass consistently with race detector Addresses review feedback about removing time.Sleep for state transitions. --- balancer/rls/control_channel.go | 17 ++++-- balancer/rls/control_channel_test.go | 77 ++++++++++------------------ 2 files changed, 38 insertions(+), 56 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index fc33cfd1a31e..e27a9d4b5d25 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -63,6 +63,9 @@ type controlChannel struct { connectivityStateCh *buffer.Unbounded unsubscribe func() monitorDoneCh chan struct{} + // testOnlyInitialReadyDone is closed when the monitoring goroutine + // processes the initial READY state. Only used in tests. + testOnlyInitialReadyDone chan struct{} } // newControlChannel creates a controlChannel to rlsServerName and uses @@ -70,11 +73,12 @@ type controlChannel struct { // gRPC channel. func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) { ctrlCh := &controlChannel{ - rpcTimeout: rpcTimeout, - backToReadyFunc: backToReadyFunc, - throttler: newAdaptiveThrottler(), - connectivityStateCh: buffer.NewUnbounded(), - monitorDoneCh: make(chan struct{}), + rpcTimeout: rpcTimeout, + backToReadyFunc: backToReadyFunc, + throttler: newAdaptiveThrottler(), + connectivityStateCh: buffer.NewUnbounded(), + monitorDoneCh: make(chan struct{}), + testOnlyInitialReadyDone: make(chan struct{}), } ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh)) @@ -187,6 +191,9 @@ func (cc *controlChannel) monitorConnectivityState() { cc.connectivityStateCh.Load() cc.logger.Infof("Connectivity state is READY") + // Signal tests that initial READY has been processed + close(cc.testOnlyInitialReadyDone) + // Track whether we've seen TRANSIENT_FAILURE since the last READY state. // We only want to reset backoff when recovering from an actual failure, // not when transitioning through benign states like IDLE. diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 90a3a124af2a..2966cf086955 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -520,17 +520,14 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { // Setup callback to count invocations var mu sync.Mutex var callbackCount int - // Buffered channel to collect callback invocations without blocking - callbackInvoked := make(chan struct{}, tt.wantCallbackCount+5) + // Buffered channel large enough to never block + callbackInvoked := make(chan struct{}, 100) callback := func() { mu.Lock() callbackCount++ mu.Unlock() - // Non-blocking send - if channel is full, we still counted it - select { - case callbackInvoked <- struct{}{}: - default: - } + // Send to channel - should never block with large buffer + callbackInvoked <- struct{}{} } // Create control channel @@ -540,57 +537,35 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Wait for initial READY state using state change notifications - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - readyCh := make(chan struct{}) - go func() { - for { - state := ctrlCh.cc.GetState() - if state == connectivity.Ready { - close(readyCh) - return - } - if !ctrlCh.cc.WaitForStateChange(ctx, state) { - return - } - } - }() - + // Wait for the monitoring goroutine to process the initial READY state + // before injecting test states. This ensures our injected states are + // processed in the main monitoring loop, not consumed during initialization. select { - case <-readyCh: - // Initial READY state achieved - case <-ctx.Done(): - t.Fatal("Timeout waiting for initial READY state") + case <-ctrlCh.testOnlyInitialReadyDone: + // Initial READY processed by monitoring goroutine + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for monitoring goroutine to process initial READY state") } - // Process states sequentially, waiting for callbacks when expected - seenTransientFailure := false - expectedCallbacks := 0 - + // Inject all test states for _, state := range tt.states { - // Inject the state ctrlCh.OnMessage(state) + } - // Track if we're in a failure state - if state == connectivity.TransientFailure { - seenTransientFailure = true - } + // Wait for all expected callbacks with timeout + callbackTimeout := time.NewTimer(defaultTestTimeout) + defer callbackTimeout.Stop() - // If transitioning to READY after a failure, wait for callback - if state == connectivity.Ready && seenTransientFailure { - expectedCallbacks++ - select { - case <-callbackInvoked: - // Callback received as expected - seenTransientFailure = false - case <-time.After(defaultTestTimeout): - mu.Lock() - got := callbackCount - mu.Unlock() - t.Fatalf("Timeout waiting for callback %d/%d after TRANSIENT_FAILURE→READY (got %d callbacks so far)", expectedCallbacks, tt.wantCallbackCount, got) - } + receivedCallbacks := 0 + for receivedCallbacks < tt.wantCallbackCount { + select { + case <-callbackInvoked: + receivedCallbacks++ + case <-callbackTimeout.C: + mu.Lock() + got := callbackCount + mu.Unlock() + t.Fatalf("Timeout waiting for callbacks: expected %d, received %d via channel, callback count is %d", tt.wantCallbackCount, receivedCallbacks, got) } } From 2ad8249a8975f1ab1bc5b9b8f84ffa4fabf5eec0 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Sun, 23 Nov 2025 14:11:11 +0300 Subject: [PATCH 6/8] Fix comments --- balancer/rls/control_channel.go | 142 +++++++-------------------- balancer/rls/control_channel_test.go | 10 -- 2 files changed, 38 insertions(+), 114 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index e27a9d4b5d25..de28fc3e9b4e 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -21,6 +21,7 @@ package rls import ( "context" "fmt" + "sync" "time" "google.golang.org/grpc" @@ -29,7 +30,6 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/buffer" internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" @@ -57,15 +57,12 @@ type controlChannel struct { // hammering the RLS service while it is overloaded or down. throttler adaptiveThrottler - cc *grpc.ClientConn - client rlsgrpc.RouteLookupServiceClient - logger *internalgrpclog.PrefixLogger - connectivityStateCh *buffer.Unbounded - unsubscribe func() - monitorDoneCh chan struct{} - // testOnlyInitialReadyDone is closed when the monitoring goroutine - // processes the initial READY state. Only used in tests. - testOnlyInitialReadyDone chan struct{} + cc *grpc.ClientConn + client rlsgrpc.RouteLookupServiceClient + logger *internalgrpclog.PrefixLogger + unsubscribe func() + seenTransientFailure bool + mu sync.Mutex } // newControlChannel creates a controlChannel to rlsServerName and uses @@ -73,12 +70,9 @@ type controlChannel struct { // gRPC channel. func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) { ctrlCh := &controlChannel{ - rpcTimeout: rpcTimeout, - backToReadyFunc: backToReadyFunc, - throttler: newAdaptiveThrottler(), - connectivityStateCh: buffer.NewUnbounded(), - monitorDoneCh: make(chan struct{}), - testOnlyInitialReadyDone: make(chan struct{}), + rpcTimeout: rpcTimeout, + backToReadyFunc: backToReadyFunc, + throttler: newAdaptiveThrottler(), } ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh)) @@ -96,7 +90,6 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura ctrlCh.cc.Connect() ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc) ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName) - go ctrlCh.monitorConnectivityState() return ctrlCh, nil } @@ -105,7 +98,34 @@ func (cc *controlChannel) OnMessage(msg any) { if !ok { panic(fmt.Sprintf("Unexpected message type %T , wanted connectectivity.State type", msg)) } - cc.connectivityStateCh.Put(st) + + cc.mu.Lock() + defer cc.mu.Unlock() + + switch st { + case connectivity.Ready: + // Only reset backoff when transitioning from TRANSIENT_FAILURE to READY. + // This indicates the RLS server has recovered from being unreachable, so + // we reset backoff state in all cache entries to allow pending RPCs to + // proceed immediately. We skip benign transitions like READY → IDLE → READY + // since those don't represent actual failures. + if cc.seenTransientFailure { + cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") + cc.seenTransientFailure = false + if cc.backToReadyFunc != nil { + cc.backToReadyFunc() + } + } else { + cc.logger.Infof("Control channel is READY") + } + case connectivity.TransientFailure: + // Track that we've entered TRANSIENT_FAILURE state so we know to reset + // backoffs when we recover to READY. + cc.logger.Infof("Control channel is TRANSIENT_FAILURE") + cc.seenTransientFailure = true + default: + cc.logger.Infof("Control channel connectivity state is %s", st) + } } // dialOpts constructs the dial options for the control plane channel. @@ -152,94 +172,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st return dopts, nil } -func (cc *controlChannel) monitorConnectivityState() { - cc.logger.Infof("Starting connectivity state monitoring goroutine") - defer close(cc.monitorDoneCh) - - // Since we use two mechanisms to deal with RLS server being down: - // - adaptive throttling for the channel as a whole - // - exponential backoff on a per-request basis - // we need a way to avoid double-penalizing requests by counting failures - // toward both mechanisms when the RLS server is unreachable. - // - // To accomplish this, we monitor the state of the control plane channel. If - // the state has been TRANSIENT_FAILURE since the last time it was in state - // READY, and it then transitions into state READY, we push on a channel - // which is being read by the LB policy. - // - // The LB the policy will iterate through the cache to reset the backoff - // timeouts in all cache entries. Specifically, this means that it will - // reset the backoff state and cancel the pending backoff timer. Note that - // when cancelling the backoff timer, just like when the backoff timer fires - // normally, a new picker is returned to the channel, to force it to - // re-process any wait-for-ready RPCs that may still be queued if we failed - // them while we were in backoff. However, we should optimize this case by - // returning only one new picker, regardless of how many backoff timers are - // cancelled. - - // Wait for the control channel to become READY for the first time. - for s, ok := <-cc.connectivityStateCh.Get(); s != connectivity.Ready; s, ok = <-cc.connectivityStateCh.Get() { - if !ok { - return - } - - cc.connectivityStateCh.Load() - if s == connectivity.Shutdown { - return - } - } - cc.connectivityStateCh.Load() - cc.logger.Infof("Connectivity state is READY") - - // Signal tests that initial READY has been processed - close(cc.testOnlyInitialReadyDone) - - // Track whether we've seen TRANSIENT_FAILURE since the last READY state. - // We only want to reset backoff when recovering from an actual failure, - // not when transitioning through benign states like IDLE. - seenTransientFailure := false - - for { - s, ok := <-cc.connectivityStateCh.Get() - if !ok { - return - } - cc.connectivityStateCh.Load() - - if s == connectivity.Shutdown { - return - } - - // Track if we've entered TRANSIENT_FAILURE state - if s == connectivity.TransientFailure { - seenTransientFailure = true - } - - // Only reset backoff when transitioning from TRANSIENT_FAILURE to READY. - // This indicates the RLS server has recovered from being unreachable, so - // we reset backoff state in all cache entries to allow pending RPCs to - // proceed immediately. We skip benign transitions like READY → IDLE → READY - // since those don't represent actual failures. - if s == connectivity.Ready { - if seenTransientFailure { - cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") - if cc.backToReadyFunc != nil { - cc.backToReadyFunc() - } - seenTransientFailure = false - } else { - cc.logger.Infof("Control channel back to READY (no prior failure)") - } - } - - cc.logger.Infof("Connectivity state is %s", s) - } -} - func (cc *controlChannel) close() { cc.unsubscribe() - cc.connectivityStateCh.Close() - <-cc.monitorDoneCh cc.cc.Close() cc.logger.Infof("Shutdown") } diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 2966cf086955..dbe41db893a1 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -537,16 +537,6 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { } defer ctrlCh.close() - // Wait for the monitoring goroutine to process the initial READY state - // before injecting test states. This ensures our injected states are - // processed in the main monitoring loop, not consumed during initialization. - select { - case <-ctrlCh.testOnlyInitialReadyDone: - // Initial READY processed by monitoring goroutine - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout waiting for monitoring goroutine to process initial READY state") - } - // Inject all test states for _, state := range tt.states { ctrlCh.OnMessage(state) From faee5a0231fc3b656162103232e0306915aa6494 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Mon, 24 Nov 2025 20:39:43 +0300 Subject: [PATCH 7/8] address reviews --- balancer/rls/control_channel.go | 2 +- balancer/rls/control_channel_test.go | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go index de28fc3e9b4e..ca98a0cfc9d0 100644 --- a/balancer/rls/control_channel.go +++ b/balancer/rls/control_channel.go @@ -121,7 +121,7 @@ func (cc *controlChannel) OnMessage(msg any) { case connectivity.TransientFailure: // Track that we've entered TRANSIENT_FAILURE state so we know to reset // backoffs when we recover to READY. - cc.logger.Infof("Control channel is TRANSIENT_FAILURE") + cc.logger.Warningf("Control channel is TRANSIENT_FAILURE") cc.seenTransientFailure = true default: cc.logger.Infof("Control channel connectivity state is %s", st) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index dbe41db893a1..9f6084bc7319 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -472,28 +472,32 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { tests := []struct { name string + description string states []connectivity.State wantCallbackCount int }{ { - name: "READY → TRANSIENT_FAILURE → READY triggers callback", - states: []connectivity.State{ + name: "ready_after_transient_failure", + description: "ready after transient failure triggers callback to reset the timer.", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, }, wantCallbackCount: 1, }, { - name: "READY → IDLE → READY does not trigger callback", - states: []connectivity.State{ + name: "ready_after_idle", + description: "ready after idle does not trigger callback", + states: []connectivity.State{ connectivity.Idle, connectivity.Ready, }, wantCallbackCount: 0, }, { - name: "Multiple failures trigger callback each time", - states: []connectivity.State{ + name: "multiple_failures", + description: "multiple failures trigger callback each time", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, connectivity.TransientFailure, @@ -502,8 +506,9 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { wantCallbackCount: 2, }, { - name: "IDLE between failures doesn't affect callback", - states: []connectivity.State{ + name: "idle_between_failures", + description: "idle between failures doesn't affect callback", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Idle, connectivity.Ready, From 5dcc02c8832f4f536d20b26f998d9f26f31617c9 Mon Sep 17 00:00:00 2001 From: ulascansenturk Date: Mon, 24 Nov 2025 20:52:32 +0300 Subject: [PATCH 8/8] gofmt --- balancer/rls/control_channel_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/balancer/rls/control_channel_test.go b/balancer/rls/control_channel_test.go index 9f6084bc7319..e5b61d4ac0d1 100644 --- a/balancer/rls/control_channel_test.go +++ b/balancer/rls/control_channel_test.go @@ -477,27 +477,27 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { wantCallbackCount int }{ { - name: "ready_after_transient_failure", - description: "ready after transient failure triggers callback to reset the timer.", - states: []connectivity.State{ + name: "ready_after_transient_failure", + description: "ready after transient failure triggers callback to reset the timer.", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, }, wantCallbackCount: 1, }, { - name: "ready_after_idle", - description: "ready after idle does not trigger callback", - states: []connectivity.State{ + name: "ready_after_idle", + description: "ready after idle does not trigger callback", + states: []connectivity.State{ connectivity.Idle, connectivity.Ready, }, wantCallbackCount: 0, }, { - name: "multiple_failures", - description: "multiple failures trigger callback each time", - states: []connectivity.State{ + name: "multiple_failures", + description: "multiple failures trigger callback each time", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Ready, connectivity.TransientFailure, @@ -506,9 +506,9 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { wantCallbackCount: 2, }, { - name: "idle_between_failures", - description: "idle between failures doesn't affect callback", - states: []connectivity.State{ + name: "idle_between_failures", + description: "idle between failures doesn't affect callback", + states: []connectivity.State{ connectivity.TransientFailure, connectivity.Idle, connectivity.Ready,