@@ -22,7 +22,6 @@ import (
22
22
"context"
23
23
"fmt"
24
24
"sync"
25
- "sync/atomic"
26
25
"time"
27
26
28
27
"google.golang.org/grpc/grpclog"
@@ -103,11 +102,11 @@ type adsStreamImpl struct {
103
102
requestCh * buffer.Unbounded // Subscriptions and unsubscriptions are pushed here.
104
103
runnerDoneCh chan struct {} // Notify completion of runner goroutine.
105
104
cancel context.CancelFunc // To cancel the context passed to the runner goroutine.
105
+ fc * adsFlowControl // Flow control for ADS stream.
106
106
107
107
// Guards access to the below fields (and to the contents of the map).
108
108
mu sync.Mutex
109
109
resourceTypeState map [ResourceType ]* resourceTypeState // Map of resource types to their state.
110
- fc * adsFlowControl // Flow control for ADS stream.
111
110
firstRequest bool // False after the first request is sent out.
112
111
}
113
112
@@ -135,6 +134,7 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
135
134
streamCh : make (chan clients.Stream , 1 ),
136
135
requestCh : buffer .NewUnbounded (),
137
136
runnerDoneCh : make (chan struct {}),
137
+ fc : newADSFlowControl (),
138
138
resourceTypeState : make (map [ResourceType ]* resourceTypeState ),
139
139
}
140
140
@@ -150,6 +150,7 @@ func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
150
150
// Stop blocks until the stream is closed and all spawned goroutines exit.
151
151
func (s * adsStreamImpl ) Stop () {
152
152
s .cancel ()
153
+ s .fc .stop ()
153
154
s .requestCh .Close ()
154
155
<- s .runnerDoneCh
155
156
s .logger .Infof ("Shutdown ADS stream" )
@@ -240,9 +241,6 @@ func (s *adsStreamImpl) runner(ctx context.Context) {
240
241
}
241
242
242
243
s .mu .Lock ()
243
- // Flow control is a property of the underlying streaming RPC call and
244
- // needs to be initialized everytime a new one is created.
245
- s .fc = newADSFlowControl (s .logger )
246
244
s .firstRequest = true
247
245
s .mu .Unlock ()
248
246
@@ -256,7 +254,7 @@ func (s *adsStreamImpl) runner(ctx context.Context) {
256
254
257
255
// Backoff state is reset upon successful receipt of at least one
258
256
// message from the server.
259
- if s .recv (ctx , stream ) {
257
+ if s .recv (stream ) {
260
258
return backoff .ErrResetBackoff
261
259
}
262
260
return nil
@@ -318,11 +316,13 @@ func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error {
318
316
// This allows us to batch writes for requests which are generated as part
319
317
// of local processing of a received response.
320
318
state := s .resourceTypeState [typ ]
321
- if s . fc . pending . Load () {
319
+ bufferRequest := func () {
322
320
select {
323
321
case state .bufferedRequests <- struct {}{}:
324
322
default :
325
323
}
324
+ }
325
+ if s .fc .runIfPending (bufferRequest ) {
326
326
return nil
327
327
}
328
328
@@ -477,18 +477,19 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
477
477
//
478
478
// It returns a boolean indicating whether at least one message was received
479
479
// from the server.
480
- func (s * adsStreamImpl ) recv (ctx context. Context , stream clients.Stream ) bool {
480
+ func (s * adsStreamImpl ) recv (stream clients.Stream ) bool {
481
481
msgReceived := false
482
482
for {
483
- // Wait for ADS stream level flow control to be available, and send out
484
- // a request if anything was buffered while we were waiting for local
485
- // processing of the previous response to complete.
486
- if ! s .fc .wait (ctx ) {
483
+ // Wait for ADS stream level flow control to be available.
484
+ if s .fc .wait () {
487
485
if s .logger .V (2 ) {
488
- s .logger .Infof ("ADS stream context canceled " )
486
+ s .logger .Infof ("ADS stream stopped while waiting for flow control " )
489
487
}
490
488
return msgReceived
491
489
}
490
+
491
+ // Send out a request if anything was buffered while we were waiting for
492
+ // local processing of the previous response to complete.
492
493
s .sendBuffered (stream )
493
494
494
495
resources , url , version , nonce , err := s .recvMessage (stream )
@@ -508,8 +509,8 @@ func (s *adsStreamImpl) recv(ctx context.Context, stream clients.Stream) bool {
508
509
}
509
510
var resourceNames []string
510
511
var nackErr error
511
- s .fc .setPending ()
512
- resourceNames , nackErr = s .eventHandler .onResponse (resp , s .fc .onDone )
512
+ s .fc .setPending (true )
513
+ resourceNames , nackErr = s .eventHandler .onResponse (resp , sync . OnceFunc ( func () { s .fc .setPending ( false ) }) )
513
514
if xdsresource .ErrType (nackErr ) == xdsresource .ErrorTypeResourceTypeUnsupported {
514
515
// A general guiding principle is that if the server sends
515
516
// something the client didn't actually subscribe to, then the
@@ -707,69 +708,84 @@ func resourceNames(m map[string]*xdsresource.ResourceWatchState) []string {
707
708
return ret
708
709
}
709
710
710
- // adsFlowControl implements ADS stream level flow control that enables the
711
- // transport to block the reading of the next message off of the stream until
712
- // the previous update is consumed by all watchers.
711
+ // adsFlowControl implements ADS stream level flow control that enables the ADS
712
+ // stream to block the reading of the next message until the previous update is
713
+ // consumed by all watchers.
713
714
//
714
- // The lifetime of the flow control is tied to the lifetime of the stream.
715
+ // The lifetime of the flow control is tied to the lifetime of the stream. When
716
+ // the stream is closed, it is the responsibility of the caller to stop the flow
717
+ // control. This ensures that any goroutine blocked on the flow control's wait
718
+ // method is unblocked.
715
719
type adsFlowControl struct {
716
- logger * igrpclog. PrefixLogger
717
-
718
- // Whether the most recent update is pending consumption by all watchers.
719
- pending atomic. Bool
720
- // Channel used to notify when all the watchers have consumed the most
721
- // recent update. Wait() blocks on reading a value from this channel.
722
- readyCh chan struct {}
720
+ mu sync. Mutex
721
+ // cond is used to signal when the most recent update has been consumed, or
722
+ // the flow control has been stopped (in which case, waiters should be
723
+ // unblocked as well).
724
+ cond * sync. Cond
725
+ pending bool // indicates if the most recent update is pending consumption
726
+ stopped bool // indicates if the ADS stream has been stopped
723
727
}
724
728
725
729
// newADSFlowControl returns a new adsFlowControl.
726
- func newADSFlowControl (logger * igrpclog.PrefixLogger ) * adsFlowControl {
727
- return & adsFlowControl {
728
- logger : logger ,
729
- readyCh : make (chan struct {}, 1 ),
730
- }
730
+ func newADSFlowControl () * adsFlowControl {
731
+ fc := & adsFlowControl {}
732
+ fc .cond = sync .NewCond (& fc .mu )
733
+ return fc
731
734
}
732
735
733
- // setPending changes the internal state to indicate that there is an update
734
- // pending consumption by all watchers.
735
- func (fc * adsFlowControl ) setPending () {
736
- fc .pending .Store (true )
736
+ // stop marks the flow control as stopped and signals the condition variable to
737
+ // unblock any goroutine waiting on it.
738
+ func (fc * adsFlowControl ) stop () {
739
+ fc .mu .Lock ()
740
+ defer fc .mu .Unlock ()
741
+
742
+ fc .stopped = true
743
+ fc .cond .Broadcast ()
737
744
}
738
745
739
- // wait blocks until all the watchers have consumed the most recent update and
740
- // returns true. If the context expires before that, it returns false.
741
- func (fc * adsFlowControl ) wait (ctx context.Context ) bool {
742
- // If there is no pending update, there is no need to block.
743
- if ! fc .pending .Load () {
744
- // If all watchers finished processing the most recent update before the
745
- // `recv` goroutine made the next call to `Wait()`, there would be an
746
- // entry in the readyCh channel that needs to be drained to ensure that
747
- // the next call to `Wait()` doesn't unblock before it actually should.
748
- select {
749
- case <- fc .readyCh :
750
- default :
751
- }
752
- return true
746
+ // setPending changes the internal state to indicate whether there is an update
747
+ // pending consumption by all watchers. If there is no longer a pending update,
748
+ // the condition variable is signaled to allow the recv method to proceed.
749
+ func (fc * adsFlowControl ) setPending (pending bool ) {
750
+ fc .mu .Lock ()
751
+ defer fc .mu .Unlock ()
752
+
753
+ if fc .stopped {
754
+ return
753
755
}
754
756
755
- select {
756
- case <- ctx .Done ():
757
+ fc .pending = pending
758
+ if ! pending {
759
+ fc .cond .Broadcast ()
760
+ }
761
+ }
762
+
763
+ func (fc * adsFlowControl ) runIfPending (f func ()) bool {
764
+ fc .mu .Lock ()
765
+ defer fc .mu .Unlock ()
766
+
767
+ if fc .stopped {
757
768
return false
758
- case <- fc .readyCh :
759
- return true
760
769
}
770
+
771
+ // If there's a pending update, run the function while still holding the
772
+ // lock. This ensures that the pending state does not change between the
773
+ // check and the function call.
774
+ if fc .pending {
775
+ f ()
776
+ }
777
+ return fc .pending
761
778
}
762
779
763
- // onDone indicates that all watchers have consumed the most recent update.
764
- func (fc * adsFlowControl ) onDone () {
765
- select {
766
- // Writes to the readyCh channel should not block ideally. The default
767
- // branch here is to appease the paranoid mind.
768
- case fc .readyCh <- struct {}{}:
769
- default :
770
- if fc .logger .V (2 ) {
771
- fc .logger .Infof ("ADS stream flow control readyCh is full" )
772
- }
780
+ // wait blocks until all the watchers have consumed the most recent update.
781
+ // Returns true if the flow control was stopped while waiting, false otherwise.
782
+ func (fc * adsFlowControl ) wait () bool {
783
+ fc .mu .Lock ()
784
+ defer fc .mu .Unlock ()
785
+
786
+ for fc .pending && ! fc .stopped {
787
+ fc .cond .Wait ()
773
788
}
774
- fc .pending .Store (false )
789
+
790
+ return fc .stopped
775
791
}
0 commit comments