Skip to content

Commit 89dfda3

Browse files
committed
Prepare for unit testing
1 parent 2a7dbb4 commit 89dfda3

18 files changed

+511
-371
lines changed

operator/src/main/java/oracle/kubernetes/operator/Main.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,14 @@ public class Main {
105105
private static final TuningParameters tuningAndConfig;
106106
static {
107107
try {
108-
tuningAndConfig = TuningParameters.initializeInstance(factory, "/operator/config");
108+
TuningParameters.initializeInstance(factory, "/operator/config");
109+
tuningAndConfig = TuningParameters.getInstance();
109110
} catch (IOException e) {
110111
LOGGER.warning(MessageKeys.EXCEPTION, e);
111112
throw new RuntimeException(e);
112113
}
113114
}
114-
static final CallBuilderFactory callBuilderFactory = new CallBuilderFactory(tuningAndConfig);
115+
static final CallBuilderFactory callBuilderFactory = new CallBuilderFactory();
115116

116117
private static final Container container = new Container();
117118
private static final ScheduledExecutorService wrappedExecutorService = Engine.wrappedExecutorService("operator",

operator/src/main/java/oracle/kubernetes/operator/TuningParameters.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99

1010
public interface TuningParameters extends Map<String, String> {
1111

12-
public static TuningParameters initializeInstance(
12+
static TuningParameters initializeInstance(
1313
ThreadFactory factory, String mountPoint) throws IOException {
1414
return TuningParametersImpl.initializeInstance(factory, mountPoint);
1515
}
16+
17+
public static TuningParameters getInstance() {
18+
return TuningParametersImpl.getInstance();
19+
}
1620

1721
public static class MainTuning {
1822
public final int statusUpdateTimeoutSeconds;

operator/src/main/java/oracle/kubernetes/operator/TuningParametersImpl.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,46 @@
33

44
package oracle.kubernetes.operator;
55

6-
import java.io.IOException;
7-
import java.util.concurrent.ThreadFactory;
8-
import java.util.concurrent.locks.ReadWriteLock;
9-
import java.util.concurrent.locks.ReentrantReadWriteLock;
10-
116
import oracle.kubernetes.operator.helpers.ConfigMapConsumer;
127
import oracle.kubernetes.operator.logging.LoggingFacade;
138
import oracle.kubernetes.operator.logging.LoggingFactory;
149
import oracle.kubernetes.operator.logging.MessageKeys;
1510

11+
import java.io.IOException;
12+
import java.util.concurrent.ThreadFactory;
13+
import java.util.concurrent.locks.ReadWriteLock;
14+
import java.util.concurrent.locks.ReentrantReadWriteLock;
15+
1616
public class TuningParametersImpl extends ConfigMapConsumer implements TuningParameters {
1717
private static final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
18-
private static TuningParametersImpl INSTANCE = null;
18+
private static TuningParameters INSTANCE = null;
1919

2020
private final ReadWriteLock lock = new ReentrantReadWriteLock();
2121
private MainTuning main = null;
2222
private CallBuilderTuning callBuilder = null;
2323
private WatchTuning watch = null;
2424
private PodTuning pod = null;
2525

26-
public synchronized static TuningParameters initializeInstance(
27-
ThreadFactory factory, String mountPoint) throws IOException {
26+
synchronized static TuningParameters initializeInstance(
27+
ThreadFactory factory, String mountPoint) throws IOException {
2828
if (INSTANCE == null) {
2929
INSTANCE = new TuningParametersImpl(factory, mountPoint);
3030
return INSTANCE;
3131
}
3232
throw new IllegalStateException();
3333
}
34+
35+
public synchronized static TuningParameters getInstance() {
36+
return INSTANCE;
37+
}
3438

3539
private TuningParametersImpl(ThreadFactory factory, String mountPoint) throws IOException {
36-
super(factory, mountPoint, () -> {
37-
updateTuningParameters();
38-
});
40+
super(factory, mountPoint, TuningParametersImpl::updateTuningParameters);
3941
update();
4042
}
4143

42-
public static void updateTuningParameters() {
43-
INSTANCE.update();
44+
private static void updateTuningParameters() {
45+
((TuningParametersImpl) INSTANCE).update();
4446
}
4547

4648
private void update() {
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
// Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3+
4+
package oracle.kubernetes.operator.calls;
5+
6+
import io.kubernetes.client.ApiCallback;
7+
import io.kubernetes.client.ApiClient;
8+
import io.kubernetes.client.ApiException;
9+
import io.kubernetes.client.models.V1ListMeta;
10+
import oracle.kubernetes.operator.helpers.CallBuilder;
11+
import oracle.kubernetes.operator.helpers.ClientPool;
12+
import oracle.kubernetes.operator.helpers.ResponseStep;
13+
import oracle.kubernetes.operator.logging.LoggingFacade;
14+
import oracle.kubernetes.operator.logging.LoggingFactory;
15+
import oracle.kubernetes.operator.logging.MessageKeys;
16+
import oracle.kubernetes.operator.work.Component;
17+
import oracle.kubernetes.operator.work.NextAction;
18+
import oracle.kubernetes.operator.work.Packet;
19+
import oracle.kubernetes.operator.work.Step;
20+
21+
import java.lang.reflect.InvocationTargetException;
22+
import java.lang.reflect.Method;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Random;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
29+
/**
30+
* A Step driven by an asynchronous call to the Kubernetes API, which results in a series of callbacks until canceled.
31+
*/
32+
public class AsyncRequestStep<T> extends Step {
33+
public static final String RESPONSE_COMPONENT_NAME = "response";
34+
private static final Random R = new Random();
35+
private static final int HIGH = 200;
36+
private static final int LOW = 10;
37+
private static final int SCALE = 100;
38+
private static final int MAX = 10000;
39+
40+
private final LoggingFacade LOGGER = LoggingFactory.getLogger("Operator", "Operator");
41+
private final ClientPool helper;
42+
private final RequestParams requestParams;
43+
private final CallFactory<T> factory;
44+
private int timeoutSeconds;
45+
private final int maxRetryCount;
46+
private final String fieldSelector;
47+
private final String labelSelector;
48+
private final String resourceVersion;
49+
50+
public AsyncRequestStep(ResponseStep<T> next, RequestParams requestParams, CallFactory<T> factory, ClientPool helper, int timeoutSeconds, int maxRetryCount, String fieldSelector, String labelSelector, String resourceVersion) {
51+
super(next);
52+
this.helper = helper;
53+
this.requestParams = requestParams;
54+
this.factory = factory;
55+
this.timeoutSeconds = timeoutSeconds;
56+
this.maxRetryCount = maxRetryCount;
57+
this.fieldSelector = fieldSelector;
58+
this.labelSelector = labelSelector;
59+
this.resourceVersion = resourceVersion;
60+
next.setPrevious(this);
61+
}
62+
63+
@Override
64+
public NextAction apply(Packet packet) {
65+
// clear out earlier results
66+
String cont = null;
67+
RetryStrategy retry = null;
68+
Component oldResponse = packet.getComponents().remove(RESPONSE_COMPONENT_NAME);
69+
if (oldResponse != null) {
70+
@SuppressWarnings("unchecked")
71+
CallResponse<T> old = oldResponse.getSPI(CallResponse.class);
72+
if (old != null && old.result != null) {
73+
// called again, access continue value, if available
74+
cont = accessContinue(old.result);
75+
}
76+
77+
retry = oldResponse.getSPI(RetryStrategy.class);
78+
}
79+
String _continue = (cont != null) ? cont : "";
80+
if (retry == null) {
81+
retry = new DefaultRetryStrategy();
82+
retry.setRetryStep(this);
83+
}
84+
RetryStrategy _retry = retry;
85+
86+
LOGGER.fine(MessageKeys.ASYNC_REQUEST, requestParams.call, requestParams.namespace, requestParams.name, requestParams.body, fieldSelector, labelSelector, resourceVersion);
87+
88+
AtomicBoolean didResume = new AtomicBoolean(false);
89+
AtomicBoolean didRecycle = new AtomicBoolean(false);
90+
ApiClient client = helper.take();
91+
return doSuspend((fiber) -> {
92+
ApiCallback<T> callback = new BaseApiCallback<T>() {
93+
@Override
94+
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
95+
if (didResume.compareAndSet(false, true)) {
96+
if (statusCode != CallBuilder.NOT_FOUND) {
97+
LOGGER.info(MessageKeys.ASYNC_FAILURE, e, statusCode, responseHeaders, requestParams.call, requestParams.namespace, requestParams.name, requestParams.body, fieldSelector, labelSelector, resourceVersion);
98+
}
99+
100+
if (didRecycle.compareAndSet(false, true)) {
101+
helper.recycle(client);
102+
}
103+
packet.getComponents().put(RESPONSE_COMPONENT_NAME, Component.createFor(RetryStrategy.class, _retry, new CallResponse<Void>(null, e, statusCode, responseHeaders)));
104+
fiber.resume(packet);
105+
}
106+
}
107+
108+
@Override
109+
public void onSuccess(T result, int statusCode, Map<String, List<String>> responseHeaders) {
110+
if (didResume.compareAndSet(false, true)) {
111+
LOGGER.fine(MessageKeys.ASYNC_SUCCESS, result, statusCode, responseHeaders);
112+
113+
if (didRecycle.compareAndSet(false, true)) {
114+
helper.recycle(client);
115+
}
116+
packet.getComponents().put(RESPONSE_COMPONENT_NAME, Component.createFor(new CallResponse<>(result, null, statusCode, responseHeaders)));
117+
fiber.resume(packet);
118+
}
119+
}
120+
};
121+
122+
try {
123+
CancelableCall c = factory.generate(requestParams, client, _continue, callback);
124+
125+
// timeout handling
126+
fiber.owner.getExecutor().schedule(() -> {
127+
if (didRecycle.compareAndSet(false, true)) {
128+
// don't recycle on timeout because state is unknown
129+
// usage.recycle();
130+
}
131+
if (didResume.compareAndSet(false, true)) {
132+
try {
133+
c.cancel();
134+
} finally {
135+
LOGGER.info(MessageKeys.ASYNC_TIMEOUT, requestParams.call, requestParams.namespace, requestParams.name, requestParams.body, fieldSelector, labelSelector, resourceVersion);
136+
packet.getComponents().put(RESPONSE_COMPONENT_NAME, Component.createFor(RetryStrategy.class, _retry));
137+
fiber.resume(packet);
138+
}
139+
}
140+
}, timeoutSeconds, TimeUnit.SECONDS);
141+
} catch (Throwable t) {
142+
LOGGER.warning(MessageKeys.ASYNC_FAILURE, t, 0, null, requestParams, requestParams.namespace, requestParams.name, requestParams.body, fieldSelector, labelSelector, resourceVersion);
143+
if (didRecycle.compareAndSet(false, true)) {
144+
// don't recycle on throwable because state is unknown
145+
// usage.recycle();
146+
}
147+
if (didResume.compareAndSet(false, true)) {
148+
packet.getComponents().put(RESPONSE_COMPONENT_NAME, Component.createFor(RetryStrategy.class, _retry));
149+
fiber.resume(packet);
150+
}
151+
}
152+
});
153+
}
154+
155+
private static String accessContinue(Object result) {
156+
String cont = "";
157+
if (result != null) {
158+
try {
159+
Method m = result.getClass().getMethod("getMetadata");
160+
Object meta = m.invoke(result);
161+
if (meta instanceof V1ListMeta) {
162+
return ((V1ListMeta) meta).getContinue();
163+
}
164+
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
165+
// no-op, no-log
166+
}
167+
}
168+
return cont;
169+
}
170+
171+
private final class DefaultRetryStrategy implements RetryStrategy {
172+
private long retryCount = 0;
173+
private Step retryStep = null;
174+
175+
@Override
176+
public void setRetryStep(Step retryStep) {
177+
this.retryStep = retryStep;
178+
}
179+
180+
@Override
181+
public NextAction doPotentialRetry(Step conflictStep, Packet packet, ApiException e, int statusCode,
182+
Map<String, List<String>> responseHeaders) {
183+
// Check statusCode, many statuses should not be retried
184+
// https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#http-status-codes
185+
if (statusCode == 0 /* simple timeout */ ||
186+
statusCode == 429 /* StatusTooManyRequests */ ||
187+
statusCode == 500 /* StatusInternalServerError */ ||
188+
statusCode == 503 /* StatusServiceUnavailable */ ||
189+
statusCode == 504 /* StatusServerTimeout */) {
190+
191+
// exponential back-off
192+
long waitTime = Math.min((2 << ++retryCount) * SCALE, MAX) + (R.nextInt(HIGH - LOW) + LOW);
193+
194+
if (statusCode == 0 || statusCode == 504 /* StatusServerTimeout */) {
195+
// increase server timeout
196+
timeoutSeconds *= 2;
197+
}
198+
199+
NextAction na = new NextAction();
200+
if (statusCode == 0 && retryCount <= maxRetryCount) {
201+
na.invoke(retryStep, packet);
202+
} else {
203+
LOGGER.info(MessageKeys.ASYNC_RETRY, String.valueOf(waitTime));
204+
na.delay(retryStep, packet, waitTime, TimeUnit.MILLISECONDS);
205+
}
206+
return na;
207+
} else if (statusCode == 409 /* Conflict */ && conflictStep != null) {
208+
// Conflict is an optimistic locking failure. Therefore, we can't
209+
// simply retry the request. Instead, application code needs to rebuild
210+
// the request based on latest contents. If provided, a conflict step will do that.
211+
212+
// exponential back-off
213+
long waitTime = Math.min((2 << ++retryCount) * SCALE, MAX) + (R.nextInt(HIGH - LOW) + LOW);
214+
215+
LOGGER.info(MessageKeys.ASYNC_RETRY, String.valueOf(waitTime));
216+
NextAction na = new NextAction();
217+
na.delay(conflictStep, packet, waitTime, TimeUnit.MILLISECONDS);
218+
return na;
219+
}
220+
221+
// otherwise, we will not retry
222+
return null;
223+
}
224+
225+
@Override
226+
public void reset() {
227+
retryCount = 0;
228+
}
229+
}
230+
231+
private static abstract class BaseApiCallback<T> implements ApiCallback<T> {
232+
@Override
233+
public void onDownloadProgress(long bytesRead, long contentLength, boolean done) {
234+
// no-op
235+
}
236+
237+
@Override
238+
public void onUploadProgress(long bytesWritten, long contentLength, boolean done) {
239+
// no-op
240+
}
241+
}
242+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright 2017,2018, Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3+
4+
package oracle.kubernetes.operator.calls;
5+
6+
import io.kubernetes.client.ApiCallback;
7+
import io.kubernetes.client.ApiClient;
8+
import io.kubernetes.client.ApiException;
9+
10+
@FunctionalInterface
11+
public interface CallFactory<T> {
12+
CancelableCall generate(RequestParams requestParams, ApiClient client, String cont, ApiCallback<T> callback) throws ApiException;
13+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3+
4+
package oracle.kubernetes.operator.calls;
5+
6+
import io.kubernetes.client.ApiException;
7+
8+
import java.util.List;
9+
import java.util.Map;
10+
11+
public final class CallResponse<T> {
12+
public final T result;
13+
public final ApiException e;
14+
public final int statusCode;
15+
public final Map<String, List<String>> responseHeaders;
16+
17+
CallResponse(T result, ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
18+
this.result = result;
19+
this.e = e;
20+
this.statusCode = statusCode;
21+
this.responseHeaders = responseHeaders;
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2018, Oracle Corporation and/or its affiliates. All rights reserved.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at http://oss.oracle.com/licenses/upl.
3+
4+
package oracle.kubernetes.operator.calls;
5+
6+
import com.squareup.okhttp.Call;
7+
8+
/**
9+
* A wrapper for an OKHttp call to isolate its own callers.
10+
*/
11+
public class CallWrapper implements CancelableCall {
12+
13+
private Call underlyingCall;
14+
15+
public CallWrapper(Call underlyingCall) {
16+
this.underlyingCall = underlyingCall;
17+
}
18+
19+
@Override
20+
public void cancel() {
21+
underlyingCall.cancel();
22+
}
23+
}

0 commit comments

Comments
 (0)