Skip to content

Commit d83d00a

Browse files
committed
add concurrent test
Signed-off-by: wind57 <[email protected]>
1 parent cc5d04a commit d83d00a

File tree

1 file changed

+251
-0
lines changed
  • spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-fabric8-leader-election/src/test/java/org/springframework/cloud/kubernetes/fabric8/leader/election

1 file changed

+251
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.fabric8.leader.election;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.function.Consumer;
22+
23+
import io.fabric8.kubernetes.client.Config;
24+
import io.fabric8.kubernetes.client.KubernetesClient;
25+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
26+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
27+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
28+
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;
29+
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeAll;
32+
import org.junit.jupiter.api.Test;
33+
import org.junit.jupiter.api.extension.ExtendWith;
34+
import org.testcontainers.k3s.K3sContainer;
35+
36+
import org.springframework.boot.test.system.CapturedOutput;
37+
import org.springframework.boot.test.system.OutputCaptureExtension;
38+
import org.springframework.cloud.kubernetes.commons.leader.election.LeaderElectionProperties;
39+
import org.springframework.cloud.kubernetes.integration.tests.commons.Commons;
40+
41+
import static org.springframework.cloud.kubernetes.integration.tests.commons.Awaitilities.awaitUntil;
42+
43+
/**
44+
* @author wind57
45+
*/
46+
@ExtendWith(OutputCaptureExtension.class)
47+
class Fabric8LeaderElectionConcurrentIT {
48+
49+
private static final String LEASE_NAME = "lease-lock";
50+
51+
private static final LeaderElectionProperties PROPERTIES = new LeaderElectionProperties(false, false,
52+
Duration.ofSeconds(15), "default", LEASE_NAME, Duration.ofSeconds(5), Duration.ofSeconds(2),
53+
Duration.ofSeconds(5), false, true);
54+
55+
private static final String CANDIDATE_IDENTITY_ONE = "one";
56+
57+
private static final String CANDIDATE_IDENTITY_TWO = "two";
58+
59+
private Fabric8LeaderElectionInitiator one;
60+
61+
private Fabric8LeaderElectionInitiator two;
62+
63+
private static KubernetesClient kubernetesClient;
64+
65+
@BeforeAll
66+
static void beforeAll() {
67+
68+
K3sContainer container = Commons.container();
69+
container.start();
70+
71+
String kubeConfigYaml = container.getKubeConfigYaml();
72+
Config config = Config.fromKubeconfig(kubeConfigYaml);
73+
kubernetesClient = new KubernetesClientBuilder().withConfig(config).build();
74+
75+
}
76+
77+
@AfterAll
78+
static void afterAll() {
79+
kubernetesClient.leases()
80+
.inNamespace("default")
81+
.withName(LEASE_NAME)
82+
.withTimeout(10, TimeUnit.SECONDS)
83+
.delete();
84+
}
85+
86+
@AfterEach
87+
void afterEach() {
88+
one.preDestroy();
89+
two.preDestroy();
90+
91+
kubernetesClient.leases()
92+
.inNamespace("default")
93+
.withName(LEASE_NAME)
94+
.withTimeout(10, TimeUnit.SECONDS)
95+
.delete();
96+
}
97+
98+
@Test
99+
void test(CapturedOutput output) {
100+
101+
LeaderElectionConfig leaderElectionConfigOne = leaderElectionConfig(CANDIDATE_IDENTITY_ONE);
102+
one = new Fabric8LeaderElectionInitiator(CANDIDATE_IDENTITY_ONE, "default", kubernetesClient,
103+
leaderElectionConfigOne, PROPERTIES, () -> true);
104+
105+
LeaderElectionConfig leaderElectionConfigTwo = leaderElectionConfig(CANDIDATE_IDENTITY_TWO);
106+
two = new Fabric8LeaderElectionInitiator(CANDIDATE_IDENTITY_TWO, "default", kubernetesClient,
107+
leaderElectionConfigTwo, PROPERTIES, () -> true);
108+
109+
one.postConstruct();
110+
two.postConstruct();
111+
112+
// both try to acquire the lock
113+
awaitUntil(3, 100, () -> output.getOut()
114+
.contains("Attempting to acquire leader lease 'LeaseLock: default - lease-lock (two)'..."));
115+
awaitUntil(3, 100, () -> output.getOut()
116+
.contains("Attempting to acquire leader lease 'LeaseLock: default - lease-lock (one)'..."));
117+
awaitUntil(3, 100, () -> output.getOut().contains("Leader changed from null to "));
118+
119+
LeaderAndFollower leaderAndFollower = leaderAndFollower(leaderElectionConfigOne, kubernetesClient);
120+
String leader = leaderAndFollower.leader();
121+
String follower = leaderAndFollower.follower();
122+
123+
awaitUntil(3, 100, () -> output.getOut().contains("Leader changed from null to " + leader));
124+
awaitUntil(3, 100, () -> output.getOut().contains("id : " + leader + " is the new leader"));
125+
awaitUntil(3, 100, () -> output.getOut()
126+
.contains("Successfully Acquired leader lease 'LeaseLock: " + "default - lease-lock (" + leader + ")'"));
127+
128+
// renewal happens for the current leader
129+
awaitUntil(3, 100, () -> output.getOut()
130+
.contains("Attempting to renew leader lease 'LeaseLock: " + "default - lease-lock (" + leader + ")'..."));
131+
awaitUntil(3, 100,
132+
() -> output.getOut().contains("Acquired lease 'LeaseLock: default - lease-lock (" + leader + ")'"));
133+
134+
// the other elector says it can't acquire the lock
135+
awaitUntil(10, 100, () -> output.getOut().contains("Lock is held by " + leader + " and has not yet expired"));
136+
awaitUntil(3, 100, () -> output.getOut()
137+
.contains("Failed to acquire lease 'LeaseLock: " + "default - lease-lock (" + follower + ")' retrying..."));
138+
139+
int beforeRelease = output.getOut().length();
140+
failLeaderRenewal(leader, one, two);
141+
142+
/*
143+
* we simulated above that renewal failed and leader future was canceled. In this
144+
* case, the 'notLeader' picks up the leadership, the 'leader' is now a
145+
* "follower", it re-tries to take leadership.
146+
*/
147+
awaitUntil(10, 100,
148+
() -> output.getOut().substring(beforeRelease).contains("id : " + follower + " is the new leader"));
149+
awaitUntil(3, 100, () -> output.getOut()
150+
.substring(beforeRelease)
151+
.contains("Attempting to renew leader lease 'LeaseLock: " + "default - lease-lock (" + follower + ")'..."));
152+
awaitUntil(3, 100,
153+
() -> output.getOut()
154+
.substring(beforeRelease)
155+
.contains("Acquired lease 'LeaseLock: default - lease-lock (" + follower + ")'"));
156+
157+
// proves that the canceled leader tries to acquire again the leadership
158+
awaitUntil(3, 100, () -> output.getOut()
159+
.substring(beforeRelease)
160+
.contains("Attempting to acquire leader lease 'LeaseLock: default - lease-lock (" + leader + ")'..."));
161+
awaitUntil(3, 100,
162+
() -> output.getOut()
163+
.substring(beforeRelease)
164+
.contains("Lock is held by " + follower + " and has not yet expired"));
165+
166+
/*
167+
* we simulate the renewal failure one more time. we know that leader = 'follower'
168+
*/
169+
int failAgain = output.getOut().length();
170+
failLeaderRenewal(follower, one, two);
171+
172+
awaitUntil(10, 100,
173+
() -> output.getOut().substring(failAgain).contains("id : " + leader + " is the new leader"));
174+
awaitUntil(3, 100, () -> output.getOut()
175+
.substring(failAgain)
176+
.contains("Attempting to renew leader lease 'LeaseLock: " + "default - lease-lock (" + leader + ")'..."));
177+
awaitUntil(3, 100,
178+
() -> output.getOut()
179+
.substring(failAgain)
180+
.contains("Acquired lease 'LeaseLock: default - lease-lock (" + leader + ")'"));
181+
182+
// proves that the canceled leader tries to acquire again the leadership
183+
awaitUntil(10, 100, () -> output.getOut()
184+
.substring(failAgain)
185+
.contains("Attempting to acquire leader lease 'LeaseLock: default - lease-lock (" + follower + ")'..."));
186+
awaitUntil(3, 100,
187+
() -> output.getOut()
188+
.substring(failAgain)
189+
.contains("Lock is held by " + leader + " and has not yet expired"));
190+
191+
}
192+
193+
private LeaderElectionConfig leaderElectionConfig(String holderIdentity) {
194+
195+
LeaseLock lock = leaseLock(holderIdentity);
196+
Fabric8LeaderElectionCallbacks callbacks = callbacks(holderIdentity);
197+
198+
return new LeaderElectionConfigBuilder().withReleaseOnCancel()
199+
.withName("leader-election-config")
200+
.withLeaseDuration(PROPERTIES.leaseDuration())
201+
.withLock(lock)
202+
.withRenewDeadline(PROPERTIES.renewDeadline())
203+
.withRetryPeriod(PROPERTIES.retryPeriod())
204+
.withLeaderCallbacks(callbacks)
205+
.build();
206+
}
207+
208+
private LeaseLock leaseLock(String holderIdentity) {
209+
return new LeaseLock("default", LEASE_NAME, holderIdentity);
210+
}
211+
212+
private Fabric8LeaderElectionCallbacks callbacks(String holderIdentity) {
213+
Fabric8LeaderElectionCallbacksAutoConfiguration configuration = new Fabric8LeaderElectionCallbacksAutoConfiguration();
214+
215+
Runnable onStartLeadingCallback = configuration.onStartLeadingCallback(null, holderIdentity, PROPERTIES);
216+
Runnable onStopLeadingCallback = configuration.onStopLeadingCallback(null, holderIdentity, PROPERTIES);
217+
Consumer<String> onNewLeaderCallback = configuration.onNewLeaderCallback(null, PROPERTIES);
218+
219+
return new Fabric8LeaderElectionCallbacks(onStartLeadingCallback, onStopLeadingCallback, onNewLeaderCallback);
220+
}
221+
222+
private LeaderAndFollower leaderAndFollower(LeaderElectionConfig leaderElectionConfig,
223+
KubernetesClient kubernetesClient) {
224+
boolean oneIsLeader = leaderElectionConfig.getLock()
225+
.get(kubernetesClient)
226+
.getHolderIdentity()
227+
.equals(CANDIDATE_IDENTITY_ONE);
228+
229+
if (oneIsLeader) {
230+
return new LeaderAndFollower("one", "two");
231+
}
232+
else {
233+
return new LeaderAndFollower("two", "one");
234+
}
235+
}
236+
237+
private void failLeaderRenewal(String currentLeader, Fabric8LeaderElectionInitiator one,
238+
Fabric8LeaderElectionInitiator two) {
239+
if (currentLeader.equals("one")) {
240+
one.leaderFeature().completeExceptionally(new RuntimeException("just because"));
241+
}
242+
else {
243+
two.leaderFeature().completeExceptionally(new RuntimeException("just because"));
244+
}
245+
}
246+
247+
private record LeaderAndFollower(String leader, String follower) {
248+
249+
}
250+
251+
}

0 commit comments

Comments
 (0)