Skip to content

Commit 340aed9

Browse files
Fix lock behavior
1 parent 59cc509 commit 340aed9

File tree

1 file changed

+31
-16
lines changed

1 file changed

+31
-16
lines changed

pkg/store/lock.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ type KubeLocker struct {
8282
namespace string
8383
objLock sync.Mutex
8484
lockCancellations map[string]context.CancelFunc
85+
lockCompletions map[string]chan struct{}
8586
coordinationClient coordv1.CoordinationV1Interface
86-
done chan struct{}
8787
}
8888

8989
// NewKubeLocker returns a Locker that is backed by a lock in Kubernetes.
@@ -102,14 +102,13 @@ func NewKubeLocker(
102102
}
103103

104104
coordinationClient := client.CoordinationV1()
105-
done := make(chan struct{}, 1)
106105

107106
return &KubeLocker{
108107
id: id,
109108
namespace: namespace,
110109
lockCancellations: map[string]context.CancelFunc{},
110+
lockCompletions: map[string]chan struct{}{},
111111
coordinationClient: coordinationClient,
112-
done: done,
113112
}, nil
114113
}
115114

@@ -124,6 +123,7 @@ func (k *KubeLocker) Acquire(ctx context.Context, name string) error {
124123
// Create a separate context for the lock itself
125124
lockCtx, lockCancel := context.WithCancel(context.Background())
126125
k.lockCancellations[name] = lockCancel
126+
k.lockCompletions[name] = make(chan struct{}, 1)
127127
k.objLock.Unlock()
128128

129129
leaseName := fmt.Sprintf("kubeapply-lock-%s", name)
@@ -154,8 +154,10 @@ func (k *KubeLocker) Acquire(ctx context.Context, name string) error {
154154
acquired <- struct{}{}
155155
},
156156
OnStoppedLeading: func() {
157+
k.objLock.Lock()
158+
defer k.objLock.Unlock()
157159
log.Warn("Lock lost")
158-
k.done <- struct{}{}
160+
k.lockCompletions[name] <- struct{}{}
159161
},
160162
},
161163
},
@@ -180,30 +182,43 @@ func (k *KubeLocker) Acquire(ctx context.Context, name string) error {
180182

181183
// Release releases the lock with the argument name.
182184
func (k *KubeLocker) Release(name string) error {
183-
k.objLock.Lock()
184-
defer k.objLock.Unlock()
185-
186-
log.Infof("Releasing lock with name %s", name)
187-
188-
cancel, ok := k.lockCancellations[name]
189-
if !ok {
190-
return fmt.Errorf("Lock was not acquired for name %s", name)
185+
// Do this in a separate function to prevent deadlock on k.objLock.
186+
if err := k.releaseHelper(name); err != nil {
187+
return err
191188
}
192189

193-
cancel()
194-
delete(k.lockCancellations, name)
195-
196190
log.Infof("Waiting for lock to be released")
197191
releaseCtx, releaseCancel := context.WithTimeout(
198192
context.Background(),
199193
kubeLockerReleaseTimeout,
200194
)
201195
defer releaseCancel()
202196

197+
k.objLock.Lock()
198+
completionChan := k.lockCompletions[name]
199+
k.objLock.Unlock()
200+
203201
select {
204-
case <-k.done:
202+
case <-completionChan:
203+
delete(k.lockCompletions, name)
205204
return nil
206205
case <-releaseCtx.Done():
207206
return releaseCtx.Err()
208207
}
209208
}
209+
210+
func (k *KubeLocker) releaseHelper(name string) error {
211+
k.objLock.Lock()
212+
defer k.objLock.Unlock()
213+
214+
log.Infof("Releasing lock with name %s", name)
215+
216+
cancel, ok := k.lockCancellations[name]
217+
if !ok {
218+
return fmt.Errorf("Lock was not acquired for name %s", name)
219+
}
220+
221+
cancel()
222+
delete(k.lockCancellations, name)
223+
return nil
224+
}

0 commit comments

Comments
 (0)