@@ -539,8 +539,29 @@ func (s *Sender) GetSnapshot() *ctpb.Update {
 type updatesBuf struct {
 	mu struct {
 		syncutil.Mutex
-		// updated is signaled when a new item is inserted.
-		updated sync.Cond
+
+		// We use two condition variables, swapped atomically, to avoid
+		// signaling the same goroutine multiple times. Without the swap, a
+		// goroutine could:
+		// 1. Wake up from a signal.
+		// 2. Process its message.
+		// 3. Start waiting again on the same condition variable.
+		// 4. Get signaled again before other waiting goroutines get a chance.
+		//
+		// By swapping to a new condition variable when publishing, goroutines
+		// can only wait on the new one, so we can safely signal all waiters on
+		// the old one without racing against goroutines re-queuing on it.
+		updated1    sync.Cond
+		numWaiters1 int
+
+		// updated2 is the counterpart of updated1; see the comment above.
+		updated2    sync.Cond
+		numWaiters2 int
+
+		// activeCondVar is 0 or 1, indicating which condition variable new
+		// goroutines should wait on. It is flipped after each push.
+		activeCondVar int
+
 		// data contains pointers to the Updates.
 		data []*ctpb.Update
 		// head points to the earliest update in the buffer. If the buffer is empty,
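
To see the swap in isolation: the sketch below is a minimal, runnable model of the scheme; all names (swapBuf, publish, wait) are invented for illustration, not taken from the patch. The publisher flips the active index before signaling, so a waiter that wakes up and immediately re-queues lands on the other condition variable and cannot absorb a second wakeup meant for a goroutine still parked on the old one.

// Hypothetical sketch of the double-condition-variable swap; not patch code.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// swapBuf is a toy publisher with two condition variables that are swapped
// on every publish, mirroring the scheme in updatesBuf.
type swapBuf struct {
	mu         sync.Mutex
	conds      [2]*sync.Cond
	numWaiters [2]int
	active     int // index of the cond var that new waiters park on
	lastVal    int
}

func newSwapBuf() *swapBuf {
	b := &swapBuf{}
	b.conds[0] = sync.NewCond(&b.mu)
	b.conds[1] = sync.NewCond(&b.mu)
	return b
}

// wait parks on the currently active cond var until the next publish.
func (b *swapBuf) wait() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	i := b.active
	b.numWaiters[i]++
	b.conds[i].Wait()
	b.numWaiters[i]--
	return b.lastVal
}

// publish flips the active index first, so goroutines that wake up and call
// wait() again cannot race onto the cond var we are about to signal.
func (b *swapBuf) publish(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	old := b.active
	b.active = 1 - old
	b.lastVal = v
	for n := b.numWaiters[old]; n > 0; n-- {
		b.conds[old].Signal()
	}
}

func main() {
	b := newSwapBuf()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Printf("waiter %d saw %d\n", id, b.wait())
		}(i)
	}
	// Spin until all three waiters are parked, then publish once.
	for {
		b.mu.Lock()
		parked := b.numWaiters[b.active]
		b.mu.Unlock()
		if parked == 3 {
			break
		}
		runtime.Gosched()
	}
	b.publish(42)
	wg.Wait()
}
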
@@ -561,15 +582,36 @@ const updatesBufSize = 50
 
 func newUpdatesBuf() *updatesBuf {
 	buf := &updatesBuf{}
-	buf.mu.updated.L = &buf.mu
+	buf.mu.updated1.L = &buf.mu
+	buf.mu.updated2.L = &buf.mu
 	buf.mu.data = make([]*ctpb.Update, updatesBufSize)
 	return buf
 }
 
 // Push adds a new update to the back of the buffer.
 func (b *updatesBuf) Push(ctx context.Context, update *ctpb.Update) {
 	b.mu.Lock()
-	defer b.mu.Unlock()
+
+	// The goal here is to signal all goroutines that were waiting for the
+	// next update. Each goroutine we signal will wake up, perform some work,
+	// and then wait again for the update after this one. We want to avoid
+	// the race in which we cannot differentiate between a goroutine that is
+	// still waiting for this update and a goroutine that was waiting, got
+	// woken up, performed some work, and is now waiting for the next
+	// update.
+	var condVar *sync.Cond
+	var numWaiters *int
+	if b.mu.activeCondVar == 0 {
+		condVar = &b.mu.updated1
+		numWaiters = &b.mu.numWaiters1
+	} else {
+		condVar = &b.mu.updated2
+		numWaiters = &b.mu.numWaiters2
+	}
+
+	// After this flip, any goroutine that we wake up will wait on the other
+	// condition variable.
+	b.mu.activeCondVar = (b.mu.activeCondVar + 1) % 2
 
 	// If the buffer is not empty, sanity check the seq num.
 	if b.sizeLocked() != 0 {
@@ -589,7 +631,21 @@ func (b *updatesBuf) Push(ctx context.Context, update *ctpb.Update) {
 
 	// Notify everybody who might have been waiting for this message - we expect
 	// all the connections to be blocked waiting.
-	b.mu.updated.Broadcast()
+	b.mu.Unlock()
+	b.PaceBroadcastUpdate(condVar, numWaiters)
+}
+
+// PaceBroadcastUpdate paces the condition variable signaling to avoid
+// overloading the system.
+func (b *updatesBuf) PaceBroadcastUpdate(condVar *sync.Cond, numWaiters *int) {
+	// TODO(ibrahim): We can just hook this with the taskPacer.
+	b.mu.Lock()
+	originalNumWaiters := *numWaiters
+	for originalNumWaiters > 0 {
+		condVar.Signal()
+		originalNumWaiters--
+	}
+	b.mu.Unlock()
 }
 
 func (b *updatesBuf) lastIdxLocked() int {
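
As written, PaceBroadcastUpdate wakes every captured waiter in one burst while holding the lock, which is functionally a Broadcast; the TODO points at the taskPacer (not shown in this diff) for real pacing. Purely to illustrate what paced wakeups could look like, here is a hypothetical helper; paceSignal, batch, and gap are invented names, not the taskPacer API:

package pacer // hypothetical package, for illustration only

import (
	"sync"
	"time"
)

// paceSignal wakes total waiters on cond in batches of batch, sleeping for
// gap between batches so a large fan-out of waiters does not stampede all at
// once. This is a sketch under assumed names, not CockroachDB's taskPacer.
func paceSignal(mu sync.Locker, cond *sync.Cond, total, batch int, gap time.Duration) {
	for signaled := 0; signaled < total; {
		n := batch
		if rem := total - signaled; n > rem {
			n = rem
		}
		mu.Lock()
		for i := 0; i < n; i++ {
			cond.Signal()
		}
		mu.Unlock()
		signaled += n
		if signaled < total {
			time.Sleep(gap)
		}
	}
}
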
@@ -612,6 +668,23 @@ func (b *updatesBuf) GetBySeq(ctx context.Context, seqNum ctpb.SeqNum) (*ctpb.Up
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
+	var condVar *sync.Cond
+	var numWaiters *int
+	if b.mu.activeCondVar == 0 {
+		condVar = &b.mu.updated1
+		numWaiters = &b.mu.numWaiters1
+	} else {
+		condVar = &b.mu.updated2
+		numWaiters = &b.mu.numWaiters2
+	}
+
+	// Increment the number of waiters on the active condition variable, and
+	// decrement the same counter when we return.
+	*numWaiters++
+	defer func() {
+		*numWaiters--
+	}()
+
 	// Loop until the requested seqNum is added to the buffer.
 	for {
 		if b.mu.closed {
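
One subtlety in the registration above: `defer b.mu.Unlock()` is registered before the decrement closure, and deferred calls run last-in-first-out, so `*numWaiters--` executes while the mutex is still held; the counter is never touched unlocked. A two-line demonstration of the ordering (labels only, nothing from the patch):

package main

import "fmt"

func main() {
	// Deferred calls run in reverse order of registration.
	defer fmt.Println("runs second: the mutex unlock in GetBySeq")
	defer fmt.Println("runs first: the numWaiters decrement")
}
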
@@ -630,7 +703,7 @@ func (b *updatesBuf) GetBySeq(ctx context.Context, seqNum ctpb.SeqNum) (*ctpb.Up
 		}
 		// If the requested msg has not been produced yet, block.
 		if seqNum == lastSeq+1 {
-			b.mu.updated.Wait()
+			condVar.Wait()
 			continue
 		}
 		if seqNum > lastSeq+1 {
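
The `continue` after `condVar.Wait()` is load-bearing: when Wait returns, the predicate must be re-evaluated from the top of the loop, since the buffer may have been closed, or another Push may have advanced past the requested seqNum, in the meantime. A generic sketch of this standard wait-loop idiom, using no names from the patch:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false

	go func() {
		mu.Lock()
		ready = true
		mu.Unlock()
		cond.Signal()
	}()

	mu.Lock()
	// Re-check the predicate on every wakeup: by the time Wait reacquires
	// the lock, the state may have changed again, and a wakeup can also
	// arrive when the predicate does not yet hold.
	for !ready {
		cond.Wait()
	}
	mu.Unlock()
	fmt.Println("ready")
}
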
@@ -666,7 +739,8 @@ func (b *updatesBuf) Close() {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	b.mu.closed = true
-	b.mu.updated.Broadcast()
+	b.mu.updated1.Broadcast()
+	b.mu.updated2.Broadcast()
 }
 
 // connFactory is capable of creating new connections to specific nodes.