Skip to content

Commit 9cb072f

Browse files
committed
Clean up watcher test intermittent failures
1 parent dbd436d commit 9cb072f

File tree

4 files changed

+69
-21
lines changed

4 files changed

+69
-21
lines changed

operator/src/main/java/oracle/kubernetes/operator/builders/WatchI.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33

44
package oracle.kubernetes.operator.builders;
55

6-
import io.kubernetes.client.util.Watch;
7-
86
import java.util.Iterator;
97

8+
import io.kubernetes.client.util.Watch;
9+
1010
/**
11-
* An interface that allows test-stubbing of the Kubernetes Watch class.
11+
* An iterator over watch responses from the server. These objects maintain resources,
12+
* which will be release when #close() is called.
13+
*
1214
* @param <T> the generic object type
1315
*/
1416
public interface WatchI<T>

operator/src/test/java/oracle/kubernetes/operator/PodWatcherTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@
33

44
package oracle.kubernetes.operator;
55

6+
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
10+
import com.google.common.collect.ImmutableMap;
11+
612
import io.kubernetes.client.models.V1ObjectMeta;
713
import io.kubernetes.client.models.V1Pod;
814
import io.kubernetes.client.models.V1PodCondition;
@@ -14,13 +20,6 @@
1420
import oracle.kubernetes.operator.work.Packet;
1521
import oracle.kubernetes.operator.work.Step;
1622

17-
import com.google.common.collect.ImmutableMap;
18-
19-
import java.util.Collections;
20-
import java.util.List;
21-
import java.util.concurrent.Executors;
22-
import java.util.concurrent.atomic.AtomicBoolean;
23-
2423
import org.hamcrest.Matchers;
2524
import org.junit.Test;
2625

@@ -74,7 +73,7 @@ protected <T> T createObjectWithMetaData(V1ObjectMeta metaData) {
7473

7574
@Override
7675
protected PodWatcher createWatcher(String nameSpace, AtomicBoolean stopping, int initialResourceVersion) {
77-
return PodWatcher.create(Executors.defaultThreadFactory(), nameSpace,
76+
return PodWatcher.create(this, nameSpace,
7877
Integer.toString(initialResourceVersion), this, stopping);
7978
}
8079

@@ -171,7 +170,7 @@ public void whenPodHasServerName_returnIt() throws Exception {
171170
@Test
172171
public void waitForReady_returnsAStep() throws Exception {
173172
AtomicBoolean stopping = new AtomicBoolean(true);
174-
PodWatcher watcher = PodWatcher.create(Executors.defaultThreadFactory(), "ns",
173+
PodWatcher watcher = PodWatcher.create(this, "ns",
175174
Integer.toString(INITIAL_RESOURCE_VERSION), this, stopping);
176175

177176
assertThat(watcher.waitForReady(pod, null), Matchers.instanceOf(Step.class));
@@ -180,7 +179,7 @@ public void waitForReady_returnsAStep() throws Exception {
180179
@Test
181180
public void WhenWaitForReadyAppliedToReadyPod_performNextStep() throws Exception {
182181
AtomicBoolean stopping = new AtomicBoolean(false);
183-
PodWatcher watcher = PodWatcher.create(Executors.defaultThreadFactory(), "ns",
182+
PodWatcher watcher = PodWatcher.create(this, "ns",
184183
Integer.toString(INITIAL_RESOURCE_VERSION), this, stopping);
185184

186185
makePodReady(pod);

operator/src/test/java/oracle/kubernetes/operator/WatcherTestBase.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.util.ArrayList;
77
import java.util.List;
8+
import java.util.concurrent.ThreadFactory;
89
import java.util.concurrent.atomic.AtomicBoolean;
910

1011
import com.meterware.simplestub.Memento;
@@ -17,7 +18,11 @@
1718

1819
import org.junit.After;
1920
import org.junit.Before;
21+
import org.junit.Rule;
2022
import org.junit.Test;
23+
import org.junit.rules.TestRule;
24+
import org.junit.rules.TestWatcher;
25+
import org.junit.runner.Description;
2126

2227
import static java.net.HttpURLConnection.HTTP_GONE;
2328
import static oracle.kubernetes.operator.builders.EventMatcher.addEvent;
@@ -30,7 +35,7 @@
3035
/**
3136
* Tests behavior of the Watcher class.
3237
*/
33-
public abstract class WatcherTestBase implements StubWatchFactory.AllWatchesClosedListener {
38+
public abstract class WatcherTestBase implements StubWatchFactory.AllWatchesClosedListener, ThreadFactory {
3439
private static final int NEXT_RESOURCE_VERSION = 123456;
3540
private static final int INITIAL_RESOURCE_VERSION = 123;
3641
private static final String NAMESPACE = "testspace";
@@ -47,6 +52,25 @@ private V1ObjectMeta createMetaData() {
4752

4853
private AtomicBoolean stopping = new AtomicBoolean(false);
4954

55+
private String testName;
56+
private List<Thread> threads = new ArrayList<>();
57+
58+
@Rule
59+
public TestRule watcher = new TestWatcher() {
60+
@Override
61+
protected void starting(Description description) {
62+
testName = description.getMethodName();
63+
}
64+
};
65+
66+
@Override
67+
public Thread newThread(Runnable r) {
68+
Thread thread = new Thread(r);
69+
threads.add(thread);
70+
thread.setName(String.format("Test thread %d for %s", threads.size(), testName));
71+
return thread;
72+
}
73+
5074
@Override
5175
public void allWatchesClosed() {
5276
stopping.set(true);
@@ -65,9 +89,18 @@ public void setUp() throws Exception {
6589

6690
@After
6791
public void tearDown() throws Exception {
92+
for (Thread thread : threads) shutDown(thread);
6893
for (Memento memento : mementos) memento.revert();
6994
}
7095

96+
private void shutDown(Thread thread) {
97+
try {
98+
thread.interrupt();
99+
thread.join();
100+
} catch (InterruptedException ignored) {
101+
}
102+
}
103+
71104
@SuppressWarnings("unchecked")
72105
void sendInitialRequest(int initialResourceVersion) {
73106
StubWatchFactory.addCallResponses(createAddResponse(createObjectWithMetaData()));

operator/src/test/java/oracle/kubernetes/operator/builders/StubWatchFactory.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.io.IOException;
77
import java.util.ArrayList;
88
import java.util.Arrays;
9+
import java.util.Collections;
910
import java.util.HashMap;
1011
import java.util.Iterator;
1112
import java.util.List;
@@ -22,6 +23,8 @@
2223
import io.kubernetes.client.util.Watch.Response;
2324
import oracle.kubernetes.operator.helpers.Pool;
2425

26+
import javax.annotation.Nonnull;
27+
2528
/**
2629
* A test-time replacement for the factory that creates Watch objects, allowing
2730
* tests to specify directly the events they want returned from the Watch.
@@ -69,15 +72,26 @@ public static List<Map<String, String>> getRecordedParameters() {
6972
public <T> WatchI<T> createWatch(Pool<ApiClient> pool, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException {
7073
getRecordedParameters().add(recordedParams(callParams));
7174

72-
if (exceptionOnNext == null)
73-
return new WatchStub<T>((List)calls.remove(0));
74-
else try {
75-
return new ExceptionThrowingWatchStub<T>(exceptionOnNext);
76-
} finally {
77-
exceptionOnNext = null;
75+
try {
76+
if (nothingToDo())
77+
return new WatchStub<>(Collections.emptyList());
78+
else if (exceptionOnNext == null)
79+
return new WatchStub<T>((List)calls.remove(0));
80+
else try {
81+
return new ExceptionThrowingWatchStub<>(exceptionOnNext);
82+
} finally {
83+
exceptionOnNext = null;
84+
}
85+
} catch (IndexOutOfBoundsException e) {
86+
System.out.println("Failed in thread " + Thread.currentThread());
87+
throw e;
7888
}
7989
}
8090

91+
public boolean nothingToDo() {
92+
return calls.isEmpty() && exceptionOnNext == null;
93+
}
94+
8195
private Map<String,String> recordedParams(CallParams callParams) {
8296
Map<String,String> result = new HashMap<>();
8397
if (callParams.getResourceVersion() != null)
@@ -116,7 +130,7 @@ public void close() throws IOException {
116130
}
117131

118132
@Override
119-
public Iterator<Watch.Response<T>> iterator() {
133+
public @Nonnull Iterator<Watch.Response<T>> iterator() {
120134
return responses.iterator();
121135
}
122136

0 commit comments

Comments
 (0)