Skip to content

Commit 6c06fe8

Browse files
committed
wip
Signed-off-by: wind57 <[email protected]>
1 parent 1744873 commit 6c06fe8

File tree

11 files changed

+499
-152
lines changed

11 files changed

+499
-152
lines changed

spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/election/PodReadyRunnerTests.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.cloud.kubernetes.commons.leader.election;
1818

19-
import java.time.Duration;
2019
import java.util.concurrent.CompletableFuture;
2120
import java.util.concurrent.Executors;
2221
import java.util.concurrent.ScheduledExecutorService;
@@ -32,7 +31,7 @@
3231
import org.springframework.boot.test.system.OutputCaptureExtension;
3332

3433
import static org.assertj.core.api.Assertions.assertThat;
35-
import static org.awaitility.Awaitility.await;
34+
import static org.springframework.cloud.kubernetes.integration.tests.commons.Awaitilities.awaitUntil;
3635

3736
/**
3837
* @author wind57
@@ -60,9 +59,7 @@ void readinessOKFromTheFirstCycle(CapturedOutput output) throws Exception {
6059
assertThat(output.getOut()).contains("Pod : identity in namespace : namespace is ready");
6160
assertThat(output.getOut()).contains("canceling scheduled future because readiness succeeded");
6261

63-
await().atMost(Duration.ofSeconds(3))
64-
.pollInterval(Duration.ofMillis(200))
65-
.until(() -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
62+
awaitUntil(3, 200, () -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
6663
}
6764

6865
/**
@@ -88,9 +85,7 @@ void readinessOKFromTheSecondCycle(CapturedOutput output) throws Exception {
8885
assertThat(output.getOut()).contains("Pod : identity in namespace : namespace is ready");
8986
assertThat(output.getOut()).contains("canceling scheduled future because readiness succeeded");
9087

91-
await().atMost(Duration.ofSeconds(3))
92-
.pollInterval(Duration.ofMillis(200))
93-
.until(() -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
88+
awaitUntil(3, 200, () -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
9489
}
9590

9691
/**
@@ -121,9 +116,7 @@ void readinessFailsOnTheSecondCycle(CapturedOutput output) {
121116
assertThat(output.getOut()).contains("pod readiness for : identity failed with : fail on the second cycle");
122117
assertThat(output.getOut()).contains("canceling scheduled future because readiness failed");
123118

124-
await().atMost(Duration.ofSeconds(3))
125-
.pollInterval(Duration.ofMillis(200))
126-
.until(() -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
119+
awaitUntil(3, 200, () -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
127120
}
128121
assertThat(caught).isTrue();
129122
}
@@ -168,9 +161,7 @@ void readinessFailsOnTheSecondCycleAttachNewPipeline(CapturedOutput output) {
168161
assertThat(output.getOut()).contains("pod readiness for : identity failed with : fail on the second cycle");
169162
assertThat(output.getOut()).contains("readiness failed and we caught that");
170163

171-
await().atMost(Duration.ofSeconds(3))
172-
.pollInterval(Duration.ofMillis(200))
173-
.until(() -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
164+
awaitUntil(3, 200, () -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
174165
assertThat(output.getOut()).contains("canceling scheduled future because readiness failed");
175166
}
176167
assertThat(caught).isTrue();
@@ -222,15 +213,12 @@ void readinessCanceledOnTheSecondCycleAttachNewPipeline(CapturedOutput output) t
222213
assertThat(output.getOut()).doesNotContain("leader election for : identity was not successful");
223214
assertThat(output.getOut()).contains("readiness failed and we caught that");
224215

225-
await().atMost(Duration.ofSeconds(3000))
226-
.pollInterval(Duration.ofMillis(200))
227-
.until(() -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
216+
awaitUntil(3, 200, () -> output.getOut().contains("Shutting down executor : podReadyExecutor"));
228217

229218
assertThat(output.getOut()).contains("canceling scheduled future because completable future was cancelled");
230219
assertThat(output.getOut()).doesNotContain("canceling scheduled future because readiness failed");
231220
assertThat(output.getOut()).contains("scheduledFuture is canceled: true");
232221

233-
234222
}
235223
assertThat(caught).isTrue();
236224
cancelScheduler.shutdownNow();

spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInitiator.java

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ final class Fabric8LeaderElectionInitiator {
8888
*/
8989
@PostConstruct
9090
void postConstruct() {
91+
9192
LOG.info(() -> "starting leader initiator : " + candidateIdentity);
9293

9394
// wait until the pod is ready
@@ -101,25 +102,11 @@ void postConstruct() {
101102

102103
// wait in a different thread until the pod is ready
103104
// and don't block the main application from starting
104-
podReadyWaitingExecutor.submit(() -> {
105+
podReadyWaitingExecutor.execute(() -> {
105106
if (waitForPodReady) {
106-
107-
// if 'ready' is already completed at this point, thread will run this,
108-
// otherwise it will attach the pipeline and move on to
109-
// 'blockReadinessCheck'
110-
CompletableFuture<?> ready = podReadyFuture.whenComplete((ok, error) -> {
111-
if (error != null) {
112-
LOG.error(() -> "readiness failed for : " + candidateIdentity
113-
+ ", leader election will not start");
114-
}
115-
else {
116-
LOG.info(() -> candidateIdentity + " is ready");
117-
startLeaderElection();
118-
}
119-
});
120-
107+
CompletableFuture<?> ready = attachStatusLoggerPipeline();
121108
blockReadinessCheck(ready);
122-
109+
startLeaderElection();
123110
}
124111
else {
125112
startLeaderElection();
@@ -154,35 +141,46 @@ void preDestroy() {
154141
}
155142
}
156143

144+
// needed for testing only
145+
CompletableFuture<?> leaderFeature() {
146+
return leaderFuture;
147+
}
148+
157149
private void startLeaderElection() {
150+
158151
leaderFuture = leaderElector(leaderElectionConfig, fabric8KubernetesClient).start();
159-
leaderFuture.whenComplete((ok, error) -> {
160152

161-
if (ok != null) {
162-
LOG.info(() -> "leaderFuture finished normally, will re-start it for : " + candidateIdentity);
163-
startLeaderElection();
164-
return;
165-
}
153+
leaderFuture.whenComplete((ok, error) -> {
166154

167-
if (error instanceof CancellationException) {
168-
if (!destroyCalled) {
169-
LOG.warn(() -> "renewal failed for : " + candidateIdentity + ", will re-start it after : "
170-
+ leaderElectionProperties.waitAfterRenewalFailure().toSeconds() + " seconds");
171-
sleep();
172-
startLeaderElection();
155+
if (error != null) {
156+
if (error instanceof CancellationException) {
157+
if (!destroyCalled) {
158+
LOG.warn(() -> "renewal failed for : " + candidateIdentity + ", will re-start it after : "
159+
+ leaderElectionProperties.waitAfterRenewalFailure().toSeconds() + " seconds");
160+
sleep();
161+
startLeaderElection();
162+
}
163+
}
164+
else {
165+
LOG.warn(() -> "leader failed with : " + error.getMessage());
166+
LOG.warn(() -> "leader election is over for : " + candidateIdentity);
173167
}
174168
}
175169
else {
176-
LOG.warn(() -> "leader election is over for : " + candidateIdentity);
177-
}
178-
179-
try {
180-
leaderFuture.get();
181-
}
182-
catch (Exception e) {
183-
LOG.warn(() -> "leader election failed for : " + candidateIdentity + ". Trying to recover...");
170+
// ok is always null; since it does not represent anything, it just passes
171+
// the state further
172+
LOG.info(() -> "leaderFuture finished normally, will re-start it for : " + candidateIdentity);
173+
startLeaderElection();
184174
}
185175
});
176+
177+
try {
178+
leaderFuture.get();
179+
}
180+
catch (Exception e) {
181+
LOG.warn(() -> "leader election failed for : " + candidateIdentity + ". Trying to recover...");
182+
}
183+
186184
}
187185

188186
private LeaderElector leaderElector(LeaderElectionConfig config, KubernetesClient fabric8KubernetesClient) {
@@ -207,4 +205,21 @@ private void blockReadinessCheck(CompletableFuture<?> ready) {
207205
}
208206
}
209207

208+
/**
209+
* if 'ready' is already completed at this point, thread will run this,
210+
* otherwise it will attach the pipeline and move on to
211+
* 'blockReadinessCheck'
212+
*/
213+
private CompletableFuture<?> attachStatusLoggerPipeline() {
214+
return podReadyFuture.whenComplete((ok, error) -> {
215+
if (error != null) {
216+
LOG.error(() -> "readiness failed for : " + candidateIdentity
217+
+ ", leader election will not start");
218+
}
219+
else {
220+
LOG.info(() -> candidateIdentity + " is ready");
221+
}
222+
});
223+
}
224+
210225
}

spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-leader-election/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/AbstractLeaderElection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.atomic.AtomicInteger;
2121
import java.util.function.BooleanSupplier;
2222

23+
import io.fabric8.kubernetes.api.model.coordination.v1.Lease;
2324
import io.fabric8.kubernetes.client.Config;
2425
import io.fabric8.kubernetes.client.KubernetesClient;
2526
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
@@ -45,6 +46,8 @@
4546
@ExtendWith(OutputCaptureExtension.class)
4647
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
4748
properties = { "spring.main.cloud-platform=KUBERNETES", "spring.cloud.kubernetes.leader.election.enabled=true",
49+
"spring.cloud.kubernetes.leader.election.lease-duration=6s",
50+
"spring.cloud.kubernetes.leader.election.renew-deadline=5s",
4851
"logging.level.org.springframework.cloud.kubernetes.commons.leader.election=debug",
4952
"logging.level.org.springframework.cloud.kubernetes.fabric8.leader.election=debug" },
5053
classes = { App.class, AbstractLeaderElection.LocalConfiguration.class })
@@ -72,6 +75,9 @@ void afterEach() {
7275
.withTimeout(10, TimeUnit.SECONDS)
7376
.delete();
7477
}
78+
Lease getLease() {
79+
return kubernetesClient.leases().inNamespace("default").withName("spring-k8s-leader-election-lock").get();
80+
}
7581

7682
@TestConfiguration
7783
static class LocalConfiguration {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.fabric8.leader.election;
18+
19+
import java.time.ZonedDateTime;
20+
import java.util.function.Supplier;
21+
22+
import io.fabric8.kubernetes.api.model.coordination.v1.Lease;
23+
24+
import org.springframework.boot.test.system.CapturedOutput;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.springframework.cloud.kubernetes.integration.tests.commons.Awaitilities.awaitUntil;
28+
29+
final class Assertions {
30+
31+
private Assertions() {
32+
33+
}
34+
35+
static void assertAcquireAndRenew(CapturedOutput output, Supplier<Lease> leaseSupplier, String name) {
36+
// we have become the leader
37+
awaitUntil(60, 100, () -> output.getOut().contains(name + " is the new leader"));
38+
39+
// let's unwind some logs to see that the process is how we expect it to be
40+
41+
// 1. lease is used as the lock (comes from our code)
42+
assertThat(output.getOut()).contains("will use lease as the lock for leader election");
43+
44+
// 2. we start leader initiator for our hostname (comes from our code)
45+
assertThat(output.getOut()).contains("starting leader initiator : " + name);
46+
47+
// 3. we try to acquire the lease (comes from fabric8 code)
48+
assertThat(output.getOut()).contains(
49+
"Attempting to acquire leader lease 'LeaseLock: default - spring-k8s-leader-election-lock (" + name + ")'");
50+
51+
// 4. lease has been acquired
52+
assertThat(output.getOut())
53+
.contains("Acquired lease 'LeaseLock: default - spring-k8s-leader-election-lock (" + name + ")'");
54+
55+
// 5. we are the leader (comes from fabric8 code)
56+
assertThat(output.getOut()).contains("Leader changed from null to " + name);
57+
58+
// 6. wait until a renewal happens (comes from fabric code)
59+
// this one means that we have extended our leadership
60+
awaitUntil(30, 500,
61+
() -> output.getOut()
62+
.contains("Attempting to renew leader lease 'LeaseLock: "
63+
+ "default - spring-k8s-leader-election-lock (" + name + ")'"));
64+
65+
Lease lease = leaseSupplier.get();
66+
67+
ZonedDateTime currentAcquiredTime = lease.getSpec().getAcquireTime();
68+
assertThat(currentAcquiredTime).isNotNull();
69+
assertThat(lease.getSpec().getLeaseDurationSeconds()).isEqualTo(6);
70+
assertThat(lease.getSpec().getLeaseTransitions()).isEqualTo(0);
71+
72+
ZonedDateTime currentRenewalTime = lease.getSpec().getRenewTime();
73+
assertThat(currentRenewalTime).isNotNull();
74+
75+
// 7. renewal happens
76+
awaitUntil(4, 500, () -> {
77+
ZonedDateTime newRenewalTime = leaseSupplier.get().getSpec().getRenewTime();
78+
return newRenewalTime.isAfter(currentRenewalTime);
79+
});
80+
}
81+
82+
}

0 commit comments

Comments
 (0)