Skip to content

Commit b000770

Browse files
authored
Merge pull request #740 from oracle/bad_client_pooling
Clean up fix for failed clients being returned to the pool
2 parents 416f623 + ca04ea3 commit b000770

File tree

7 files changed

+134
-85
lines changed

7 files changed

+134
-85
lines changed

operator/src/main/java/oracle/kubernetes/operator/Watcher.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2017, 2018 Oracle Corporation and/or its affiliates. All rights reserved.
1+
// Copyright 2017, 2019 Oracle Corporation and/or its affiliates. All rights reserved.
22
// Licensed under the Universal Permissive License v 1.0 as shown at
33
// http://oss.oracle.com/licenses/upl.
44

@@ -126,13 +126,7 @@ private void watchForEvents() {
126126
.withResourceVersion(resourceVersion.toString())
127127
.withTimeoutSeconds(tuning.watchLifetime))) {
128128
while (watch.hasNext()) {
129-
Watch.Response<T> item;
130-
try {
131-
item = watch.next();
132-
} catch (Throwable e) {
133-
watch.discardClient();
134-
throw e;
135-
}
129+
Watch.Response<T> item = watch.next();
136130

137131
if (isStopping()) setIsDraining(true);
138132
if (isDraining()) continue;

operator/src/main/java/oracle/kubernetes/operator/builders/WatchI.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2018 Oracle Corporation and/or its affiliates. All rights reserved.
1+
// Copyright 2018, 2019 Oracle Corporation and/or its affiliates. All rights reserved.
22
// Licensed under the Universal Permissive License v 1.0 as shown at
33
// http://oss.oracle.com/licenses/upl.
44

@@ -14,6 +14,4 @@
1414
* @param <T> the generic object type
1515
*/
1616
public interface WatchI<T>
17-
extends Iterable<Watch.Response<T>>, Iterator<Watch.Response<T>>, java.io.Closeable {
18-
default void discardClient() {}
19-
}
17+
extends Iterable<Watch.Response<T>>, Iterator<Watch.Response<T>>, java.io.Closeable {}

operator/src/main/java/oracle/kubernetes/operator/builders/WatchImpl.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2018 Oracle Corporation and/or its affiliates. All rights reserved.
1+
// Copyright 2018, 2019 Oracle Corporation and/or its affiliates. All rights reserved.
22
// Licensed under the Universal Permissive License v 1.0 as shown at
33
// http://oss.oracle.com/licenses/upl.
44

@@ -28,12 +28,7 @@ public class WatchImpl<T> implements WatchI<T> {
2828
@Override
2929
public void close() throws IOException {
3030
impl.close();
31-
pool.recycle(client);
32-
}
33-
34-
@Override
35-
public void discardClient() {
36-
client = pool.take();
31+
if (client != null) pool.recycle(client);
3732
}
3833

3934
@Override
@@ -49,6 +44,11 @@ public boolean hasNext() {
4944

5045
@Override
5146
public Watch.Response<T> next() {
52-
return impl.next();
47+
try {
48+
return impl.next();
49+
} catch (Exception e) {
50+
client = null;
51+
throw e;
52+
}
5353
}
5454
}

operator/src/main/java/oracle/kubernetes/operator/helpers/ClientPool.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2017, Oracle Corporation and/or its affiliates. All rights reserved.
1+
// Copyright 2017, 2019 Oracle Corporation and/or its affiliates. All rights reserved.
22
// Licensed under the Universal Permissive License v 1.0 as shown at
33
// http://oss.oracle.com/licenses/upl.
44

@@ -17,7 +17,7 @@
1717

1818
public class ClientPool extends Pool<ApiClient> {
1919
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
20-
private static final ClientPool SINGLETON = new ClientPool();
20+
private static ClientPool SINGLETON = new ClientPool();
2121

2222
private static final ClientFactory FACTORY = new DefaultClientFactory();
2323

@@ -27,8 +27,6 @@ public static ClientPool getInstance() {
2727

2828
private final AtomicBoolean isFirst = new AtomicBoolean(true);
2929

30-
private ClientPool() {}
31-
3230
@Override
3331
protected ApiClient create() {
3432
return getApiClient();

operator/src/main/java/oracle/kubernetes/operator/helpers/Pool.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
// Copyright 2017, 2018, Oracle Corporation and/or its affiliates. All rights reserved.
1+
// Copyright 2017, 2019 Oracle Corporation and/or its affiliates. All rights reserved.
22
// Licensed under the Universal Permissive License v 1.0 as shown at
33
// http://oss.oracle.com/licenses/upl.
44

55
package oracle.kubernetes.operator.helpers;
66

7-
import java.lang.ref.WeakReference;
7+
import java.util.Queue;
88
import java.util.concurrent.ConcurrentLinkedQueue;
99
import oracle.kubernetes.operator.logging.LoggingFacade;
1010
import oracle.kubernetes.operator.logging.LoggingFactory;
@@ -14,7 +14,7 @@ public abstract class Pool<T> {
1414
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
1515

1616
// volatile since multiple threads may access queue reference
17-
private volatile WeakReference<ConcurrentLinkedQueue<T>> queue;
17+
private volatile Queue<T> queue = new ConcurrentLinkedQueue<>();
1818

1919
/**
2020
* Gets a new object from the pool. If no object is available in the pool, this method creates a
@@ -36,20 +36,8 @@ public final T take() {
3636
return instance;
3737
}
3838

39-
private ConcurrentLinkedQueue<T> getQueue() {
40-
WeakReference<ConcurrentLinkedQueue<T>> referenceQueue = queue;
41-
if (referenceQueue != null) {
42-
ConcurrentLinkedQueue<T> returnQueue = referenceQueue.get();
43-
if (returnQueue != null) {
44-
return returnQueue;
45-
}
46-
}
47-
48-
// overwrite the queue
49-
ConcurrentLinkedQueue<T> d = new ConcurrentLinkedQueue<>();
50-
queue = new WeakReference<>(d);
51-
52-
return d;
39+
protected Queue<T> getQueue() {
40+
return queue;
5341
}
5442

5543
/**
@@ -72,9 +60,4 @@ public final void recycle(T instance) {
7260
* @return Created instance
7361
*/
7462
protected abstract T create();
75-
76-
/** Drains pool of all entries; useful for unit-testing */
77-
public void drain() {
78-
getQueue().clear();
79-
}
8063
}

operator/src/test/java/oracle/kubernetes/operator/builders/WatchBuilderTest.java

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
1+
// Copyright 2018, 2019 Oracle Corporation and/or its affiliates. All rights reserved.
22
// Licensed under the Universal Permissive License v 1.0 as shown at
33
// http://oss.oracle.com/licenses/upl.
44

@@ -8,30 +8,38 @@
88
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
99
import static oracle.kubernetes.operator.LabelConstants.CREATEDBYOPERATOR_LABEL;
1010
import static oracle.kubernetes.operator.LabelConstants.DOMAINUID_LABEL;
11-
import static oracle.kubernetes.operator.builders.EventMatcher.*;
11+
import static oracle.kubernetes.operator.builders.EventMatcher.addEvent;
12+
import static oracle.kubernetes.operator.builders.EventMatcher.deleteEvent;
13+
import static oracle.kubernetes.operator.builders.EventMatcher.errorEvent;
14+
import static oracle.kubernetes.operator.builders.EventMatcher.modifyEvent;
1215
import static oracle.kubernetes.operator.builders.WatchBuilderTest.JsonServletAction.withResponses;
1316
import static oracle.kubernetes.operator.builders.WatchBuilderTest.ParameterValidation.parameter;
1417
import static org.hamcrest.MatcherAssert.assertThat;
15-
import static org.hamcrest.Matchers.*;
18+
import static org.hamcrest.Matchers.contains;
19+
import static org.hamcrest.Matchers.empty;
20+
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.is;
22+
import static org.hamcrest.Matchers.not;
23+
import static org.junit.Assert.fail;
1624

1725
import com.meterware.pseudoserver.HttpUserAgentTest;
1826
import com.meterware.pseudoserver.PseudoServlet;
1927
import com.meterware.pseudoserver.WebResource;
2028
import com.meterware.simplestub.Memento;
2129
import com.meterware.simplestub.StaticStubSupport;
22-
import com.squareup.okhttp.Call;
2330
import io.kubernetes.client.ApiClient;
24-
import io.kubernetes.client.ApiException;
2531
import io.kubernetes.client.models.V1ObjectMeta;
2632
import io.kubernetes.client.models.V1Pod;
2733
import io.kubernetes.client.models.V1Service;
28-
import java.io.IOException;
34+
import java.util.ArrayDeque;
2935
import java.util.ArrayList;
3036
import java.util.Arrays;
37+
import java.util.Collection;
38+
import java.util.Collections;
3139
import java.util.List;
32-
import java.util.function.BiFunction;
40+
import java.util.Queue;
3341
import oracle.kubernetes.TestUtils;
34-
import oracle.kubernetes.operator.helpers.Pool;
42+
import oracle.kubernetes.operator.helpers.ClientPool;
3543
import oracle.kubernetes.weblogic.domain.v2.Domain;
3644
import org.junit.After;
3745
import org.junit.Before;
@@ -61,12 +69,12 @@ public class WatchBuilderTest extends HttpUserAgentTest {
6169
@Before
6270
public void setUp() throws Exception {
6371
mementos.add(TestUtils.silenceOperatorLogger());
64-
mementos.add(TestServerWatchFactory.install(getHostPath()));
72+
mementos.add(ClientPoolStub.install(getHostPath()));
6573
validationErrors = new ArrayList<>();
6674
}
6775

6876
@After
69-
public void tearDown() throws Exception {
77+
public void tearDown() {
7078
for (Memento memento : mementos) memento.revert();
7179
if (!validationErrors.isEmpty()) throw validationErrors.get(0);
7280
}
@@ -85,7 +93,35 @@ public void whenDomainWatchReceivesAddResponse_returnItFromIterator() throws Exc
8593
assertThat(domainWatch, contains(addEvent(domain)));
8694
}
8795

88-
@SuppressWarnings("unchecked")
96+
@Test
97+
public void afterWatchClosed_returnClientToPool() throws Exception {
98+
Domain domain =
99+
new Domain()
100+
.withApiVersion(API_VERSION)
101+
.withKind("Domain")
102+
.withMetadata(createMetaData("domain1", NAMESPACE));
103+
defineHttpResponse(DOMAIN_RESOURCE, withResponses(createAddedResponse(domain)));
104+
105+
try (WatchI<Domain> domainWatch = new WatchBuilder().createDomainWatch(NAMESPACE)) {
106+
domainWatch.next();
107+
}
108+
109+
assertThat(ClientPoolStub.getPooledClients(), not(empty()));
110+
}
111+
112+
@Test
113+
public void afterWatchError_closeDoesNotReturnClientToPool() throws Exception {
114+
defineHttpResponse(DOMAIN_RESOURCE, withResponses());
115+
116+
try (WatchI<Domain> domainWatch = new WatchBuilder().createDomainWatch(NAMESPACE)) {
117+
domainWatch.next();
118+
fail("Should have thrown an exception");
119+
} catch (Throwable ignore) {
120+
}
121+
122+
assertThat(ClientPoolStub.getPooledClients(), is(empty()));
123+
}
124+
89125
@Test
90126
public void whenDomainWatchReceivesModifyAndDeleteResponses_returnBothFromIterator()
91127
throws Exception {
@@ -239,7 +275,7 @@ private JsonServlet(JsonServletAction... actions) {
239275
}
240276

241277
@Override
242-
public WebResource getGetResponse() throws IOException {
278+
public WebResource getGetResponse() {
243279
if (requestNum >= actions.size())
244280
return new WebResource("Unexpected Request #" + requestNum, HTTP_UNAVAILABLE);
245281

@@ -251,6 +287,7 @@ public WebResource getGetResponse() throws IOException {
251287
}
252288
}
253289

290+
@SuppressWarnings("SameParameterValue")
254291
private V1ObjectMeta createMetaData(String name, String namespace) {
255292
return new V1ObjectMeta()
256293
.name(name)
@@ -274,45 +311,38 @@ private <T> String createDeletedResponse(T object) {
274311
return WatchEvent.createDeleteEvent(object).toJson();
275312
}
276313

314+
@SuppressWarnings("SameParameterValue")
277315
private String createErrorResponse(int statusCode) {
278316
return WatchEvent.createErrorEvent(statusCode).toJson();
279317
}
280318

281-
static class TestServerWatchFactory extends WatchBuilder.WatchFactoryImpl {
319+
static class ClientPoolStub extends ClientPool {
320+
private String basePath;
321+
private static Queue<ApiClient> queue;
322+
282323
static Memento install(String basePath) throws NoSuchFieldException {
283-
return StaticStubSupport.install(
284-
WatchBuilder.class, "FACTORY", new TestServerWatchFactory(basePath));
324+
queue = new ArrayDeque<>();
325+
return StaticStubSupport.install(ClientPool.class, "SINGLETON", new ClientPoolStub(basePath));
285326
}
286327

287-
private String basePath;
328+
static Collection<ApiClient> getPooledClients() {
329+
return Collections.unmodifiableCollection(queue);
330+
}
288331

289-
private TestServerWatchFactory(String basePath) {
332+
ClientPoolStub(String basePath) {
290333
this.basePath = basePath;
291334
}
292335

293336
@Override
294-
public <T> WatchI<T> createWatch(
295-
Pool<ApiClient> pool,
296-
CallParams callParams,
297-
Class<?> responseBodyType,
298-
BiFunction<ApiClient, CallParams, Call> function)
299-
throws ApiException {
300-
Pool<ApiClient> testPool =
301-
new Pool<ApiClient>() {
302-
303-
@Override
304-
protected ApiClient create() {
305-
Memento memento = TestUtils.silenceOperatorLogger();
306-
try {
307-
ApiClient client = pool.take();
308-
client.setBasePath(basePath);
309-
return client;
310-
} finally {
311-
memento.revert();
312-
}
313-
}
314-
};
315-
return super.createWatch(testPool, callParams, responseBodyType, function);
337+
protected ApiClient create() {
338+
ApiClient apiClient = super.create();
339+
apiClient.setBasePath(basePath);
340+
return apiClient;
341+
}
342+
343+
@Override
344+
protected Queue<ApiClient> getQueue() {
345+
return queue;
316346
}
317347
}
318348
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2019 Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at
3+
// http://oss.oracle.com/licenses/upl.
4+
5+
package oracle.kubernetes.operator.helpers;
6+
7+
import static org.hamcrest.Matchers.instanceOf;
8+
import static org.hamcrest.Matchers.sameInstance;
9+
import static org.junit.Assert.*;
10+
11+
import com.meterware.simplestub.Memento;
12+
import io.kubernetes.client.ApiClient;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import oracle.kubernetes.TestUtils;
16+
import org.junit.After;
17+
import org.junit.Before;
18+
import org.junit.Test;
19+
20+
public class ClientPoolTest {
21+
22+
private List<Memento> mementos = new ArrayList<>();
23+
24+
@Before
25+
public void setUp() {
26+
mementos.add(TestUtils.silenceOperatorLogger());
27+
}
28+
29+
@After
30+
public void tearDown() {
31+
for (Memento memento : mementos) memento.revert();
32+
}
33+
34+
@Test
35+
public void onTake_returnApiClient() {
36+
assertThat(ClientPool.getInstance().take(), instanceOf(ApiClient.class));
37+
}
38+
39+
@Test
40+
public void afterRecycle_takeReturnsSameClient() {
41+
ApiClient apiClient = ClientPool.getInstance().take();
42+
ClientPool.getInstance().recycle(apiClient);
43+
44+
assertThat(ClientPool.getInstance().take(), sameInstance(apiClient));
45+
}
46+
}

0 commit comments

Comments
 (0)