Skip to content

Commit b431f48

Browse files
committed
Properly pool clients used by watches
1 parent cc2ff1f commit b431f48

File tree

5 files changed

+56
-52
lines changed

5 files changed

+56
-52
lines changed

src/main/java/oracle/kubernetes/operator/Watcher.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,17 @@
33

44
package oracle.kubernetes.operator;
55

6-
import io.kubernetes.client.ApiClient;
76
import io.kubernetes.client.ApiException;
87
import io.kubernetes.client.models.V1ObjectMeta;
98
import io.kubernetes.client.models.V1Status;
109
import io.kubernetes.client.util.Watch;
1110
import oracle.kubernetes.operator.builders.WatchBuilder;
1211
import oracle.kubernetes.operator.builders.WatchI;
13-
import oracle.kubernetes.operator.helpers.ClientPool;
1412
import oracle.kubernetes.operator.logging.LoggingFacade;
1513
import oracle.kubernetes.operator.logging.LoggingFactory;
1614
import oracle.kubernetes.operator.logging.MessageKeys;
1715
import oracle.kubernetes.operator.watcher.WatchListener;
1816

19-
import java.io.IOException;
2017
import java.lang.reflect.Method;
2118
import java.util.concurrent.ThreadFactory;
2219
import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,19 +88,13 @@ void start(ThreadFactory factory) {
9188
}
9289

9390
private void doWatch() {
94-
ClientPool helper = ClientPool.getInstance();
95-
ApiClient client = helper.take();
96-
try {
97-
setIsDraining(false);
91+
setIsDraining(false);
9892

99-
while (!isDraining()) {
100-
if (isStopping())
101-
setIsDraining(true);
102-
else
103-
watchForEvents(client);
104-
}
105-
} finally {
106-
helper.recycle(client);
93+
while (!isDraining()) {
94+
if (isStopping())
95+
setIsDraining(true);
96+
else
97+
watchForEvents();
10798
}
10899
}
109100

@@ -121,8 +112,8 @@ protected boolean isStopping() {
121112
return stopping.get();
122113
}
123114

124-
private void watchForEvents(ApiClient client) {
125-
try (WatchI<T> watch = initiateWatch(new WatchBuilder(client).withResourceVersion(resourceVersion))) {
115+
private void watchForEvents() {
116+
try (WatchI<T> watch = initiateWatch(new WatchBuilder().withResourceVersion(resourceVersion))) {
126117
while (watch.hasNext()) {
127118
Watch.Response<T> item = watch.next();
128119

src/main/java/oracle/kubernetes/operator/builders/WatchBuilder.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import io.kubernetes.client.models.V1beta1Ingress;
1616
import io.kubernetes.client.util.Watch;
1717
import oracle.kubernetes.operator.TuningParameters;
18+
import oracle.kubernetes.operator.helpers.ClientPool;
19+
import oracle.kubernetes.operator.helpers.Pool;
1820
import oracle.kubernetes.operator.work.ContainerResolver;
1921
import oracle.kubernetes.weblogic.domain.v1.Domain;
2022
import oracle.kubernetes.weblogic.domain.v1.api.WeblogicApi;
@@ -35,15 +37,13 @@ public class WatchBuilder {
3537

3638
private static WatchFactory FACTORY = new WatchFactoryImpl();
3739

38-
private ApiClient client;
3940
private CallParamsImpl callParams = new CallParamsImpl();
4041

4142
public interface WatchFactory {
42-
<T> WatchI<T> createWatch(ApiClient client, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException;
43+
<T> WatchI<T> createWatch(Pool<ApiClient> pool, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException;
4344
}
4445

45-
public WatchBuilder(ApiClient client) {
46-
this.client = client;
46+
public WatchBuilder() {
4747
TuningParameters tuning = ContainerResolver.getInstance().getContainer().getSPI(TuningParameters.class);
4848
if (tuning != null) {
4949
callParams.setTimeoutSeconds(tuning.getWatchTuning().watchLifetime);
@@ -76,7 +76,7 @@ public Type getOwnerType() {
7676
* @throws ApiException if there is an error on the call that sets up the web hook.
7777
*/
7878
public WatchI<V1Service> createServiceWatch(String namespace) throws ApiException {
79-
return FACTORY.createWatch(client, callParams, V1Service.class, new ListNamespacedServiceCall(namespace));
79+
return FACTORY.createWatch(ClientPool.getInstance(), callParams, V1Service.class, new ListNamespacedServiceCall(namespace));
8080
}
8181

8282
private class ListNamespacedServiceCall implements BiFunction<ApiClient, CallParams, Call> {
@@ -106,7 +106,7 @@ public Call apply(ApiClient client, CallParams callParams) {
106106
* @throws ApiException if there is an error on the call that sets up the web hook.
107107
*/
108108
public WatchI<V1Pod> createPodWatch(String namespace) throws ApiException {
109-
return FACTORY.createWatch(client, callParams, V1Pod.class, new ListPodCall(namespace));
109+
return FACTORY.createWatch(ClientPool.getInstance(), callParams, V1Pod.class, new ListPodCall(namespace));
110110
}
111111

112112
private class ListPodCall implements BiFunction<ApiClient, CallParams, Call> {
@@ -136,7 +136,7 @@ public Call apply(ApiClient client, CallParams callParams) {
136136
* @throws ApiException if there is an error on the call that sets up the web hook.
137137
*/
138138
public WatchI<V1beta1Ingress> createIngressWatch(String namespace) throws ApiException {
139-
return FACTORY.createWatch(client, callParams, V1beta1Ingress.class, new ListIngressCall(namespace));
139+
return FACTORY.createWatch(ClientPool.getInstance(), callParams, V1beta1Ingress.class, new ListIngressCall(namespace));
140140
}
141141

142142
private class ListIngressCall implements BiFunction<ApiClient, CallParams, Call> {
@@ -166,7 +166,7 @@ public Call apply(ApiClient client, CallParams callParams) {
166166
* @throws ApiException if there is an error on the call that sets up the web hook.
167167
*/
168168
public WatchI<Domain> createDomainWatch(String namespace) throws ApiException {
169-
return FACTORY.createWatch(client, callParams, Domain.class, new ListDomainsCall(namespace));
169+
return FACTORY.createWatch(ClientPool.getInstance(), callParams, Domain.class, new ListDomainsCall(namespace));
170170
}
171171

172172
private class ListDomainsCall implements BiFunction<ApiClient, CallParams, Call> {
@@ -196,7 +196,7 @@ public Call apply(ApiClient client, CallParams callParams) {
196196
* @throws ApiException if there is an error on the call that sets up the web hook.
197197
*/
198198
public WatchI<V1ConfigMap> createConfigMapWatch(String namespace) throws ApiException {
199-
return FACTORY.createWatch(client, callParams, V1ConfigMap.class, new ListNamespacedConfigMapCall(namespace));
199+
return FACTORY.createWatch(ClientPool.getInstance(), callParams, V1ConfigMap.class, new ListNamespacedConfigMapCall(namespace));
200200
}
201201

202202
private class ListNamespacedConfigMapCall implements BiFunction<ApiClient, CallParams, Call> {
@@ -280,9 +280,10 @@ public WatchBuilder withProgressRequestListener(ProgressRequestBody.ProgressRequ
280280

281281
static class WatchFactoryImpl implements WatchFactory {
282282
@Override
283-
public <T> WatchI<T> createWatch(ApiClient client, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException {
283+
public <T> WatchI<T> createWatch(Pool<ApiClient> pool, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException {
284+
ApiClient client = pool.take();
284285
try {
285-
return new WatchImpl<T>(Watch.createWatch(client, function.apply(client, callParams), getType(responseBodyType)));
286+
return new WatchImpl<T>(pool, client, Watch.createWatch(client, function.apply(client, callParams), getType(responseBodyType)));
286287
} catch (UncheckedApiException e) {
287288
throw e.getCause();
288289
}

src/main/java/oracle/kubernetes/operator/builders/WatchImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33

44
package oracle.kubernetes.operator.builders;
55

6+
import io.kubernetes.client.ApiClient;
67
import io.kubernetes.client.util.Watch;
8+
import oracle.kubernetes.operator.helpers.Pool;
79

810
import java.io.IOException;
911
import java.util.Iterator;
@@ -13,15 +15,20 @@
1315
* interface.
1416
*/
1517
public class WatchImpl<T> implements WatchI<T> {
18+
private final Pool<ApiClient> pool;
19+
private final ApiClient client;
1620
private Watch<T> impl;
1721

18-
WatchImpl(Watch<T> impl) {
22+
WatchImpl(Pool<ApiClient> pool, ApiClient client, Watch<T> impl) {
23+
this.pool = pool;
24+
this.client = client;
1925
this.impl = impl;
2026
}
2127

2228
@Override
2329
public void close() throws IOException {
2430
impl.close();
31+
pool.recycle(client);
2532
}
2633

2734
@Override

src/test/java/oracle/kubernetes/operator/builders/StubWatchFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.kubernetes.client.ApiClient;
1111
import io.kubernetes.client.ApiException;
1212
import io.kubernetes.client.util.Watch;
13+
import oracle.kubernetes.operator.helpers.Pool;
1314

1415
import java.io.IOException;
1516
import java.util.ArrayList;
@@ -63,7 +64,7 @@ public static List<Map<String, String>> getRecordedParameters() {
6364

6465
@SuppressWarnings("unchecked")
6566
@Override
66-
public <T> WatchI<T> createWatch(ApiClient client, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException {
67+
public <T> WatchI<T> createWatch(Pool<ApiClient> pool, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException {
6768
getRecordedParameters().add(recordedParams(callParams));
6869

6970
if (exceptionOnNext == null)

src/test/java/oracle/kubernetes/operator/builders/WatchBuilderTest.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
import io.kubernetes.client.models.V1beta1Ingress;
1919
import oracle.kubernetes.TestUtils;
2020
import oracle.kubernetes.weblogic.domain.v1.Domain;
21-
import oracle.kubernetes.operator.helpers.ClientPool;
21+
import oracle.kubernetes.operator.helpers.Pool;
22+
2223
import org.junit.After;
2324
import org.junit.Before;
2425
import org.junit.Test;
@@ -57,19 +58,8 @@ public class WatchBuilderTest extends HttpUserAgentTest {
5758
private static List<AssertionError> validationErrors;
5859

5960
private int resourceVersion = INITIAL_RESOURCE_VERSION;
60-
private final static ApiClient client = createTestClient();
6161
private List<Memento> mementos = new ArrayList<>();
6262

63-
// create a client to manipulate during this test to avoid recycling it and breaking other tests
64-
private static ApiClient createTestClient() {
65-
Memento memento = TestUtils.silenceOperatorLogger();
66-
try {
67-
return ClientPool.getInstance().take();
68-
} finally {
69-
memento.revert();
70-
}
71-
}
72-
7363
@Before
7464
public void setUp() throws Exception {
7565
mementos.add(TestUtils.silenceOperatorLogger());
@@ -88,7 +78,7 @@ public void whenDomainWatchReceivesAddResponse_returnItFromIterator() throws Exc
8878
Domain domain = new Domain().withApiVersion(API_VERSION).withKind("Domain").withMetadata(createMetaData("domain1", NAMESPACE));
8979
defineHttpResponse(DOMAIN_RESOURCE, withResponses(createAddedResponse(domain)));
9080

91-
WatchI<Domain> domainWatch = new WatchBuilder(client).createDomainWatch(NAMESPACE);
81+
WatchI<Domain> domainWatch = new WatchBuilder().createDomainWatch(NAMESPACE);
9282

9383
assertThat(domainWatch, contains(addEvent(domain)));
9484
}
@@ -99,7 +89,7 @@ public void whenDomainWatchReceivesModifyAndDeleteResponses_returnBothFromIterat
9989
Domain domain2 = new Domain().withApiVersion(API_VERSION).withKind("Domain").withMetadata(createMetaData("domain2", NAMESPACE));
10090
defineHttpResponse(DOMAIN_RESOURCE, withResponses(createModifiedResponse(domain1), createDeletedResponse(domain2)));
10191

102-
WatchI<Domain> domainWatch = new WatchBuilder(client).createDomainWatch(NAMESPACE);
92+
WatchI<Domain> domainWatch = new WatchBuilder().createDomainWatch(NAMESPACE);
10393

10494
assertThat(domainWatch, contains(modifyEvent(domain1), deleteEvent(domain2)));
10595
}
@@ -108,7 +98,7 @@ public void whenDomainWatchReceivesModifyAndDeleteResponses_returnBothFromIterat
10898
public void whenDomainWatchReceivesErrorResponse_returnItFromIterator() throws Exception {
10999
defineHttpResponse(DOMAIN_RESOURCE, withResponses(createErrorResponse(HTTP_ENTITY_TOO_LARGE)));
110100

111-
WatchI<Domain> domainWatch = new WatchBuilder(client).createDomainWatch(NAMESPACE);
101+
WatchI<Domain> domainWatch = new WatchBuilder().createDomainWatch(NAMESPACE);
112102

113103
assertThat(domainWatch, contains(errorEvent(HTTP_ENTITY_TOO_LARGE)));
114104
}
@@ -124,7 +114,7 @@ public void whenServiceWatchSpecifiesParameters_verifyAndReturnResponse() throws
124114
+ "," + CREATEDBYOPERATOR_LABEL),
125115
parameter("watch").withValue("true")));
126116

127-
WatchI<V1Service> serviceWatch = new WatchBuilder(client)
117+
WatchI<V1Service> serviceWatch = new WatchBuilder()
128118
.withResourceVersion(startResourceVersion)
129119
.withLabelSelector(DOMAINUID_LABEL
130120
+ "," + CREATEDBYOPERATOR_LABEL)
@@ -142,7 +132,7 @@ public void whenPodWatchSpecifiesParameters_verifyAndReturnResponse() throws Exc
142132
parameter("includeUninitialized").withValue("false"),
143133
parameter("limit").withValue("25")));
144134

145-
WatchI<V1Pod> podWatch = new WatchBuilder(client)
135+
WatchI<V1Pod> podWatch = new WatchBuilder()
146136
.withFieldSelector("thisValue")
147137
.withIncludeUninitialized(false)
148138
.withLimit(25)
@@ -155,7 +145,7 @@ public void whenPodWatchSpecifiesParameters_verifyAndReturnResponse() throws Exc
155145
public void whenPodWatchFindsNoData_hasNextReturnsFalse() throws Exception {
156146
defineHttpResponse(POD_RESOURCE, NO_RESPONSES);
157147

158-
WatchI<V1Pod> podWatch = new WatchBuilder(client).createPodWatch(NAMESPACE);
148+
WatchI<V1Pod> podWatch = new WatchBuilder().createPodWatch(NAMESPACE);
159149

160150
assertThat(podWatch.hasNext(), is(false));
161151
}
@@ -169,7 +159,7 @@ public void whenIngressWatchSpecifiesParameters_verifyAndReturnResponse() throws
169159
parameter("timeoutSeconds").withValue("15"),
170160
parameter("limit").withValue("500")));
171161

172-
WatchI<V1beta1Ingress> ingressWatch = new WatchBuilder(client)
162+
WatchI<V1beta1Ingress> ingressWatch = new WatchBuilder()
173163
.withTimeoutSeconds(15)
174164
.withPrettyPrinting()
175165
.createIngressWatch(NAMESPACE);
@@ -292,9 +282,23 @@ private TestServerWatchFactory(String basePath) {
292282
}
293283

294284
@Override
295-
public <T> WatchI<T> createWatch(ApiClient client, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException {
296-
client.setBasePath(basePath);
297-
return super.createWatch(client, callParams, responseBodyType, function);
285+
public <T> WatchI<T> createWatch(Pool<ApiClient> pool, CallParams callParams, Class<?> responseBodyType, BiFunction<ApiClient, CallParams, Call> function) throws ApiException {
286+
Pool<ApiClient> testPool = new Pool<ApiClient>() {
287+
288+
@Override
289+
protected ApiClient create() {
290+
Memento memento = TestUtils.silenceOperatorLogger();
291+
try {
292+
ApiClient client = pool.take();
293+
client.setBasePath(basePath);
294+
return client;
295+
} finally {
296+
memento.revert();
297+
}
298+
}
299+
300+
};
301+
return super.createWatch(testPool, callParams, responseBodyType, function);
298302
}
299303
}
300304
}

0 commit comments

Comments
 (0)