Skip to content

Commit 42b5069

Browse files
authored
Merge pull request #1463 from himanshug/leader_shutdown
leaderelection sanity e2e integration tests
2 parents dd3a900 + 9e08762 commit 42b5069

File tree

4 files changed

+368
-16
lines changed

4 files changed

+368
-16
lines changed

e2e/pom.xml

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
<relativePath>../pom.xml</relativePath>
1515
</parent>
1616

17+
<properties>
18+
<spock.version>2.0-M4-groovy-3.0</spock.version>
19+
</properties>
1720
<dependencies>
1821
<dependency>
1922
<groupId>io.kubernetes</groupId>
@@ -29,7 +32,27 @@
2932
<dependency>
3033
<groupId>org.spockframework</groupId>
3134
<artifactId>spock-core</artifactId>
32-
<version>2.0-M4-groovy-3.0</version>
35+
<version>${spock.version}</version>
36+
<scope>test</scope>
37+
</dependency>
38+
<dependency>
39+
<groupId>junit</groupId>
40+
<artifactId>junit</artifactId>
41+
<scope>test</scope>
42+
</dependency>
43+
44+
<!-- Following two deps exist to make spock2 work with junit4 -->
45+
<!-- Ref: https://github.com/groovy/GMavenPlus/wiki/Examples#spock-2-and-junit -->
46+
<dependency>
47+
<groupId>org.junit.vintage</groupId>
48+
<artifactId>junit-vintage-engine</artifactId>
49+
<version>5.7.0</version>
50+
<scope>test</scope>
51+
</dependency>
52+
<dependency>
53+
<groupId>org.spockframework</groupId>
54+
<artifactId>spock-junit4</artifactId>
55+
<version>${spock.version}</version>
3356
<scope>test</scope>
3457
</dependency>
3558
</dependencies>
@@ -51,19 +74,8 @@
5174
<plugin>
5275
<groupId>org.codehaus.gmavenplus</groupId>
5376
<artifactId>gmavenplus-plugin</artifactId>
54-
<configuration>
55-
<testSources>
56-
<testSource>
57-
<directory>${project.basedir}/src/test/groovy</directory>
58-
<includes>
59-
<include>**/*.groovy</include>
60-
</includes>
61-
</testSource>
62-
</testSources>
63-
</configuration>
6477
<executions>
6578
<execution>
66-
<!-- Without joint compilation - no dependencies between Java and Groovy (inheritance)-->
6779
<goals>
6880
<goal>compileTests</goal>
6981
</goals>
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.e2e.extended.leaderelection;
14+
15+
import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
16+
import io.kubernetes.client.extended.leaderelection.LeaderElector;
17+
import io.kubernetes.client.extended.leaderelection.Lock;
18+
import io.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
19+
import io.kubernetes.client.extended.leaderelection.resourcelock.EndpointsLock;
20+
import io.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;
21+
import io.kubernetes.client.openapi.ApiClient;
22+
import io.kubernetes.client.openapi.ApiException;
23+
import io.kubernetes.client.openapi.apis.CoordinationV1Api;
24+
import io.kubernetes.client.openapi.apis.CoreV1Api;
25+
import io.kubernetes.client.util.ClientBuilder;
26+
import java.io.IOException;
27+
import java.net.HttpURLConnection;
28+
import java.time.Duration;
29+
import java.util.ArrayList;
30+
import java.util.Collection;
31+
import java.util.List;
32+
import java.util.concurrent.BrokenBarrierException;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.CyclicBarrier;
35+
import java.util.concurrent.atomic.AtomicInteger;
36+
import java.util.concurrent.atomic.AtomicReference;
37+
import org.joda.time.format.*;
38+
import org.junit.Assert;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
import org.junit.runner.RunWith;
42+
import org.junit.runners.Parameterized;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
@RunWith(Parameterized.class)
47+
public class LeaderElectorTest {
48+
49+
private static final Logger LOGGER = LoggerFactory.getLogger(LeaderElectorTest.class);
50+
51+
private static final String LOCK_RESOURCE_NAME = "leader-election-it";
52+
private static final String NAMESPACE = "default";
53+
54+
private enum LockType {
55+
ConfigMap,
56+
Endpoints,
57+
Lease
58+
}
59+
60+
@Parameterized.Parameters(name = "{0}")
61+
public static Collection<Object[]> constructorFeeder() {
62+
final List<Object[]> args = new ArrayList<>();
63+
64+
args.add(new Object[] {LockType.ConfigMap});
65+
args.add(new Object[] {LockType.Endpoints});
66+
args.add(new Object[] {LockType.Lease});
67+
68+
return args;
69+
}
70+
71+
private final ApiClient apiClient;
72+
private final LockType lockType;
73+
74+
public LeaderElectorTest(LockType lockType) {
75+
try {
76+
apiClient = ClientBuilder.defaultClient();
77+
} catch (IOException ex) {
78+
throw new RuntimeException("Couldn't create ApiClient", ex);
79+
}
80+
this.lockType = lockType;
81+
82+
// Lease resource requires special care with DateTime
83+
if (lockType == LockType.Lease) {
84+
// TODO: switch date-time library so that micro-sec timestamp can be serialized in RFC3339
85+
// format w/ correct precision without the hacks
86+
87+
// This formatter is used for Lease resource spec's acquire/renewTime
88+
DateTimeFormatter isoWithFractionalMicroSecsFormatter =
89+
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'");
90+
91+
DateTimeFormatter formatter =
92+
new DateTimeFormatterBuilder()
93+
.append(
94+
isoWithFractionalMicroSecsFormatter.getPrinter(),
95+
new DateTimeParser[] {
96+
isoWithFractionalMicroSecsFormatter.getParser(),
97+
// need this one to parse "creationTimestamp" format e.g. "2020-12-30T09:29:13Z"
98+
// in Lease resource returned from server
99+
ISODateTimeFormat.dateOptionalTimeParser().getParser(),
100+
})
101+
.toFormatter();
102+
103+
apiClient.setDateTimeFormat(formatter);
104+
}
105+
}
106+
107+
@Before
108+
public void setup() throws Exception {
109+
// delete the lock resource if it exists, or else first leader candidate might need to wait for
110+
// a whole
111+
// leaseDuration configured
112+
switch (lockType) {
113+
case ConfigMap:
114+
deleteConfigMapLockResource();
115+
break;
116+
case Endpoints:
117+
deleteEndpointsLockResource();
118+
break;
119+
case Lease:
120+
deleteLeaseLockResource();
121+
break;
122+
default:
123+
throw new RuntimeException("Unknown LockType " + lockType);
124+
}
125+
}
126+
127+
@Test(timeout = 30000L)
128+
public void testSingleCandidateLeaderElection() throws Exception {
129+
CountDownLatch startLeadershipLatch = new CountDownLatch(1);
130+
CountDownLatch stopLeadershipLatch = new CountDownLatch(1);
131+
132+
LeaderElector leaderElector =
133+
makeAndRunLeaderElectorAsync(
134+
"candidate",
135+
null,
136+
() -> startLeadershipLatch.countDown(),
137+
() -> stopLeadershipLatch.countDown(),
138+
apiClient);
139+
140+
startLeadershipLatch.await();
141+
142+
leaderElector.close();
143+
144+
stopLeadershipLatch.await();
145+
}
146+
147+
@Test(timeout = 30000L)
148+
public void testMultiCandidateLeaderElection() throws Exception {
149+
CyclicBarrier startBarrier = new CyclicBarrier(2);
150+
151+
CountDownLatch startBeingLeader = new CountDownLatch(1);
152+
CountDownLatch stopBeingLeader = new CountDownLatch(1);
153+
154+
AtomicInteger startBeingLeaderCount = new AtomicInteger();
155+
AtomicInteger stopBeingLeaderCount = new AtomicInteger();
156+
157+
AtomicReference<String> leaderRef = new AtomicReference<>();
158+
159+
String candidate1 = "candidate1";
160+
String candidate2 = "candidate2";
161+
162+
LeaderElector leaderElector1 =
163+
makeAndRunLeaderElectorAsync(
164+
candidate1,
165+
startBarrier,
166+
() -> {
167+
startBeingLeaderCount.incrementAndGet();
168+
leaderRef.set(candidate1);
169+
startBeingLeader.countDown();
170+
},
171+
() -> {
172+
stopBeingLeaderCount.incrementAndGet();
173+
stopBeingLeader.countDown();
174+
},
175+
apiClient);
176+
177+
LeaderElector leaderElector2 =
178+
makeAndRunLeaderElectorAsync(
179+
candidate2,
180+
startBarrier,
181+
() -> {
182+
startBeingLeaderCount.incrementAndGet();
183+
leaderRef.set(candidate2);
184+
startBeingLeader.countDown();
185+
},
186+
() -> {
187+
stopBeingLeaderCount.incrementAndGet();
188+
stopBeingLeader.countDown();
189+
},
190+
apiClient);
191+
192+
// wait till someone becomes leader
193+
startBeingLeader.await();
194+
Assert.assertNotNull(leaderRef.get());
195+
Assert.assertTrue(candidate1.equals(leaderRef.get()) || candidate2.equals(leaderRef.get()));
196+
197+
// stop both LeaderElectors, in order .. non-leader, then leader so that non-leader doesn't get
198+
// to become leader
199+
if (candidate1.equals(leaderRef.get())) {
200+
leaderElector2.close();
201+
leaderElector1.close();
202+
} else {
203+
leaderElector1.close();
204+
leaderElector2.close();
205+
}
206+
207+
stopBeingLeader.await();
208+
209+
// make sure that only one candidate became leader
210+
Assert.assertEquals(1, startBeingLeaderCount.get());
211+
Assert.assertEquals(1, stopBeingLeaderCount.get());
212+
}
213+
214+
@Test(timeout = 30000L)
215+
public void testLeaderGracefulShutdown() throws Exception {
216+
CountDownLatch startBeingLeader1 = new CountDownLatch(1);
217+
CountDownLatch stopBeingLeader1 = new CountDownLatch(1);
218+
219+
LeaderElector leaderElector1 =
220+
makeAndRunLeaderElectorAsync(
221+
"candidate1",
222+
null,
223+
() -> startBeingLeader1.countDown(),
224+
() -> stopBeingLeader1.countDown(),
225+
apiClient);
226+
227+
// wait for candidate1 to become leader
228+
startBeingLeader1.await();
229+
230+
CountDownLatch startBeingLeader2 = new CountDownLatch(1);
231+
CountDownLatch stopBeingLeader2 = new CountDownLatch(1);
232+
233+
LeaderElector leaderElector2 =
234+
makeAndRunLeaderElectorAsync(
235+
"candidate2",
236+
null,
237+
() -> startBeingLeader2.countDown(),
238+
() -> stopBeingLeader2.countDown(),
239+
apiClient);
240+
241+
leaderElector1.close();
242+
243+
// ensure stopBeingLeader hook is called
244+
stopBeingLeader1.await();
245+
246+
// wait for candidate2 to become leader
247+
startBeingLeader2.await();
248+
249+
leaderElector2.close();
250+
}
251+
252+
private LeaderElector makeAndRunLeaderElectorAsync(
253+
String candidateId,
254+
CyclicBarrier startBarrier,
255+
Runnable startBeingLeader,
256+
Runnable stopBeingLeader,
257+
ApiClient apiClient) {
258+
259+
Lock lock = makeLock(candidateId, NAMESPACE, LOCK_RESOURCE_NAME, apiClient);
260+
261+
LeaderElectionConfig leaderElectionConfig =
262+
new LeaderElectionConfig(
263+
lock, Duration.ofSeconds(30), Duration.ofSeconds(23), Duration.ofSeconds(3));
264+
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
265+
266+
Thread thread =
267+
new Thread(
268+
() -> {
269+
if (startBarrier != null) {
270+
try {
271+
startBarrier.await();
272+
} catch (InterruptedException | BrokenBarrierException ex) {
273+
LOGGER.error("startBarrier.await() failed", ex);
274+
return;
275+
}
276+
}
277+
278+
leaderElector.run(startBeingLeader, stopBeingLeader);
279+
},
280+
String.format("%s-leader-elector-main", candidateId));
281+
thread.setDaemon(true);
282+
thread.start();
283+
284+
return leaderElector;
285+
}
286+
287+
private Lock makeLock(
288+
String candidateId, String namespace, String lockResourceName, ApiClient k8sApiClient) {
289+
290+
switch (lockType) {
291+
case ConfigMap:
292+
return new ConfigMapLock(namespace, lockResourceName, candidateId, k8sApiClient);
293+
case Endpoints:
294+
return new EndpointsLock(namespace, lockResourceName, candidateId, k8sApiClient);
295+
case Lease:
296+
return new LeaseLock(namespace, lockResourceName, candidateId, k8sApiClient);
297+
default:
298+
throw new RuntimeException("Unknown LockType " + lockType);
299+
}
300+
}
301+
302+
private void deleteConfigMapLockResource() throws Exception {
303+
try {
304+
CoreV1Api coreV1Api = new CoreV1Api(apiClient);
305+
coreV1Api.deleteNamespacedConfigMap(
306+
LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null);
307+
} catch (ApiException ex) {
308+
if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
309+
throw ex;
310+
}
311+
}
312+
}
313+
314+
private void deleteEndpointsLockResource() throws Exception {
315+
try {
316+
CoreV1Api coreV1Api = new CoreV1Api(apiClient);
317+
coreV1Api.deleteNamespacedEndpoints(
318+
LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null);
319+
} catch (ApiException ex) {
320+
if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
321+
throw ex;
322+
}
323+
}
324+
}
325+
326+
private void deleteLeaseLockResource() throws Exception {
327+
try {
328+
CoordinationV1Api coordinationV1Api = new CoordinationV1Api(apiClient);
329+
coordinationV1Api.deleteNamespacedLease(
330+
LOCK_RESOURCE_NAME, NAMESPACE, null, null, null, null, null, null);
331+
} catch (ApiException ex) {
332+
if (ex.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
333+
throw ex;
334+
}
335+
}
336+
}
337+
}

0 commit comments

Comments
 (0)