@@ -161,6 +161,9 @@ type SyncWorker struct {
161
161
// coordination between the sync loop and external callers
162
162
notify chan string
163
163
report chan SyncWorkerStatus
164
+ // startApply is used to start the initial attempt to apply a payload. It may be
165
+ // used consecutively to start additional attempts as well.
166
+ startApply chan string
164
167
165
168
// lock guards changes to these fields
166
169
lock sync.Mutex
@@ -199,7 +202,8 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,
199
202
200
203
minimumReconcileInterval : reconcileInterval ,
201
204
202
- notify : make (chan string , 1 ),
205
+ notify : make (chan string , 1 ),
206
+ startApply : make (chan string , 1 ),
203
207
// report is a large buffered channel to improve local testing - most consumers should invoke
204
208
// Status() or use the result of calling Update() instead because the channel can be out of date
205
209
// if the reader is not fast enough.
@@ -547,7 +551,7 @@ func (w *SyncWorker) Update(ctx context.Context, generation int64, desired confi
547
551
}
548
552
msg := "new work is available"
549
553
select {
550
- case w .notify <- msg :
554
+ case w .startApply <- msg :
551
555
klog .V (2 ).Info ("Notify the sync worker that new work is available" )
552
556
default :
553
557
klog .V (2 ).Info ("The sync worker has already been notified that new work is available" )
@@ -565,6 +569,23 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
565
569
klog .V (2 ).Infof ("Start: starting sync worker" )
566
570
567
571
work := & SyncWork {}
572
+ initialStartApplyReceived := make (chan string , 1 ) // a local channel to not cause a potential deadlock
573
+
574
+ // Until Update() has finished at least once, we do nothing.
575
+ for loop := true ; loop ; {
576
+ select {
577
+ case <- ctx .Done ():
578
+ klog .V (2 ).Infof ("The sync worker was shut down while waiting for the initial signal" )
579
+ return
580
+ case <- w .notify :
581
+ // Do not queue any retries until the worker has started
582
+ klog .V (2 ).Infof ("The sync worker was notified; however, it is waiting for the initial signal" )
583
+ case msg := <- w .startApply :
584
+ klog .V (2 ).Infof ("The sync worker has received the initial signal" )
585
+ initialStartApplyReceived <- msg
586
+ loop = false
587
+ }
588
+ }
568
589
569
590
wait .Until (func () {
570
591
consecutiveErrors := 0
@@ -582,6 +603,10 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
582
603
klog .V (2 ).Infof ("Wait finished" )
583
604
case msg := <- w .notify :
584
605
klog .V (2 ).Info (msg )
606
+ case msg := <- w .startApply :
607
+ klog .V (2 ).Info (msg )
608
+ case msg := <- initialStartApplyReceived :
609
+ klog .V (2 ).Info (msg )
585
610
}
586
611
587
612
// determine whether we need to do work
0 commit comments