Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions internal/xds/clients/internal/buffer/unbounded.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (b *Unbounded) Load() {
default:
}
} else if b.closing && !b.closed {
b.closed = true
close(b.c)
}
}
Expand Down Expand Up @@ -114,3 +115,23 @@ func (b *Unbounded) Close() {
close(b.c)
}
}

// Reset clears all buffered data in the unbounded buffer. This does not close
// the buffer, and new data may be Put() into it after a call to this method.
//
// It's expected to be used in scenarios where the buffered data is no longer
// relevant, and needs to be cleared.
func (b *Unbounded) Reset() {
b.mu.Lock()
defer b.mu.Unlock()

if b.closing {
return
}

b.backlog = nil
select {
case <-b.c:
default:
}
}
35 changes: 35 additions & 0 deletions internal/xds/clients/internal/buffer/unbounded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sort"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/grpctest"
Expand Down Expand Up @@ -146,3 +147,37 @@ func (s) TestClose(t *testing.T) {
}
ub.Close() // ignored
}

// TestReset resets the buffer and makes sure that the buffer can be used after
// the reset.
func (s) TestReset(t *testing.T) {
ub := NewUnbounded()
if err := ub.Put(1); err != nil {
t.Fatalf("Unbounded.Put() = %v; want nil", err)
}
if err := ub.Put(2); err != nil {
t.Fatalf("Unbounded.Put() = %v; want nil", err)
}
if v, ok := <-ub.Get(); !ok {
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
}
ub.Load()
ub.Reset()

// Make sure the buffer is empty after the reset. Wait for a short duration
// to make sure that no value is received on the read channel.
select {
case v, ok := <-ub.Get():
t.Errorf("Unbounded.Get() = %v, %v; want no value", v, ok)
case <-time.After(10 * time.Millisecond):
}

// Make sure the buffer can be used after the reset.
if err := ub.Put(1); err != nil {
t.Fatalf("Unbounded.Put() = %v; want nil", err)
}
if v, ok := <-ub.Get(); !ok {
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
}
ub.Load()
}
158 changes: 39 additions & 119 deletions internal/xds/clients/xdsclient/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ const (
perRPCVerbosityLevel = 9
)

// request represents a queued request message to be sent on the ADS stream. It
// contains the type of the resource and the list of resource names to be sent.
type request struct {
typ ResourceType
resourceNames []string
}

// response represents a response received on the ADS stream. It contains the
// type URL, version, and resources for the response.
type response struct {
Expand Down Expand Up @@ -76,9 +83,7 @@ type adsStreamEventHandler interface {
type resourceTypeState struct {
version string // Last acked version. Should not be reset when the stream breaks.
nonce string // Last received nonce. Should be reset when the stream breaks.
bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked.
subscribedResources map[string]*xdsresource.ResourceWatchState // Map of subscribed resource names to their state.
pendingWrite bool // True if there is a pending write for this resource type.
}

// adsStreamImpl provides the functionality associated with an ADS (Aggregated
Expand Down Expand Up @@ -158,8 +163,8 @@ func (s *adsStreamImpl) Stop() {

// subscribe subscribes to the given resource. It is assumed that multiple
// subscriptions for the same resource is deduped at the caller. A discovery
// request is sent out on the underlying stream for the resource type when there
// is sufficient flow control quota.
// request is sent out on the underlying stream, for the resource type with the
// newly subscribed resource.
func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
if s.logger.V(2) {
s.logger.Infof("Subscribing to resource %q of type %q", name, typ.TypeName)
Expand All @@ -172,26 +177,22 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
if !ok {
// An entry in the type state map is created as part of the first
// subscription request for this type.
state = &resourceTypeState{
subscribedResources: make(map[string]*xdsresource.ResourceWatchState),
bufferedRequests: make(chan struct{}, 1),
}
state = &resourceTypeState{subscribedResources: make(map[string]*xdsresource.ResourceWatchState)}
s.resourceTypeState[typ] = state
}

// Create state for the newly subscribed resource. The watch timer will
// be started when a request for this resource is actually sent out.
state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted}
state.pendingWrite = true

// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
}

// Unsubscribe cancels the subscription to the given resource. It is a no-op if
// the given resource does not exist. The watch expiry timer associated with the
// resource is stopped if one is active. A discovery request is sent out on the
// stream for the resource type when there is sufficient flow control quota.
// stream for the resource type with the updated set of resource names.
func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) {
if s.logger.V(2) {
s.logger.Infof("Unsubscribing to resource %q of type %q", name, typ.TypeName)
Expand All @@ -213,14 +214,13 @@ func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) {
rs.ExpiryTimer.Stop()
}
delete(state.subscribedResources, name)
state.pendingWrite = true

// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
s.requestCh.Put(request{typ: typ, resourceNames: resourceNames(state.subscribedResources)})
}

// runner is a long-running goroutine that handles the lifecycle of the ADS
// stream. It spwans another goroutine to handle writes of discovery request
// stream. It spawns another goroutine to handle writes of discovery request
// messages on the stream. Whenever an existing stream fails, it performs
// exponential backoff (if no messages were received on that stream) before
// creating a new stream.
Expand Down Expand Up @@ -280,14 +280,14 @@ func (s *adsStreamImpl) send(ctx context.Context) {
stream = nil
continue
}
case req, ok := <-s.requestCh.Get():
case r, ok := <-s.requestCh.Get():
if !ok {
return
}
s.requestCh.Load()

typ := req.(ResourceType)
if err := s.sendNew(stream, typ); err != nil {
req := r.(request)
if err := s.sendNew(stream, req.typ, req.resourceNames); err != nil {
stream = nil
continue
}
Expand All @@ -296,11 +296,10 @@ func (s *adsStreamImpl) send(ctx context.Context) {
}

// sendNew attempts to send a discovery request based on a new subscription or
// unsubscription. If there is no flow control quota, the request is buffered
// and will be sent later. This method also starts the watch expiry timer for
// resources that were sent in the request for the first time, i.e. their watch
// state is `watchStateStarted`.
func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error {
// unsubscription. This method also starts the watch expiry timer for resources
// that were sent in the request for the first time, i.e. their watch state is
// `watchStateStarted`.
func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType, names []string) error {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -311,22 +310,16 @@ func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error {
return nil
}

// If local processing of the most recently received response is not yet
// complete, i.e. fc.pending == true, queue this write and return early.
// This allows us to batch writes for requests which are generated as part
// of local processing of a received response.
state := s.resourceTypeState[typ]
bufferRequest := func() {
select {
case state.bufferedRequests <- struct{}{}:
default:
}
state, ok := s.resourceTypeState[typ]
if !ok {
// State is created when the first subscription for this type is made.
panic(fmt.Sprintf("no state exists for resource type %v", typ))
}
if s.fc.runIfPending(bufferRequest) {
return nil
if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
return err
}

return s.sendMessageIfWritePendingLocked(stream, typ, state)
s.startWatchTimersLocked(typ, names)
return nil
}

// sendExisting sends out discovery requests for existing resources when
Expand All @@ -337,6 +330,10 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
s.mu.Lock()
defer s.mu.Unlock()

// Clear any queued requests. Previously subscribed to resources will be
// resent below.
s.requestCh.Reset()

for typ, state := range s.resourceTypeState {
// Reset only the nonces map when the stream restarts.
//
Expand All @@ -355,69 +352,15 @@ func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
continue
}

state.pendingWrite = true
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
return err
}
s.startWatchTimersLocked(typ, names)
}
return nil
}

// sendBuffered sends out discovery requests for resources that were buffered
// when they were subscribed to, because local processing of the previously
// received response was not yet complete.
//
// The stream argument is guaranteed to be non-nil.
func (s *adsStreamImpl) sendBuffered(stream clients.Stream) error {
s.mu.Lock()
defer s.mu.Unlock()

for typ, state := range s.resourceTypeState {
select {
case <-state.bufferedRequests:
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
return err
}
default:
// No buffered request.
continue
}
}
return nil
}

// sendMessageIfWritePendingLocked attempts to sends a discovery request to the
// server, if there is a pending write for the given resource type.
//
// If the request is successfully sent, the pending write field is cleared and
// watch timers are started for the resources in the request.
//
// Caller needs to hold c.mu.
func (s *adsStreamImpl) sendMessageIfWritePendingLocked(stream clients.Stream, typ ResourceType, state *resourceTypeState) error {
if !state.pendingWrite {
if s.logger.V(2) {
s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL)
}
return nil
}

names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
return err
}
state.pendingWrite = false

// Drain the buffered requests channel because we just sent a request for this
// resource type.
select {
case <-state.bufferedRequests:
default:
}

s.startWatchTimersLocked(typ, names)
return nil
}

// sendMessageLocked sends a discovery request to the server, populating the
// different fields of the message with the given parameters. Returns a non-nil
// error if the request could not be sent.
Expand Down Expand Up @@ -467,11 +410,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
// recv is responsible for receiving messages from the ADS stream.
//
// It performs the following actions:
// - Waits for local flow control to be available before sending buffered
// requests, if any.
// - Receives a message from the ADS stream. If an error is encountered here,
// it is handled by the onError method which propagates the error to all
// watchers.
// - Waits for local flow control to be available before it receives a message
// from the ADS stream. If an error is encountered here, it is handled by
// the onError method which propagates the error to all watchers.
// - Invokes the event handler's OnADSResponse method to process the message.
// - Sends an ACK or NACK to the server based on the response.
//
Expand All @@ -488,10 +429,6 @@ func (s *adsStreamImpl) recv(stream clients.Stream) bool {
return msgReceived
}

// Send out a request if anything was buffered while we were waiting for
// local processing of the previous response to complete.
s.sendBuffered(stream)

resources, url, version, nonce, err := s.recvMessage(stream)
if err != nil {
s.onError(err, msgReceived)
Expand Down Expand Up @@ -760,23 +697,6 @@ func (fc *adsFlowControl) setPending(pending bool) {
}
}

func (fc *adsFlowControl) runIfPending(f func()) bool {
fc.mu.Lock()
defer fc.mu.Unlock()

if fc.stopped {
return false
}

// If there's a pending update, run the function while still holding the
// lock. This ensures that the pending state does not change between the
// check and the function call.
if fc.pending {
f()
}
return fc.pending
}

// wait blocks until all the watchers have consumed the most recent update.
// Returns true if the flow control was stopped while waiting, false otherwise.
func (fc *adsFlowControl) wait() bool {
Expand Down
Loading