Skip to content

Commit 7040b47

Browse files
committed
support on-new-leader hook for leader elector
1 parent 8f5aabe commit 7040b47

File tree

2 files changed

+125
-2
lines changed

2 files changed

+125
-2
lines changed

extended/src/main/java/io/kubernetes/client/extended/leaderelection/LeaderElector.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ public class LeaderElector {
2525
private LeaderElectionRecord observedRecord;
2626
private long observedTimeMilliSeconds;
2727
private final Consumer<Throwable> exceptionHandler;
28+
// used to implement OnNewLeader(), may lag slightly from the
29+
// value observedRecord.HolderIdentity if the transition has
30+
// not yet been reported.
31+
private String reportedLeader;
32+
private Consumer<String> onNewLeaderHook;
2833

2934
private ScheduledExecutorService scheduledWorkers =
3035
Executors.newSingleThreadScheduledExecutor(
@@ -81,7 +86,27 @@ public LeaderElector(LeaderElectionConfig config, Consumer<Throwable> exceptionH
8186
this.exceptionHandler = exceptionHandler;
8287
}
8388

89+
/**
90+
* Runs the leader election in foreground.
91+
*
92+
* @param startLeadingHook called when a LeaderElector client starts leading
93+
* @param stopLeadingHook called when a LeaderElector client stops leading
94+
*/
8495
public void run(Runnable startLeadingHook, Runnable stopLeadingHook) {
96+
run(startLeadingHook, stopLeadingHook, null);
97+
}
98+
99+
/**
100+
* Runs the leader election in foreground.
101+
*
102+
* @param startLeadingHook called when a LeaderElector client starts leading
103+
* @param stopLeadingHook called when a LeaderElector client stops leading
104+
* @param onNewLeaderHook called when the client observes a leader that is not the previously
105+
* observed leader. This includes the first observed leader when the client starts.
106+
*/
107+
public void run(
108+
Runnable startLeadingHook, Runnable stopLeadingHook, Consumer<String> onNewLeaderHook) {
109+
this.onNewLeaderHook = onNewLeaderHook;
85110
log.info("Start leader election with lock {}", config.getLock().describe());
86111
try {
87112
if (!acquire()) {
@@ -122,7 +147,9 @@ private boolean acquire() {
122147
log.info("Processing tryAcquireOrRenew successfully canceled");
123148
} catch (Throwable t) {
124149
this.exceptionHandler.accept(t);
125-
future.cancel(true);
150+
future.cancel(true); // make sure the acquire work doesn't overlap
151+
} finally {
152+
maybeReportTransition();
126153
}
127154
},
128155
0,
@@ -160,6 +187,7 @@ private void renewLoop() {
160187
// retry until success or interrupted
161188
while (!tryAcquireOrRenew()) {
162189
Thread.sleep(retryPeriodMillis);
190+
maybeReportTransition();
163191
}
164192
} catch (InterruptedException e) {
165193
return false;
@@ -176,7 +204,7 @@ private void renewLoop() {
176204
}
177205
renewResult = false;
178206
} finally {
179-
future.cancel(true);
207+
future.cancel(true); // make the lease worker doesn't overlap
180208
}
181209
if (renewResult) {
182210
if (log.isDebugEnabled()) {
@@ -280,6 +308,20 @@ private boolean isLeader() {
280308
return this.config.getLock().identity().equals(this.observedRecord.getHolderIdentity());
281309
}
282310

311+
private void maybeReportTransition() {
312+
if (this.observedRecord == null) {
313+
return;
314+
}
315+
if (this.observedRecord.getHolderIdentity().equals(this.reportedLeader)) {
316+
return;
317+
}
318+
this.reportedLeader = this.observedRecord.getHolderIdentity();
319+
320+
if (this.onNewLeaderHook != null) {
321+
this.hookWorkers.submit(() -> onNewLeaderHook.accept(this.reportedLeader));
322+
}
323+
}
324+
283325
private ThreadFactory makeThreadFactory(String nameFormat) {
284326
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
285327
}

extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.kubernetes.client.openapi.ApiException;
88
import java.time.Duration;
99
import java.util.ArrayList;
10+
import java.util.Date;
1011
import java.util.List;
1112
import java.util.concurrent.CountDownLatch;
1213
import java.util.concurrent.ExecutorService;
@@ -282,6 +283,86 @@ public void testLeaderElectionCaptureException() throws ApiException, Interrupte
282283
assertEquals(expectedException, actualException.get().getCause());
283284
}
284285

286+
@Test
287+
public void testLeaderElectionReportLeaderOnStart() throws ApiException, InterruptedException {
288+
when(lock.identity()).thenReturn("foo1");
289+
when(lock.get())
290+
.thenReturn(
291+
new LeaderElectionRecord() {
292+
{
293+
setHolderIdentity("foo2");
294+
setAcquireTime(new Date());
295+
setRenewTime(new Date());
296+
setLeaderTransitions(1);
297+
setLeaseDurationSeconds(60);
298+
}
299+
});
300+
List<String> notifications = new ArrayList<>();
301+
LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfig();
302+
leaderElectionConfig.setLock(lock);
303+
leaderElectionConfig.setLeaseDuration(Duration.ofMillis(1000));
304+
leaderElectionConfig.setRetryPeriod(Duration.ofMillis(200));
305+
leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700));
306+
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
307+
ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1);
308+
leaderElectionWorker.submit(
309+
() -> {
310+
leaderElector.run(() -> {}, () -> {}, (id) -> notifications.add(id));
311+
});
312+
313+
Thread.sleep(Duration.ofSeconds(2).toMillis());
314+
315+
when(lock.get())
316+
.thenReturn(
317+
new LeaderElectionRecord() {
318+
{
319+
setHolderIdentity("foo3");
320+
setAcquireTime(new Date());
321+
setRenewTime(new Date());
322+
setLeaderTransitions(1);
323+
setLeaseDurationSeconds(60);
324+
}
325+
});
326+
Thread.sleep(Duration.ofSeconds(2).toMillis());
327+
328+
assertEquals(2, notifications.size());
329+
assertEquals("foo2", notifications.get(0));
330+
assertEquals("foo3", notifications.get(1));
331+
}
332+
333+
@Test
334+
public void testLeaderElectionShouldReportLeaderItAcquiresOnStart()
335+
throws ApiException, InterruptedException {
336+
when(lock.identity()).thenReturn("foo1");
337+
when(lock.get())
338+
.thenReturn(
339+
new LeaderElectionRecord() {
340+
{
341+
setHolderIdentity("foo1");
342+
setAcquireTime(new Date());
343+
setRenewTime(new Date());
344+
setLeaderTransitions(1);
345+
setLeaseDurationSeconds(60);
346+
}
347+
});
348+
List<String> notifications = new ArrayList<>();
349+
LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfig();
350+
leaderElectionConfig.setLock(lock);
351+
leaderElectionConfig.setLeaseDuration(Duration.ofMillis(1000));
352+
leaderElectionConfig.setRetryPeriod(Duration.ofMillis(200));
353+
leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700));
354+
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
355+
ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1);
356+
leaderElectionWorker.submit(
357+
() -> {
358+
leaderElector.run(() -> {}, () -> {}, (id) -> notifications.add(id));
359+
});
360+
361+
Thread.sleep(Duration.ofSeconds(2).toMillis());
362+
assertEquals(1, notifications.size());
363+
assertEquals("foo1", notifications.get(0));
364+
}
365+
285366
public static class MockResourceLock implements Lock {
286367

287368
public static ReentrantLock lock;

0 commit comments

Comments
 (0)