Skip to content

Commit b8e7d18

Browse files
committed
added tests
Signed-off-by: wind57 <[email protected]>
1 parent 2060f0a commit b8e7d18

File tree

13 files changed

+236
-287
lines changed

13 files changed

+236
-287
lines changed

spring-cloud-kubernetes-commons/src/main/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionProperties.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@
4141
* the lock to become the leader. In our current code,
4242
* this is what we use in LeaderInitiator::start,
4343
* more exactly in the scheduleAtFixRate
44+
* restartOnFailure: what to do when leader election future fails
45+
* with an Exception. Do we restart the leader election process
46+
* or let it fail and thus this instance never participates
47+
* in the leader election process.
4448
*
4549
*
4650
* First, we try to acquire the lock (lock is either a configmap or a lease)
@@ -84,7 +88,8 @@ public record LeaderElectionProperties(
8488
@DefaultValue("spring-k8s-leader-election-lock") String lockName,
8589
@DefaultValue("10s") Duration renewDeadline,
8690
@DefaultValue("2s") Duration retryPeriod,
87-
@DefaultValue("0s") Duration waitAfterRenewalFailure,
88-
@DefaultValue("false") boolean useConfigMapAsLock) {
91+
@DefaultValue("3s") Duration waitAfterRenewalFailure,
92+
@DefaultValue("false") boolean useConfigMapAsLock,
93+
@DefaultValue("true") boolean restartOnFailure) {
8994
// @formatter:on
9095
}

spring-cloud-kubernetes-commons/src/test/java/org/springframework/cloud/kubernetes/commons/leader/election/LeaderElectionPropertiesTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ void testDefaults() {
4444
Assertions.assertThat(properties.retryPeriod()).isEqualTo(Duration.ofSeconds(2));
4545
Assertions.assertThat(properties.waitAfterRenewalFailure()).isEqualTo(Duration.ofSeconds(0));
4646
Assertions.assertThat(properties.useConfigMapAsLock()).isFalse();
47+
Assertions.assertThat(properties.restartOnFailure()).isTrue();
4748
});
4849
}
4950

@@ -58,7 +59,8 @@ void testNonDefaults() {
5859
"spring.cloud.kubernetes.leader.election.renew-deadline=2d",
5960
"spring.cloud.kubernetes.leader.election.retry-period=3m",
6061
"spring.cloud.kubernetes.leader.election.wait-after-renewal-failure=13m",
61-
"spring.cloud.kubernetes.leader.election.use-config-map-as-lock=true")
62+
"spring.cloud.kubernetes.leader.election.use-config-map-as-lock=true",
63+
"spring.cloud.kubernetes.leader.election.restart-on-failure=false")
6264
.run(context -> {
6365
LeaderElectionProperties properties = context.getBean(LeaderElectionProperties.class);
6466
Assertions.assertThat(properties).isNotNull();
@@ -71,6 +73,7 @@ void testNonDefaults() {
7173
Assertions.assertThat(properties.retryPeriod()).isEqualTo(Duration.ofMinutes(3));
7274
Assertions.assertThat(properties.waitAfterRenewalFailure()).isEqualTo(Duration.ofMinutes(13));
7375
Assertions.assertThat(properties.useConfigMapAsLock()).isTrue();
76+
Assertions.assertThat(properties.restartOnFailure()).isFalse();
7477
});
7578
}
7679

spring-cloud-kubernetes-discovery/src/test/java/org/springframework/cloud/kubernetes/discovery/HealthEventListenerConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ HealthEventListener healthEventListener() {
3838
return new HealthEventListener();
3939
}
4040

41-
private static class HealthEventListener implements ApplicationListener<InstanceRegisteredEvent<?>> {
41+
private static final class HealthEventListener implements ApplicationListener<InstanceRegisteredEvent<?>> {
4242

4343
@Override
4444
public void onApplicationEvent(InstanceRegisteredEvent<?> event) {

spring-cloud-kubernetes-fabric8-leader/src/main/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Fabric8LeaderElectionInitiator.java

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
import java.util.concurrent.CancellationException;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ExecutorService;
22-
import java.util.concurrent.TimeUnit;
2322
import java.util.function.BooleanSupplier;
2423

2524
import io.fabric8.kubernetes.client.KubernetesClient;
2625
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
27-
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
2826
import jakarta.annotation.PostConstruct;
2927
import jakarta.annotation.PreDestroy;
3028

@@ -33,6 +31,10 @@
3331
import org.springframework.core.log.LogAccessor;
3432

3533
import static java.util.concurrent.Executors.newSingleThreadExecutor;
34+
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.attachStatusLoggerPipeline;
35+
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.blockReadinessCheck;
36+
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.leaderElector;
37+
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Fabric8LeaderElectionInitiatorUtil.sleep;
3638

3739
/**
3840
* @author wind57
@@ -104,7 +106,7 @@ void postConstruct() {
104106
// and don't block the main application from starting
105107
podReadyWaitingExecutor.execute(() -> {
106108
if (waitForPodReady) {
107-
CompletableFuture<?> ready = attachStatusLoggerPipeline();
109+
CompletableFuture<?> ready = attachStatusLoggerPipeline(podReadyFuture, candidateIdentity);
108110
blockReadinessCheck(ready);
109111
startLeaderElection();
110112
}
@@ -157,69 +159,37 @@ private void startLeaderElection() {
157159
if (!destroyCalled) {
158160
LOG.warn(() -> "renewal failed for : " + candidateIdentity + ", will re-start it after : "
159161
+ leaderElectionProperties.waitAfterRenewalFailure().toSeconds() + " seconds");
160-
sleep();
161-
startLeaderElection();
162+
sleep(leaderElectionProperties);
163+
podReadyWaitingExecutor.execute(this::startLeaderElection);
162164
}
163165
}
164166
else {
165167
LOG.warn(() -> "leader failed with : " + error.getMessage());
166-
LOG.warn(() -> "leader election is over for : " + candidateIdentity);
168+
if (leaderElectionProperties.restartOnFailure()) {
169+
LOG.info(() -> "will restart leader election for : " + candidateIdentity);
170+
sleep(leaderElectionProperties);
171+
podReadyWaitingExecutor.execute(this::startLeaderElection);
172+
}
173+
else {
174+
LOG.warn(() -> "leader election is over for : " + candidateIdentity);
175+
}
167176
}
168177
}
169178
else {
170179
// ok is always null; since it does not represent anything, it just passes
171180
// the state further
172181
LOG.info(() -> "leaderFuture finished normally, will re-start it for : " + candidateIdentity);
173-
startLeaderElection();
182+
podReadyWaitingExecutor.execute(this::startLeaderElection);
174183
}
175184
});
176185

177186
try {
178187
leaderFuture.get();
179188
}
180189
catch (Exception e) {
181-
LOG.warn(() -> "leader election failed for : " + candidateIdentity + ". Trying to recover...");
190+
LOG.warn(() -> "leader election failed for : " + candidateIdentity);
182191
}
183192

184193
}
185194

186-
private LeaderElector leaderElector(LeaderElectionConfig config, KubernetesClient fabric8KubernetesClient) {
187-
return fabric8KubernetesClient.leaderElector().withConfig(config).build();
188-
}
189-
190-
private void sleep() {
191-
try {
192-
TimeUnit.SECONDS.sleep(leaderElectionProperties.waitAfterRenewalFailure().toSeconds());
193-
}
194-
catch (InterruptedException e) {
195-
throw new RuntimeException(e);
196-
}
197-
}
198-
199-
private void blockReadinessCheck(CompletableFuture<?> ready) {
200-
try {
201-
ready.get();
202-
}
203-
catch (Exception e) {
204-
throw new RuntimeException(e);
205-
}
206-
}
207-
208-
/**
209-
* if 'ready' is already completed at this point, thread will run this,
210-
* otherwise it will attach the pipeline and move on to
211-
* 'blockReadinessCheck'
212-
*/
213-
private CompletableFuture<?> attachStatusLoggerPipeline() {
214-
return podReadyFuture.whenComplete((ok, error) -> {
215-
if (error != null) {
216-
LOG.error(() -> "readiness failed for : " + candidateIdentity
217-
+ ", leader election will not start");
218-
}
219-
else {
220-
LOG.info(() -> candidateIdentity + " is ready");
221-
}
222-
});
223-
}
224-
225195
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.fabric8.leader.election;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import io.fabric8.kubernetes.client.KubernetesClient;
23+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
24+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
25+
26+
import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties;
27+
import org.springframework.core.log.LogAccessor;
28+
29+
final class Fabric8LeaderElectionInitiatorUtil {
30+
31+
private static final LogAccessor LOG = new LogAccessor(Fabric8LeaderElectionInitiatorUtil.class);
32+
33+
private Fabric8LeaderElectionInitiatorUtil() {
34+
35+
}
36+
37+
/**
38+
* if 'ready' is already completed at this point, thread will run this, otherwise it
39+
* will attach the pipeline and move on to 'blockReadinessCheck'.
40+
*/
41+
static CompletableFuture<?> attachStatusLoggerPipeline(CompletableFuture<?> innerPodReadyFuture,
42+
String candidateIdentity) {
43+
return innerPodReadyFuture.whenComplete((ok, error) -> {
44+
if (error != null) {
45+
LOG.error(() -> "readiness failed for : " + candidateIdentity + ", leader election will not start");
46+
}
47+
else {
48+
LOG.info(() -> candidateIdentity + " is ready");
49+
}
50+
});
51+
}
52+
53+
static void sleep(LeaderElectionProperties leaderElectionProperties) {
54+
try {
55+
TimeUnit.SECONDS.sleep(leaderElectionProperties.waitAfterRenewalFailure().toSeconds());
56+
}
57+
catch (InterruptedException e) {
58+
throw new RuntimeException(e);
59+
}
60+
}
61+
62+
static LeaderElector leaderElector(LeaderElectionConfig config, KubernetesClient fabric8KubernetesClient) {
63+
return fabric8KubernetesClient.leaderElector().withConfig(config).build();
64+
}
65+
66+
static void blockReadinessCheck(CompletableFuture<?> ready) {
67+
try {
68+
ready.get();
69+
}
70+
catch (Exception e) {
71+
LOG.error(e, () -> "block readiness check failed with : " + e.getMessage());
72+
}
73+
}
74+
75+
}

spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-leader-election/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election/Assertions.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,35 +32,40 @@ private Assertions() {
3232

3333
}
3434

35-
static void assertAcquireAndRenew(CapturedOutput output, Supplier<Lease> leaseSupplier, String name) {
35+
/**
36+
* lease was acquired and we renewed it, at least once.
37+
*/
38+
static void assertAcquireAndRenew(CapturedOutput output, Supplier<Lease> leaseSupplier, String candidateIdentity) {
3639
// we have become the leader
37-
awaitUntil(60, 100, () -> output.getOut().contains(name + " is the new leader"));
40+
awaitUntil(60, 100, () -> output.getOut().contains(candidateIdentity + " is the new leader"));
3841

3942
// let's unwind some logs to see that the process is how we expect it to be
4043

4144
// 1. lease is used as the lock (comes from our code)
4245
assertThat(output.getOut()).contains("will use lease as the lock for leader election");
4346

4447
// 2. we start leader initiator for our hostname (comes from our code)
45-
assertThat(output.getOut()).contains("starting leader initiator : " + name);
48+
assertThat(output.getOut()).contains("starting leader initiator : " + candidateIdentity);
4649

4750
// 3. we try to acquire the lease (comes from fabric8 code)
4851
assertThat(output.getOut()).contains(
49-
"Attempting to acquire leader lease 'LeaseLock: default - spring-k8s-leader-election-lock (" + name + ")'");
52+
"Attempting to acquire leader lease 'LeaseLock: default - spring-k8s-leader-election-lock " +
53+
"(" + candidateIdentity + ")'");
5054

5155
// 4. lease has been acquired
5256
assertThat(output.getOut())
53-
.contains("Acquired lease 'LeaseLock: default - spring-k8s-leader-election-lock (" + name + ")'");
57+
.contains("Acquired lease 'LeaseLock: default - spring-k8s-leader-election-lock " +
58+
"(" + candidateIdentity + ")'");
5459

5560
// 5. we are the leader (comes from fabric8 code)
56-
assertThat(output.getOut()).contains("Leader changed from null to " + name);
61+
assertThat(output.getOut()).contains("Leader changed from null to " + candidateIdentity);
5762

5863
// 6. wait until a renewal happens (comes from fabric code)
5964
// this one means that we have extended our leadership
6065
awaitUntil(30, 500,
6166
() -> output.getOut()
6267
.contains("Attempting to renew leader lease 'LeaseLock: "
63-
+ "default - spring-k8s-leader-election-lock (" + name + ")'"));
68+
+ "default - spring-k8s-leader-election-lock (" + candidateIdentity + ")'"));
6469

6570
Lease lease = leaseSupplier.get();
6671

@@ -79,4 +84,5 @@ static void assertAcquireAndRenew(CapturedOutput output, Supplier<Lease> leaseSu
7984
});
8085
}
8186

87+
8288
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.fabric8.leader.election;
18+
19+
import org.junit.jupiter.api.BeforeAll;
20+
import org.junit.jupiter.api.Test;
21+
22+
import org.springframework.beans.factory.annotation.Autowired;
23+
import org.springframework.boot.test.system.CapturedOutput;
24+
import org.springframework.test.context.TestPropertySource;
25+
26+
import static org.springframework.cloud.kubernetes.fabric8.leader.election.Assertions.assertAcquireAndRenew;
27+
import static org.springframework.cloud.kubernetes.integration.tests.commons.Awaitilities.awaitUntil;
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/**
31+
* <pre>
32+
* - we acquire the leadership
33+
* </pre>
34+
*
35+
* @author wind57
36+
*/
37+
@TestPropertySource(properties = { "spring.cloud.kubernetes.leader.election.wait-for-pod-ready=true",
38+
"spring.cloud.kubernetes.leader.election.restart-on-failure=true", "readiness.passes=true" })
39+
class Fabric8LeaderElectionCompletedExceptionallyAndRestartedIT extends AbstractLeaderElection {
40+
41+
private static final String NAME = "acquired-then-fails";
42+
43+
@Autowired
44+
private Fabric8LeaderElectionInitiator initiator;
45+
46+
@BeforeAll
47+
static void beforeAll() {
48+
AbstractLeaderElection.beforeAll(NAME);
49+
}
50+
51+
@Test
52+
void test(CapturedOutput output) {
53+
54+
assertAcquireAndRenew(output, this::getLease, NAME);
55+
56+
initiator.leaderFeature().completeExceptionally(new RuntimeException("we kill the leadership future"));
57+
58+
// from the callback
59+
awaitUntil(5, 50, () -> output.getOut().contains("id : acquired-then-fails stopped being a leader"));
60+
61+
awaitUntil(5, 50, () -> output.getOut().contains("leader failed with : we kill the leadership future"));
62+
63+
awaitUntil(5, 50, () -> output.getOut().contains("leader election failed for : acquired-then-fails"));
64+
65+
int afterLeaderFailure = output.getOut().indexOf("leader election failed for : acquired-then-fails");
66+
67+
afterLeaderFailure(afterLeaderFailure, output);
68+
}
69+
70+
private void afterLeaderFailure(int afterLeaderFailure, CapturedOutput output) {
71+
awaitUntil(60, 100, () -> output.getOut().substring(afterLeaderFailure).
72+
contains(NAME + " is the new leader"));
73+
74+
// lease has been re-acquired
75+
assertThat(output.getOut().substring(afterLeaderFailure))
76+
.contains("Acquired lease 'LeaseLock: default - spring-k8s-leader-election-lock " + "(" + NAME + ")'");
77+
78+
// renewal happens (comes from fabric code)
79+
// this one means that we have extended our leadership
80+
awaitUntil(30, 500,
81+
() -> output.getOut().substring(afterLeaderFailure)
82+
.contains("Attempting to renew leader lease 'LeaseLock: "
83+
+ "default - spring-k8s-leader-election-lock (" + NAME + ")'"));
84+
}
85+
86+
}

0 commit comments

Comments
 (0)