Skip to content

Commit 60e7e34

Browse files
committed
Require linearizable contexts for etcd lock operations
1 parent 5983f7c commit 60e7e34

File tree

6 files changed

+555
-21
lines changed

6 files changed

+555
-21
lines changed

pkg/cooldown/etcd.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ func (m *EtcdManager) Status(ctx context.Context) (Status, error) {
9494
ctx = context.Background()
9595
}
9696

97-
resp, err := m.client.Get(ctx, m.key)
97+
linearizableCtx := clientv3.WithRequireLeader(ctx)
98+
99+
resp, err := m.client.Get(linearizableCtx, m.key)
98100
if err != nil {
99101
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
100102
return Status{}, err
@@ -122,7 +124,8 @@ func (m *EtcdManager) Status(ctx context.Context) (Status, error) {
122124
status.ExpiresAt = startedAt.Add(time.Duration(record.DurationSec) * time.Second)
123125
}
124126
if kv.Lease != 0 {
125-
ttlResp, ttlErr := m.client.TimeToLive(ctx, clientv3.LeaseID(kv.Lease))
127+
ttlCtx := clientv3.WithRequireLeader(ctx)
128+
ttlResp, ttlErr := m.client.TimeToLive(ttlCtx, clientv3.LeaseID(kv.Lease))
126129
if ttlErr != nil {
127130
if errors.Is(ttlErr, context.Canceled) || errors.Is(ttlErr, context.DeadlineExceeded) {
128131
return Status{}, ttlErr

pkg/lock/etcd_manager.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ func (m *EtcdManager) Acquire(ctx context.Context) (Lease, error) {
127127
ctx = context.Background()
128128
}
129129

130+
linearizableCtx := clientv3.WithRequireLeader(ctx)
131+
130132
session, err := concurrency.NewSession(m.client, concurrency.WithTTL(m.ttlSeconds))
131133
if err != nil {
132134
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
@@ -136,7 +138,7 @@ func (m *EtcdManager) Acquire(ctx context.Context) (Lease, error) {
136138
}
137139

138140
mutex := concurrency.NewMutex(session, m.key)
139-
if err := mutex.TryLock(ctx); err != nil {
141+
if err := mutex.TryLock(linearizableCtx); err != nil {
140142
_ = session.Close()
141143
if errors.Is(err, concurrency.ErrLocked) {
142144
return nil, ErrNotAcquired
@@ -147,8 +149,9 @@ func (m *EtcdManager) Acquire(ctx context.Context) (Lease, error) {
147149
return nil, fmt.Errorf("try lock: %w", err)
148150
}
149151

150-
if err := m.annotateLease(ctx, session, mutex); err != nil {
151-
cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
152+
if err := m.annotateLease(linearizableCtx, session, mutex); err != nil {
153+
cleanupBase := clientv3.WithRequireLeader(context.Background())
154+
cleanupCtx, cancel := context.WithTimeout(cleanupBase, 5*time.Second)
152155
_ = mutex.Unlock(cleanupCtx)
153156
cancel()
154157
_ = session.Close()
@@ -173,6 +176,8 @@ func (l *etcdLease) Release(ctx context.Context) error {
173176
ctx = context.Background()
174177
}
175178

179+
ctx = clientv3.WithRequireLeader(ctx)
180+
176181
unlockErr := l.mutex.Unlock(ctx)
177182
closeErr := l.session.Close()
178183

pkg/lock/etcd_manager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ func TestEtcdManagerAnnotatesLease(t *testing.T) {
174174
}
175175

176176
key := internal.mutex.Key()
177-
resp, err := etcdClient.Get(context.Background(), key)
177+
ctx := clientv3.WithRequireLeader(context.Background())
178+
resp, err := etcdClient.Get(ctx, key)
178179
if err != nil {
179180
t.Fatalf("failed to read metadata: %v", err)
180181
}
@@ -206,7 +207,7 @@ func TestEtcdManagerAnnotatesLease(t *testing.T) {
206207
t.Fatalf("failed to release lock: %v", err)
207208
}
208209

209-
resp, err = etcdClient.Get(context.Background(), key)
210+
resp, err = etcdClient.Get(ctx, key)
210211
if err != nil {
211212
t.Fatalf("failed to read metadata after release: %v", err)
212213
}

pkg/orchestrator/loop.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -178,20 +178,14 @@ func (l *Loop) Run(ctx context.Context) error {
178178
return err
179179
}
180180
if err := l.executor.Execute(ctx, outcome.Command); err != nil {
181-
clearErr := error(nil)
182-
if outcome.cooldownStarted {
183-
clearErr = outcome.clearCooldown(context.Background())
184-
}
185-
releaseErr := outcome.ReleaseLock(context.Background())
181+
// Keep the cooldown marker active so other nodes continue to back off; the reboot
182+
// command may still be in progress even though it reported an error.
186183
execErr := fmt.Errorf("execute reboot command: %w", err)
187-
finalErr := execErr
188-
if clearErr != nil {
189-
finalErr = errors.Join(finalErr, clearErr)
190-
}
184+
releaseErr := outcome.ReleaseLock(context.Background())
191185
if releaseErr != nil {
192-
finalErr = errors.Join(finalErr, releaseErr)
186+
return errors.Join(execErr, releaseErr)
193187
}
194-
return finalErr
188+
return execErr
195189
}
196190
return nil
197191
}

pkg/orchestrator/loop_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func TestLoopStartsCooldownBeforeExecuting(t *testing.T) {
159159
}
160160
}
161161

162-
func TestLoopClearsCooldownWhenExecutorFails(t *testing.T) {
162+
func TestLoopKeepsCooldownWhenExecutorFails(t *testing.T) {
163163
cfg := baseConfig()
164164
startCalls := 0
165165
clearCalls := 0
@@ -189,8 +189,8 @@ func TestLoopClearsCooldownWhenExecutorFails(t *testing.T) {
189189
if startCalls != 1 {
190190
t.Fatalf("expected cooldown start to be invoked once, got %d", startCalls)
191191
}
192-
if clearCalls != 1 {
193-
t.Fatalf("expected cooldown clear to run on failure, got %d", clearCalls)
192+
if clearCalls != 0 {
193+
t.Fatalf("expected cooldown clear to be skipped on failure, got %d", clearCalls)
194194
}
195195
}
196196

0 commit comments

Comments
 (0)