rls: only reset backoff on recovery from TRANSIENT_FAILURE #8720
base: master
Changes from all commits
a6fcb7e
ca49ae7
c7eb618
943240d
ed5ab2c
2ad8249
faee5a0
5dcc02c
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ package rls | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "google.golang.org/grpc" | ||
|
|
@@ -29,7 +30,6 @@ import ( | |
| "google.golang.org/grpc/connectivity" | ||
| "google.golang.org/grpc/credentials/insecure" | ||
| "google.golang.org/grpc/internal" | ||
| "google.golang.org/grpc/internal/buffer" | ||
| internalgrpclog "google.golang.org/grpc/internal/grpclog" | ||
| "google.golang.org/grpc/internal/grpcsync" | ||
| "google.golang.org/grpc/internal/pretty" | ||
|
|
@@ -57,24 +57,22 @@ type controlChannel struct { | |
| // hammering the RLS service while it is overloaded or down. | ||
| throttler adaptiveThrottler | ||
|
|
||
| cc *grpc.ClientConn | ||
| client rlsgrpc.RouteLookupServiceClient | ||
| logger *internalgrpclog.PrefixLogger | ||
| connectivityStateCh *buffer.Unbounded | ||
| unsubscribe func() | ||
| monitorDoneCh chan struct{} | ||
| cc *grpc.ClientConn | ||
| client rlsgrpc.RouteLookupServiceClient | ||
| logger *internalgrpclog.PrefixLogger | ||
| unsubscribe func() | ||
| seenTransientFailure bool | ||
| mu sync.Mutex | ||
|
Comment on lines +64 to +65
Contributor
Nit: Please group the mutex and the fields that it guards as a separate block. It is a generally used convention that any fields that come right below a mutex are to be guarded by that mutex. |
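A minimal sketch of the layout this nit asks for (field names borrowed from the diff above, other types simplified so the snippet stands alone as an illustration of the convention, not the actual struct):

```go
package rlsexample

import "sync"

// controlChannelFields illustrates the convention the reviewer refers to:
// fields that need no synchronization come first, and the mutex is
// immediately followed by the fields it guards.
type controlChannelFields struct {
	serverName string
	rpcTimeout int64

	mu                   sync.Mutex
	seenTransientFailure bool // guarded by mu
}
```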
||
| } | ||
|
|
||
| // newControlChannel creates a controlChannel to rlsServerName and uses | ||
| // serviceConfig, if non-empty, as the default service config for the underlying | ||
| // gRPC channel. | ||
| func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) { | ||
| ctrlCh := &controlChannel{ | ||
| rpcTimeout: rpcTimeout, | ||
| backToReadyFunc: backToReadyFunc, | ||
| throttler: newAdaptiveThrottler(), | ||
| connectivityStateCh: buffer.NewUnbounded(), | ||
| monitorDoneCh: make(chan struct{}), | ||
| rpcTimeout: rpcTimeout, | ||
| backToReadyFunc: backToReadyFunc, | ||
| throttler: newAdaptiveThrottler(), | ||
| } | ||
| ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh)) | ||
|
|
||
|
|
@@ -92,7 +90,6 @@ func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Dura | |
| ctrlCh.cc.Connect() | ||
| ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc) | ||
| ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName) | ||
| go ctrlCh.monitorConnectivityState() | ||
| return ctrlCh, nil | ||
| } | ||
|
|
||
|
|
@@ -101,7 +98,34 @@ func (cc *controlChannel) OnMessage(msg any) { | |
| if !ok { | ||
| panic(fmt.Sprintf("Unexpected message type %T , wanted connectectivity.State type", msg)) | ||
| } | ||
| cc.connectivityStateCh.Put(st) | ||
|
|
||
| cc.mu.Lock() | ||
| defer cc.mu.Unlock() | ||
|
|
||
| switch st { | ||
| case connectivity.Ready: | ||
| // Only reset backoff when transitioning from TRANSIENT_FAILURE to READY. | ||
| // This indicates the RLS server has recovered from being unreachable, so | ||
| // we reset backoff state in all cache entries to allow pending RPCs to | ||
| // proceed immediately. We skip benign transitions like READY → IDLE → READY | ||
| // since those don't represent actual failures. | ||
| if cc.seenTransientFailure { | ||
| cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE") | ||
|
Contributor
Nit: Please guard INFO logs with a verbosity check of:
if cc.logger.V(2) {
	cc.logger.Infof("Control channel back to READY after TRANSIENT_FAILURE")
}
Contributor
Here and elsewhere in this method. Thanks. |
||
| cc.seenTransientFailure = false | ||
| if cc.backToReadyFunc != nil { | ||
| cc.backToReadyFunc() | ||
| } | ||
| } else { | ||
| cc.logger.Infof("Control channel is READY") | ||
| } | ||
| case connectivity.TransientFailure: | ||
| // Track that we've entered TRANSIENT_FAILURE state so we know to reset | ||
| // backoffs when we recover to READY. | ||
| cc.logger.Warningf("Control channel is TRANSIENT_FAILURE") | ||
| cc.seenTransientFailure = true | ||
| default: | ||
| cc.logger.Infof("Control channel connectivity state is %s", st) | ||
| } | ||
| } | ||
|
|
||
| // dialOpts constructs the dial options for the control plane channel. | ||
|
|
@@ -148,68 +172,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st | |
| return dopts, nil | ||
| } | ||
|
|
||
| func (cc *controlChannel) monitorConnectivityState() { | ||
| cc.logger.Infof("Starting connectivity state monitoring goroutine") | ||
| defer close(cc.monitorDoneCh) | ||
|
|
||
| // Since we use two mechanisms to deal with RLS server being down: | ||
| // - adaptive throttling for the channel as a whole | ||
| // - exponential backoff on a per-request basis | ||
| // we need a way to avoid double-penalizing requests by counting failures | ||
| // toward both mechanisms when the RLS server is unreachable. | ||
| // | ||
| // To accomplish this, we monitor the state of the control plane channel. If | ||
| // the state has been TRANSIENT_FAILURE since the last time it was in state | ||
| // READY, and it then transitions into state READY, we push on a channel | ||
| // which is being read by the LB policy. | ||
| // | ||
| // The LB the policy will iterate through the cache to reset the backoff | ||
| // timeouts in all cache entries. Specifically, this means that it will | ||
| // reset the backoff state and cancel the pending backoff timer. Note that | ||
| // when cancelling the backoff timer, just like when the backoff timer fires | ||
| // normally, a new picker is returned to the channel, to force it to | ||
| // re-process any wait-for-ready RPCs that may still be queued if we failed | ||
| // them while we were in backoff. However, we should optimize this case by | ||
| // returning only one new picker, regardless of how many backoff timers are | ||
| // cancelled. | ||
|
|
||
| // Wait for the control channel to become READY for the first time. | ||
| for s, ok := <-cc.connectivityStateCh.Get(); s != connectivity.Ready; s, ok = <-cc.connectivityStateCh.Get() { | ||
| if !ok { | ||
| return | ||
| } | ||
|
|
||
| cc.connectivityStateCh.Load() | ||
| if s == connectivity.Shutdown { | ||
| return | ||
| } | ||
| } | ||
| cc.connectivityStateCh.Load() | ||
| cc.logger.Infof("Connectivity state is READY") | ||
|
|
||
| for { | ||
| s, ok := <-cc.connectivityStateCh.Get() | ||
| if !ok { | ||
| return | ||
| } | ||
| cc.connectivityStateCh.Load() | ||
|
|
||
| if s == connectivity.Shutdown { | ||
| return | ||
| } | ||
| if s == connectivity.Ready { | ||
| cc.logger.Infof("Control channel back to READY") | ||
| cc.backToReadyFunc() | ||
| } | ||
|
|
||
| cc.logger.Infof("Connectivity state is %s", s) | ||
| } | ||
| } | ||
|
|
||
| func (cc *controlChannel) close() { | ||
| cc.unsubscribe() | ||
| cc.connectivityStateCh.Close() | ||
| <-cc.monitorDoneCh | ||
| cc.cc.Close() | ||
| cc.logger.Infof("Shutdown") | ||
| } | ||
|
|
||
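For readers skimming the diff, here is a condensed, self-contained sketch of the transition handling added above (hypothetical names and simplified types, not the actual grpc-go code): the backoff-reset callback fires only on a TRANSIENT_FAILURE → READY recovery, while benign READY → IDLE → READY transitions are ignored.

```go
package main

import (
	"fmt"
	"sync"

	"google.golang.org/grpc/connectivity"
)

// stateTracker is a stand-in for the new controlChannel fields: it remembers
// that TRANSIENT_FAILURE was seen and fires onBackToReady only when the
// channel subsequently returns to READY.
type stateTracker struct {
	mu                   sync.Mutex
	seenTransientFailure bool
	onBackToReady        func()
}

func (t *stateTracker) onConnectivityState(st connectivity.State) {
	t.mu.Lock()
	defer t.mu.Unlock()

	switch st {
	case connectivity.Ready:
		// Reset backoff only when recovering from TRANSIENT_FAILURE.
		if t.seenTransientFailure {
			t.seenTransientFailure = false
			if t.onBackToReady != nil {
				t.onBackToReady()
			}
		}
	case connectivity.TransientFailure:
		t.seenTransientFailure = true
	}
}

func main() {
	tr := &stateTracker{onBackToReady: func() { fmt.Println("reset backoff") }}
	tr.onConnectivityState(connectivity.Idle)             // ignored
	tr.onConnectivityState(connectivity.Ready)            // benign, no reset
	tr.onConnectivityState(connectivity.TransientFailure) // failure remembered
	tr.onConnectivityState(connectivity.Ready)            // prints "reset backoff"
}
```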
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,13 +26,15 @@ import ( | |
| "fmt" | ||
| "os" | ||
| "regexp" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/google/go-cmp/cmp" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/balancer" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/connectivity" | ||
| "google.golang.org/grpc/credentials" | ||
| "google.golang.org/grpc/internal" | ||
| rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" | ||
|
|
@@ -463,3 +465,124 @@ func (s) TestNewControlChannelUnsupportedCredsBundle(t *testing.T) { | |
| t.Fatal("newControlChannel succeeded when expected to fail") | ||
| } | ||
| } | ||
|
|
||
| // TestControlChannelConnectivityStateTransitions verifies that the control | ||
| // channel only resets backoff when recovering from TRANSIENT_FAILURE, not | ||
| // when going through benign state changes like READY → IDLE → READY. | ||
| func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| description string | ||
| states []connectivity.State | ||
| wantCallbackCount int | ||
| }{ | ||
| { | ||
| name: "ready_after_transient_failure", | ||
| description: "ready after transient failure triggers callback to reset the timer.", | ||
| states: []connectivity.State{ | ||
| connectivity.TransientFailure, | ||
| connectivity.Ready, | ||
| }, | ||
| wantCallbackCount: 1, | ||
| }, | ||
| { | ||
| name: "ready_after_idle", | ||
| description: "ready after idle does not trigger callback", | ||
| states: []connectivity.State{ | ||
| connectivity.Idle, | ||
| connectivity.Ready, | ||
| }, | ||
| wantCallbackCount: 0, | ||
| }, | ||
| { | ||
| name: "multiple_failures", | ||
| description: "multiple failures trigger callback each time", | ||
| states: []connectivity.State{ | ||
| connectivity.TransientFailure, | ||
| connectivity.Ready, | ||
| connectivity.TransientFailure, | ||
| connectivity.Ready, | ||
| }, | ||
| wantCallbackCount: 2, | ||
| }, | ||
| { | ||
| name: "idle_between_failures", | ||
| description: "idle between failures doesn't affect callback", | ||
| states: []connectivity.State{ | ||
| connectivity.TransientFailure, | ||
| connectivity.Idle, | ||
| connectivity.Ready, | ||
| }, | ||
| wantCallbackCount: 1, | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
|
Member
I think there might be some way to improve the test. Maybe we can use WaitGroups, but I will defer to @easwars for his opinion on this.
Contributor
To be able to test this in an e2e style, we would need to make it possible for the test to see the connectivity state changes on the control channel, but without adding hooks into the test code. I propose the following:
var newConnectivityStateSubscriber = connStateSubscriber
func connStateSubscriber(sub grpcsync.Subscriber) grpcsync.Subscriber {
return sub
}
ctrlCh.unsubscribe = internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh))
type wrappingConnectivityStateSubscriber struct {
delegate grpcsync.Subscriber
connStateCh chan connectivity.State
}
func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) {
w.delegate.OnMessage(msg)
w.connStateCh <- msg.(connectivity.State)
}
// Override the connectivity state subscriber.
wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: make(chan connectivity.State, 1)}
origConnectivityStateSubscriber := newConnectivityStateSubscriber
newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
wrappedSubscriber.delegate = delegate
return wrappedSubscriber
}
defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()
// Make sure an RLS request is sent out.
verifyRLSRequest(t, rlsReqCh, true)
// Verify that the control channel moves to READY.
wantStates := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}
for _, wantState := range wantStates {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState != wantState {
t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
}
case <-ctx.Done():
t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
}
}
// Stop the RLS server.
lis.Stop()
// Verify that the control channel moves to IDLE.
wantStates = []connectivity.State{
connectivity.Idle,
}
for _, wantState := range wantStates {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState != wantState {
t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
}
case <-ctx.Done():
t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
}
}
// Make another RPC similar to the first one. Since the above cache entry
// would have expired by now, this should trigger another RLS request. And
// since the RLS server is down, RLS request will fail and the cache entry
// will enter backoff, and we have overridden the default backoff strategy to
// return a value which will keep this entry in backoff for the whole duration
// of the test.
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
// Verify that the control channel moves to TRANSIENT_FAILURE.
wantStates = []connectivity.State{
connectivity.Connecting,
connectivity.TransientFailure,
}
for _, wantState := range wantStates {
select {
case gotState := <-wrappedSubscriber.connStateCh:
if gotState != wantState {
t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
}
case <-ctx.Done():
t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
}
}
// Restart the RLS server.
lis.Restart()

The above will test the READY --> TF --> READY transition. For the READY --> IDLE --> READY transition, we need to restart the RLS server once the control channel goes IDLE, and then wait for it to go READY before attempting another RPC and verifying that backoffs are not reset. Let me know what you think about this approach. Thanks |
||
| // Start an RLS server | ||
| rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil) | ||
|
|
||
| // Setup callback to count invocations | ||
| var mu sync.Mutex | ||
| var callbackCount int | ||
| // Buffered channel large enough to never block | ||
| callbackInvoked := make(chan struct{}, 100) | ||
| callback := func() { | ||
| mu.Lock() | ||
| callbackCount++ | ||
| mu.Unlock() | ||
| // Send to channel - should never block with large buffer | ||
| callbackInvoked <- struct{}{} | ||
| } | ||
|
|
||
| // Create control channel | ||
| ctrlCh, err := newControlChannel(rlsServer.Address, "", defaultTestTimeout, balancer.BuildOptions{}, callback) | ||
| if err != nil { | ||
| t.Fatalf("Failed to create control channel: %v", err) | ||
| } | ||
| defer ctrlCh.close() | ||
|
|
||
| // Inject all test states | ||
| for _, state := range tt.states { | ||
| ctrlCh.OnMessage(state) | ||
| } | ||
|
|
||
| // Wait for all expected callbacks with timeout | ||
| callbackTimeout := time.NewTimer(defaultTestTimeout) | ||
| defer callbackTimeout.Stop() | ||
|
|
||
| receivedCallbacks := 0 | ||
| for receivedCallbacks < tt.wantCallbackCount { | ||
| select { | ||
| case <-callbackInvoked: | ||
| receivedCallbacks++ | ||
| case <-callbackTimeout.C: | ||
| mu.Lock() | ||
| got := callbackCount | ||
| mu.Unlock() | ||
| t.Fatalf("Timeout waiting for callbacks: expected %d, received %d via channel, callback count is %d", tt.wantCallbackCount, receivedCallbacks, got) | ||
| } | ||
| } | ||
|
|
||
| // Verify final callback count matches expected | ||
| mu.Lock() | ||
| gotCallbackCount := callbackCount | ||
| mu.Unlock() | ||
|
|
||
| if gotCallbackCount != tt.wantCallbackCount { | ||
| t.Errorf("Got %d callback invocations, want %d", gotCallbackCount, tt.wantCallbackCount) | ||
| } | ||
|
|
||
| // Ensure no extra callbacks are invoked | ||
| select { | ||
| case <-callbackInvoked: | ||
| mu.Lock() | ||
| final := callbackCount | ||
| mu.Unlock() | ||
| t.Fatalf("Received more callbacks than expected: got %d, want %d", final, tt.wantCallbackCount) | ||
| case <-time.After(50 * time.Millisecond): | ||
| // Expected: no more callbacks | ||
| } | ||
| }) | ||
| } | ||
| } | ||
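The comments removed from monitorConnectivityState above describe what happens on the LB policy side when backToReadyFunc fires: iterate the cache, reset backoff state, cancel pending backoff timers, and return a single new picker so queued wait-for-ready RPCs are re-processed. A hypothetical sketch of that consumer side (names and cache layout invented here for illustration; the real policy's cache code is not part of this diff):

```go
package main

import (
	"sync"
	"time"
)

// cacheEntry is a hypothetical stand-in for an RLS cache entry with backoff state.
type cacheEntry struct {
	backoffExpiry time.Time
	backoffTimer  *time.Timer
}

// lbPolicy is a hypothetical stand-in for the RLS LB policy owning the cache.
type lbPolicy struct {
	mu      sync.Mutex
	entries map[string]*cacheEntry
}

// resetAllBackoff models the work triggered by backToReadyFunc: cancel pending
// backoff timers and clear expiry so pending RPCs may proceed immediately.
// Per the removed comment, only one new picker should be returned afterwards,
// regardless of how many timers were cancelled.
func (lb *lbPolicy) resetAllBackoff() {
	lb.mu.Lock()
	defer lb.mu.Unlock()
	for _, e := range lb.entries {
		if e.backoffTimer != nil {
			e.backoffTimer.Stop()
			e.backoffTimer = nil
		}
		e.backoffExpiry = time.Time{}
	}
}

func main() {
	lb := &lbPolicy{entries: map[string]*cacheEntry{
		"target-a": {backoffExpiry: time.Now().Add(time.Minute)},
	}}
	// In the real policy this would be wired up as the backToReadyFunc passed
	// to newControlChannel; here it is invoked directly.
	lb.resetAllBackoff()
}
```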