@@ -127,6 +127,8 @@ func (m *EtcdManager) Acquire(ctx context.Context) (Lease, error) {
127127 ctx = context .Background ()
128128 }
129129
130+ linearizableCtx := clientv3 .WithRequireLeader (ctx )
131+
130132 session , err := concurrency .NewSession (m .client , concurrency .WithTTL (m .ttlSeconds ))
131133 if err != nil {
132134 if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) {
@@ -136,7 +138,7 @@ func (m *EtcdManager) Acquire(ctx context.Context) (Lease, error) {
136138 }
137139
138140 mutex := concurrency .NewMutex (session , m .key )
139- if err := mutex .TryLock (ctx ); err != nil {
141+ if err := mutex .TryLock (linearizableCtx ); err != nil {
140142 _ = session .Close ()
141143 if errors .Is (err , concurrency .ErrLocked ) {
142144 return nil , ErrNotAcquired
@@ -147,8 +149,9 @@ func (m *EtcdManager) Acquire(ctx context.Context) (Lease, error) {
147149 return nil , fmt .Errorf ("try lock: %w" , err )
148150 }
149151
150- if err := m .annotateLease (ctx , session , mutex ); err != nil {
151- cleanupCtx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
152+ if err := m .annotateLease (linearizableCtx , session , mutex ); err != nil {
153+ cleanupBase := clientv3 .WithRequireLeader (context .Background ())
154+ cleanupCtx , cancel := context .WithTimeout (cleanupBase , 5 * time .Second )
152155 _ = mutex .Unlock (cleanupCtx )
153156 cancel ()
154157 _ = session .Close ()
@@ -173,6 +176,8 @@ func (l *etcdLease) Release(ctx context.Context) error {
173176 ctx = context .Background ()
174177 }
175178
179+ ctx = clientv3 .WithRequireLeader (ctx )
180+
176181 unlockErr := l .mutex .Unlock (ctx )
177182 closeErr := l .session .Close ()
178183
0 commit comments