From 54c1664a33002793351358fb72e62d5bc663000f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 2 Oct 2025 22:08:50 +0000 Subject: [PATCH] xdsclient: stop batching writes on the ADS stream --- .../xds/clients/internal/buffer/unbounded.go | 21 +++ .../clients/internal/buffer/unbounded_test.go | 35 ++++ internal/xds/clients/xdsclient/ads_stream.go | 158 +++++------------- 3 files changed, 95 insertions(+), 119 deletions(-) diff --git a/internal/xds/clients/internal/buffer/unbounded.go b/internal/xds/clients/internal/buffer/unbounded.go index 3e6e99d0e8e0..0951af892be4 100644 --- a/internal/xds/clients/internal/buffer/unbounded.go +++ b/internal/xds/clients/internal/buffer/unbounded.go @@ -83,6 +83,7 @@ func (b *Unbounded) Load() { default: } } else if b.closing && !b.closed { + b.closed = true close(b.c) } } @@ -114,3 +115,23 @@ func (b *Unbounded) Close() { close(b.c) } } + +// Reset clears all buffered data in the unbounded buffer. This does not close +// the buffer, and new data may be Put() into it after a call to this method. +// +// It's expected to be used in scenarios where the buffered data is no longer +// relevant, and needs to be cleared. +func (b *Unbounded) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.closing { + return + } + + b.backlog = nil + select { + case <-b.c: + default: + } +} diff --git a/internal/xds/clients/internal/buffer/unbounded_test.go b/internal/xds/clients/internal/buffer/unbounded_test.go index b74c3afc0bf8..bcebd6ba453c 100644 --- a/internal/xds/clients/internal/buffer/unbounded_test.go +++ b/internal/xds/clients/internal/buffer/unbounded_test.go @@ -21,6 +21,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/internal/grpctest" @@ -146,3 +147,37 @@ func (s) TestClose(t *testing.T) { } ub.Close() // ignored } + +// TestReset resets the buffer and makes sure that the buffer can be used after +// the reset. +func (s) TestReset(t *testing.T) { + ub := NewUnbounded() + if err := ub.Put(1); err != nil { + t.Fatalf("Unbounded.Put() = %v; want nil", err) + } + if err := ub.Put(2); err != nil { + t.Fatalf("Unbounded.Put() = %v; want nil", err) + } + if v, ok := <-ub.Get(); !ok { + t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true) + } + ub.Load() + ub.Reset() + + // Make sure the buffer is empty after the reset. Wait for a short duration + // to make sure that no value is received on the read channel. + select { + case v, ok := <-ub.Get(): + t.Errorf("Unbounded.Get() = %v, %v; want no value", v, ok) + case <-time.After(10 * time.Millisecond): + } + + // Make sure the buffer can be used after the reset. + if err := ub.Put(1); err != nil { + t.Fatalf("Unbounded.Put() = %v; want nil", err) + } + if v, ok := <-ub.Get(); !ok { + t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true) + } + ub.Load() +} diff --git a/internal/xds/clients/xdsclient/ads_stream.go b/internal/xds/clients/xdsclient/ads_stream.go index 3ad62ac16db8..38ba2585e63c 100644 --- a/internal/xds/clients/xdsclient/ads_stream.go +++ b/internal/xds/clients/xdsclient/ads_stream.go @@ -48,6 +48,13 @@ const ( perRPCVerbosityLevel = 9 ) +// request represents a queued request message to be sent on the ADS stream. It +// contains the type of the resource and the list of resource names to be sent. +type request struct { + typ ResourceType + resourceNames []string +} + // response represents a response received on the ADS stream. It contains the // type URL, version, and resources for the response. type response struct { @@ -76,9 +83,7 @@ type adsStreamEventHandler interface { type resourceTypeState struct { version string // Last acked version. Should not be reset when the stream breaks. nonce string // Last received nonce. Should be reset when the stream breaks. - bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked. subscribedResources map[string]*xdsresource.ResourceWatchState // Map of subscribed resource names to their state. - pendingWrite bool // True if there is a pending write for this resource type. } // adsStreamImpl provides the functionality associated with an ADS (Aggregated @@ -158,8 +163,8 @@ func (s *adsStreamImpl) Stop() { // subscribe subscribes to the given resource. It is assumed that multiple // subscriptions for the same resource is deduped at the caller. A discovery -// request is sent out on the underlying stream for the resource type when there -// is sufficient flow control quota. +// request is sent out on the underlying stream, for the resource type with the +// newly subscribed resource. func (s *adsStreamImpl) subscribe(typ ResourceType, name string) { if s.logger.V(2) { s.logger.Infof("Subscribing to resource %q of type %q", name, typ.TypeName) @@ -172,26 +177,22 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) { if !ok { // An entry in the type state map is created as part of the first // subscription request for this type. - state = &resourceTypeState{ - subscribedResources: make(map[string]*xdsresource.ResourceWatchState), - bufferedRequests: make(chan struct{}, 1), - } + state = &resourceTypeState{subscribedResources: make(map[string]*xdsresource.ResourceWatchState)} s.resourceTypeState[typ] = state } // Create state for the newly subscribed resource. The watch timer will // be started when a request for this resource is actually sent out. state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted} - state.pendingWrite = true // Send a request for the resource type with updated subscriptions. - s.requestCh.Put(typ) + s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)}) } // Unsubscribe cancels the subscription to the given resource. It is a no-op if // the given resource does not exist. The watch expiry timer associated with the // resource is stopped if one is active. A discovery request is sent out on the -// stream for the resource type when there is sufficient flow control quota. +// stream for the resource type with the updated set of resource names. func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) { if s.logger.V(2) { s.logger.Infof("Unsubscribing to resource %q of type %q", name, typ.TypeName) @@ -213,14 +214,13 @@ func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) { rs.ExpiryTimer.Stop() } delete(state.subscribedResources, name) - state.pendingWrite = true // Send a request for the resource type with updated subscriptions. - s.requestCh.Put(typ) + s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)}) } // runner is a long-running goroutine that handles the lifecycle of the ADS -// stream. It spwans another goroutine to handle writes of discovery request +// stream. It spawns another goroutine to handle writes of discovery request // messages on the stream. Whenever an existing stream fails, it performs // exponential backoff (if no messages were received on that stream) before // creating a new stream. @@ -280,14 +280,14 @@ func (s *adsStreamImpl) send(ctx context.Context) { stream = nil continue } - case req, ok := <-s.requestCh.Get(): + case r, ok := <-s.requestCh.Get(): if !ok { return } s.requestCh.Load() - typ := req.(ResourceType) - if err := s.sendNew(stream, typ); err != nil { + req := r.(request) + if err := s.sendNew(stream, req.typ, req.resourceNames); err != nil { stream = nil continue } @@ -296,11 +296,10 @@ func (s *adsStreamImpl) send(ctx context.Context) { } // sendNew attempts to send a discovery request based on a new subscription or -// unsubscription. If there is no flow control quota, the request is buffered -// and will be sent later. This method also starts the watch expiry timer for -// resources that were sent in the request for the first time, i.e. their watch -// state is `watchStateStarted`. -func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error { +// unsubscription. This method also starts the watch expiry timer for resources +// that were sent in the request for the first time, i.e. their watch state is +// `watchStateStarted`. +func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType, names []string) error { s.mu.Lock() defer s.mu.Unlock() @@ -311,22 +310,16 @@ func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error { return nil } - // If local processing of the most recently received response is not yet - // complete, i.e. fc.pending == true, queue this write and return early. - // This allows us to batch writes for requests which are generated as part - // of local processing of a received response. - state := s.resourceTypeState[typ] - bufferRequest := func() { - select { - case state.bufferedRequests <- struct{}{}: - default: - } + state, ok := s.resourceTypeState[typ] + if !ok { + // State is created when the first subscription for this type is made. + panic(fmt.Sprintf("no state exists for resource type %v", typ)) } - if s.fc.runIfPending(bufferRequest) { - return nil + if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil { + return err } - - return s.sendMessageIfWritePendingLocked(stream, typ, state) + s.startWatchTimersLocked(typ, names) + return nil } // sendExisting sends out discovery requests for existing resources when @@ -337,6 +330,10 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error { s.mu.Lock() defer s.mu.Unlock() + // Clear any queued requests. Previously subscribed to resources will be + // resent below. + s.requestCh.Reset() + for typ, state := range s.resourceTypeState { // Reset only the nonces map when the stream restarts. // @@ -355,69 +352,15 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error { continue } - state.pendingWrite = true - if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil { + names := resourceNames(state.subscribedResources) + if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil { return err } + s.startWatchTimersLocked(typ, names) } return nil } -// sendBuffered sends out discovery requests for resources that were buffered -// when they were subscribed to, because local processing of the previously -// received response was not yet complete. -// -// The stream argument is guaranteed to be non-nil. -func (s *adsStreamImpl) sendBuffered(stream clients.Stream) error { - s.mu.Lock() - defer s.mu.Unlock() - - for typ, state := range s.resourceTypeState { - select { - case <-state.bufferedRequests: - if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil { - return err - } - default: - // No buffered request. - continue - } - } - return nil -} - -// sendMessageIfWritePendingLocked attempts to sends a discovery request to the -// server, if there is a pending write for the given resource type. -// -// If the request is successfully sent, the pending write field is cleared and -// watch timers are started for the resources in the request. -// -// Caller needs to hold c.mu. -func (s *adsStreamImpl) sendMessageIfWritePendingLocked(stream clients.Stream, typ ResourceType, state *resourceTypeState) error { - if !state.pendingWrite { - if s.logger.V(2) { - s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL) - } - return nil - } - - names := resourceNames(state.subscribedResources) - if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil { - return err - } - state.pendingWrite = false - - // Drain the buffered requests channel because we just sent a request for this - // resource type. - select { - case <-state.bufferedRequests: - default: - } - - s.startWatchTimersLocked(typ, names) - return nil -} - // sendMessageLocked sends a discovery request to the server, populating the // different fields of the message with the given parameters. Returns a non-nil // error if the request could not be sent. @@ -467,11 +410,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string, // recv is responsible for receiving messages from the ADS stream. // // It performs the following actions: -// - Waits for local flow control to be available before sending buffered -// requests, if any. -// - Receives a message from the ADS stream. If an error is encountered here, -// it is handled by the onError method which propagates the error to all -// watchers. +// - Waits for local flow control to be available before it receives a message +// from the ADS stream. If an error is encountered here, it is handled by +// the onError method which propagates the error to all watchers. // - Invokes the event handler's OnADSResponse method to process the message. // - Sends an ACK or NACK to the server based on the response. // @@ -488,10 +429,6 @@ func (s *adsStreamImpl) recv(stream clients.Stream) bool { return msgReceived } - // Send out a request if anything was buffered while we were waiting for - // local processing of the previous response to complete. - s.sendBuffered(stream) - resources, url, version, nonce, err := s.recvMessage(stream) if err != nil { s.onError(err, msgReceived) @@ -760,23 +697,6 @@ func (fc *adsFlowControl) setPending(pending bool) { } } -func (fc *adsFlowControl) runIfPending(f func()) bool { - fc.mu.Lock() - defer fc.mu.Unlock() - - if fc.stopped { - return false - } - - // If there's a pending update, run the function while still holding the - // lock. This ensures that the pending state does not change between the - // check and the function call. - if fc.pending { - f() - } - return fc.pending -} - // wait blocks until all the watchers have consumed the most recent update. // Returns true if the flow control was stopped while waiting, false otherwise. func (fc *adsFlowControl) wait() bool {