@@ -24,6 +24,7 @@ type leaser struct {
2424 kubernetesInterface kubernetes.Interface
2525 leasablePodsCh <- chan * corev1.Pod
2626 leasedPodsCh chan <- * corev1.Pod
27+ leaseAcquiringWg * sync.WaitGroup
2728 leaseCancelByPod map [string ]context.CancelFunc
2829 holderName string
2930}
@@ -42,6 +43,7 @@ func newLeaser(kubernetesInterface kubernetes.Interface, leasablePodsCh <-chan *
4243 kubernetesInterface : kubernetesInterface ,
4344 leasablePodsCh : leasablePodsCh ,
4445 leasedPodsCh : leasedPodsCh ,
46+ leaseAcquiringWg : & sync.WaitGroup {},
4547 leaseCancelByPod : make (map [string ]context.CancelFunc ),
4648 holderName : holderName ,
4749 }, leasedPodsCh , nil
@@ -58,6 +60,7 @@ func (l *leaser) releaseAll(opts ...releaseOptions) {
5860 } else {
5961 opt = opts [0 ]
6062 }
63+ l .leaseAcquiringWg .Wait ()
6164 for name , leaseCancel := range l .leaseCancelByPod {
6265 if opt .except == name {
6366 continue
@@ -74,7 +77,6 @@ func (l *leaser) releaseAll(opts ...releaseOptions) {
7477func (l * leaser ) acquireLeaseForAllPods (ctx context.Context , opts KubernertesDiscoveryOptions ) {
7578 // NOTE:(ravilock) the usage of WaitGroup here is to ensure that we only close the leasedPodsCh
7679 // after all goroutines that might write to it are done. i.e. The goroutines that acquire leases for a buildkit pod.
77- wg := & sync.WaitGroup {}
7880 for leasablePod := range l .leasablePodsCh {
7981 if _ , found := l .leaseCancelByPod [leasablePod .Name ]; found {
8082 continue
@@ -83,17 +85,20 @@ func (l *leaser) acquireLeaseForAllPods(ctx context.Context, opts KubernertesDis
8385 leaseCtx , leaseCancel := context .WithCancel (ctx )
8486 l .leaseCancelByPod [leasablePod .Name ] = leaseCancel
8587
86- wg .Add (1 )
87- go l .acquireLeaseForPod (leaseCtx , leasablePod , opts , wg )
88+ l .leaseAcquiringWg .Add (1 )
89+ go func () {
90+ defer l .leaseAcquiringWg .Done ()
91+ l .acquireLeaseForPod (leaseCtx , leasablePod , opts )
92+ }()
8893 }
89- wg .Wait ()
94+ l . leaseAcquiringWg .Wait ()
9095 close (l .leasedPodsCh )
9196}
9297
9398// acquireLeaseForPod tries to acquire a lease for the given pod.
9499// it is a blocking call and only returns after the lease is lost or the given context is canceled.
95100// it should always be used in a separate goroutine.
96- func (l * leaser ) acquireLeaseForPod (ctx context.Context , pod * corev1.Pod , opts KubernertesDiscoveryOptions , wg * sync. WaitGroup ) {
101+ func (l * leaser ) acquireLeaseForPod (ctx context.Context , pod * corev1.Pod , opts KubernertesDiscoveryOptions ) {
97102 uniqueHolderName := fmt .Sprintf ("%s-%d" , l .holderName , time .Now ().Unix ())
98103 klog .V (4 ).Infof ("Attempting to acquire the lease for pod %s/%s under holder name %q..." , pod .Namespace , pod .Name , uniqueHolderName )
99104 leaderelection .RunOrDie (ctx , leaderelection.LeaderElectionConfig {
@@ -125,5 +130,4 @@ func (l *leaser) acquireLeaseForPod(ctx context.Context, pod *corev1.Pod, opts K
125130 },
126131 })
127132 klog .V (4 ).Infof ("Shutting off the lease for %s/%s pod" , pod .Namespace , pod .Name )
128- wg .Done ()
129133}
0 commit comments