Commit 498ed6f

Force Lease Expiration When Leader Exits
Currently, when the leader exits (for example, after receiving a SIGINT), the other candidates must wait for its lease to expire before a new leader is elected. This patch mimics the Go client's ctx.Done() pattern: it catches the SIGINT, forces the lease's renew time to a date in the past, and sets acquire_time to None so that a new leader election starts immediately.
1 parent d80165d commit 498ed6f
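For context, the core of the patch is a small cooperative-cancellation pattern. The following standalone sketch (illustrative only, not the patched code itself) shows the idea: a shared flag object that a SIGINT handler flips and that a worker loop polls, so the worker can release its lease before exiting.

import signal
import time

class Context:
    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True

context = Context()

# Flip the flag instead of dying immediately on Ctrl+C.
signal.signal(signal.SIGINT, lambda sig, frame: context.cancel())

while not context.cancelled:
    time.sleep(1)  # stand-in for the lease renew loop

print("cancelled: force the lease to expire here")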

File tree

kubernetes/base/leaderelection/electionconfig.py
kubernetes/base/leaderelection/example.py
kubernetes/base/leaderelection/leaderelection.py
kubernetes/base/leaderelection/resourcelock/configmaplock.py

4 files changed: +80 −11 lines changed


kubernetes/base/leaderelection/electionconfig.py

Lines changed: 3 additions & 2 deletions
@@ -16,10 +16,10 @@
 import logging
 logging.basicConfig(level=logging.INFO)
 
-
 class Config:
+
     # Validate config, exit if an error is detected
-    def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading):
+    def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading, context):
         self.jitter_factor = 1.2
 
         if lock is None:
@@ -53,6 +53,7 @@ def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted
             self.onstopped_leading = self.on_stoppedleading_callback
         else:
             self.onstopped_leading = onstopped_leading
+        self.context = context
 
     # Default callback for when the current candidate if a leader, stops leading
     def on_stoppedleading_callback(self):
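Note that the new context parameter has no default, so every existing caller of Config must be updated along with this change. A backward-compatible variant (a sketch of an alternative, not what this patch does) could default it instead:

class Config:
    # Sketch: defaulting context keeps pre-existing call sites working;
    # the real class also validates the lock, durations and callbacks.
    def __init__(self, lock, lease_duration, renew_deadline, retry_period,
                 onstarted_leading, onstopped_leading, context=None):
        self.lock = lock
        self.lease_duration = lease_duration
        self.renew_deadline = renew_deadline
        self.retry_period = retry_period
        self.onstarted_leading = onstarted_leading
        self.onstopped_leading = onstopped_leading
        self.context = context  # None means no external cancellation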

kubernetes/base/leaderelection/example.py

Lines changed: 6 additions & 5 deletions
@@ -14,10 +14,9 @@
 
 import uuid
 from kubernetes import client, config
-from kubernetes.leaderelection import leaderelection
-from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock
-from kubernetes.leaderelection import electionconfig
-
+import leaderelection
+from resourcelock.configmaplock import ConfigMapLock
+import electionconfig
 
 # Authenticate using config file
 config.load_kube_config(config_file=r"")
@@ -33,6 +32,8 @@
 # Kubernetes namespace
 lock_namespace = "default"
 
+context = leaderelection.Context()
+
 
 # The function that a user wants to run once a candidate is elected as a leader
 def example_func():
@@ -45,7 +46,7 @@ def example_func():
 # Create config
 config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
                                renew_deadline=15, retry_period=5, onstarted_leading=example_func,
-                               onstopped_leading=None)
+                               onstopped_leading=None, context=context)
 
 # Enter leader election
 leaderelection.LeaderElection(config).run()
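A quick way to exercise the change end to end (illustrative; assumes example.py runs standalone, which the flat imports above suggest) is to launch two candidates and send SIGINT to whichever one acquires the lease:

import subprocess
import sys

# Start two competing candidates, then from another terminal run
# `kill -INT <leader pid>`; the follower should acquire the lease right
# away instead of waiting out the 17-second lease_duration.
procs = [subprocess.Popen([sys.executable, "example.py"]) for _ in range(2)]
for p in procs:
    print("candidate pid:", p.pid)
for p in procs:
    p.wait()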

kubernetes/base/leaderelection/leaderelection.py

Lines changed: 68 additions & 2 deletions
@@ -17,8 +17,9 @@
 import time
 import json
 import threading
-from .leaderelectionrecord import LeaderElectionRecord
+from leaderelectionrecord import LeaderElectionRecord
 import logging
+import signal
 # if condition to be removed when support for python2 will be removed
 if sys.version_info > (3, 0):
     from http import HTTPStatus
@@ -36,8 +37,21 @@
 lease.
 """
 
+class Context:
+    def __init__(self):
+        self.cancelled = False
+
+    def cancel(self):
+        self.cancelled = True
+
+# This currently only handles Ctrl+C on a leader, which is not the only way a leader may exit
+def handle_sigint(signal_received, frame):
+    print("\nSIGINT received! Cancelling election...")
+    if LeaderElection.global_context:
+        LeaderElection.global_context.cancel()
 
 class LeaderElection:
+    global_context = None
     def __init__(self, election_config):
         if election_config is None:
             sys.exit("argument config not passed")
@@ -51,13 +65,18 @@ def __init__(self, election_config):
         # Latest update time of the lock
         self.observed_time_milliseconds = 0
 
+        LeaderElection.global_context = self.election_config.context
+
+        # Attach signal handler to Ctrl+C (SIGINT)
+        signal.signal(signal.SIGINT, handle_sigint)
+
     # Point of entry to Leader election
     def run(self):
         # Try to create/ acquire a lock
         if self.acquire():
             logging.info("{} successfully acquired lease".format(self.election_config.lock.identity))
 
-            # Start leading and call OnStartedLeading()
+            # Start the leader callback in a new daemon thread.
             threading.daemon = True
             threading.Thread(target=self.election_config.onstarted_leading).start()
 
@@ -72,13 +91,15 @@ def acquire(self):
         retry_period = self.election_config.retry_period
 
         while True:
+
             succeeded = self.try_acquire_or_renew()
 
             if succeeded:
                 return True
 
             time.sleep(retry_period)
 
+
     def renew_loop(self):
         # Leader
         logging.info("Leader has entered renew loop and will try to update lease continuously")
@@ -87,10 +108,20 @@ def renew_loop(self):
         renew_deadline = self.election_config.renew_deadline * 1000
 
         while True:
+            # Check for context cancellation
+            if self.election_config.context.cancelled:
+                self.force_expire_lease()
+                return
+
             timeout = int(time.time() * 1000) + renew_deadline
             succeeded = False
 
             while int(time.time() * 1000) < timeout:
+                if self.election_config.context.cancelled:
+                    logging.info("Context cancelled during renew loop")
+                    self.force_expire_lease()
+                    return
+
                 succeeded = self.try_acquire_or_renew()
 
                 if succeeded:
@@ -104,6 +135,41 @@ def renew_loop(self):
             # failed to renew, return
             return
 
+    def force_expire_lease(self, max_retries=3):
+        """
+        Force the lease to be considered expired by updating the leader election record's renewTime
+        to a value in the past. Retries the update if the write fails (e.g. on an HTTP 409 conflict).
+        """
+        expired_time = time.time() - self.election_config.lease_duration - 1  # already-expired timestamp
+        retries = 0
+        while retries < max_retries:
+            # Re-read the current state of the lock to get the latest version.
+            lock_status, current_record = self.election_config.lock.get(
+                self.election_config.lock.name,
+                self.election_config.lock.namespace
+            )
+            # Create a new record with acquire_time cleared (None) and a renew time in the past.
+            new_record = LeaderElectionRecord(
+                self.election_config.lock.identity,
+                str(self.election_config.lease_duration),
+                None,
+                str(expired_time)
+            )
+            update_status = self.election_config.lock.update(
+                self.election_config.lock.name,
+                self.election_config.lock.namespace,
+                new_record
+            )
+            if update_status:
+                logging.info("Lease forcibly expired.")
+                return True
+            else:
+                logging.info("Conflict encountered, retrying update... (attempt {})".format(retries + 1))
+                retries += 1
+                time.sleep(0.5)  # brief backoff before retrying
+        logging.info("Failed to force lease expiration after retries.")
+        return False
+
     def try_acquire_or_renew(self):
         now_timestamp = time.time()
         now = datetime.datetime.fromtimestamp(now_timestamp)
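The renewTime arithmetic in force_expire_lease can be sanity-checked in isolation. A minimal sketch, assuming the simplified expiry rule that a lease is stale once renew_time + lease_duration is in the past:

import time

lease_duration = 17  # seconds, as in the example config

# The value force_expire_lease writes: one second past expiry.
renew_time = time.time() - lease_duration - 1

# Assumed (simplified) expiry check applied by waiting candidates.
is_expired = (renew_time + lease_duration) < time.time()
print(is_expired)  # True, so a new election can start immediately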

kubernetes/base/leaderelection/resourcelock/configmaplock.py

Lines changed: 3 additions & 2 deletions
@@ -11,11 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import sys
+sys.path.append("..")
 from kubernetes.client.rest import ApiException
 from kubernetes import client, config
 from kubernetes.client.api_client import ApiClient
-from ..leaderelectionrecord import LeaderElectionRecord
+from leaderelectionrecord import LeaderElectionRecord
 import json
 import logging
 logging.basicConfig(level=logging.INFO)
