Commit dfa920e

shawkins authored and manusa committed
fix #4884: using different threads for watch events
1 parent 7c923a5 commit dfa920e

12 files changed: +86 -21 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -50,6 +50,7 @@
5050
#### _**Note**_: Breaking changes
5151
* Fix #4708: The signature of the Interceptor methods changed to pass the full HttpRequest, rather than just the headers, and explicitly pass request tags - in particular the RequestConfig. To simplify authentication concerns the following fields have been removed from RequestConfig: username, password, oauthToken, and oauthTokenProvider. Not all HttpClient implementations support setting the connectionTimeout at a request level, thus it was removed from the RequestConfig as well.
5252
* Fix #4659: The SupportTestingClient interface has been deprecated. Please use one of the supports methods or getApiGroup to determine what is available on the api server.
53+
* Fix #4802: to ease developer burden, and potentially bad behavior with the vertx client, the callbacks for Watcher and ResourceEventHandler will no longer be made by an HttpClient thread, but rather from the thread pool associated with the KubernetesClient. Please ensure that if you are customizing the Executor supplied to the client that it has sufficient threads to handle these callbacks.
5354
* Fix #4825: removed or deprecated/moved methods that are unrelated to the rolling timeout from ImageEditReplacePatchable. Deprecated rollout methods for timeout and edit - future versions will not support
5455
* Fix #4826: removed RequestConfig upload connection and rolling timeouts. Both were no longer used with no plans to re-introduce their usage.
5556
* Fix #4861: several breaking changes related to resourceVersion handling and the replace operation:

doc/FAQ.md

Lines changed: 5 additions & 5 deletions
@@ -51,15 +51,15 @@ That will align all of the transitive Fabric8 Kubernetes Client dependencies to
5151

5252
There have been a lot of changes under the covers with thread utilization in the Fabric8 client over the 5.x and 6.x releases, so the exact details of which threads are created and used where will depend on the particular release version.
5353

54-
At the core the thread utilization will depend upon the http client implementation. Per client OkHttp maintains a pool of threads for task execution. It will dedicate 2 threads out of that pool per WebSocket connection. If you have a lot of WebSocket usage (Informer or Watches) with OkHttp, you can expect to see a large number of threads in use - which can potentially exhaust the OkHttp defaults.
54+
At the core the thread utilization will depend upon the HTTP client implementation. Per client OkHttp maintains a pool of threads for task execution. It will dedicate 2 threads out of that pool per WebSocket connection. If you have a lot of WebSocket usage (Informer or Watches) with OkHttp, you can expect to see a large number of threads in use.
5555

56-
With the JDK http client it will only maintain a selector thread and a small worker pool which will be based upon your available processors per client. It does not matter how many Informers or Watches you run, the same worker pool is shared.
56+
The JDK, Jetty, and Vert.x HTTP clients maintain a smaller worker pool for all tasks, typically sized by default based upon your available processors, plus typically one or more selector / coordinator threads. It does not matter how many Informers or Watches you run, the same threads are shared.
5757

58-
> **Note:** It is recommended with either HTTP client implementation that logic you supply via Watchers, ExecListeners, ResourceEventHandlers, Predicates, etc. does not execute long running tasks.
58+
To ease developer burden the callbacks on Watchers and ResourceEventHandlers will not be made directly by an HTTP client thread, but the order of calls will be guaranteed. This makes the logic you include in those callbacks tolerant to some blocking without compromising the on-going work at the HTTP client level. However, you should avoid truly long-running operations, as they will cause further events to queue and may eventually cause memory issues.
5959

60-
For non-ResourceEventHandlers call backs long-running operation can be a problem. When using the OkHttp client and default settings holding a IO thread inhibits websocket processing that can timeout the ping and may prevent additional requests since the okhttp client defaults to only 5 concurrent requests per host. When using the JDK http client the long running task will inhibit the use of that IO thread for ALL http processing. Note that calling other KubernetesClient operations, especially those with waits, can be long-running. We are working towards providing non-blocking mode for many of these operations, but until that is available consider using a separate task queue for such work.
60+
> **Note:** It is recommended with any HTTP client implementation that logic you supply via Watchers, ExecListeners, ResourceEventHandlers, Predicates, Interceptors, LeaderCallbacks, etc. does not execute long running tasks.
6161
62-
On top of the http client threads the Fabric8 client maintains a task thread pool for scheduled tasks and for potentially long-running tasks that are called from WebSocket operations, such as handling input and output streams and ResourceEventHandler call backs. This thread pool defaults to an unlimited number of cached threads, which will be shutdown when the client is closed - that is a sensible default with either http client as the amount of concurrently running async tasks will typically be low. If you would rather take full control over the threading use KubernetesClientBuilder.withExecutor or KubernetesClientBuilder.withExecutorSupplier - however note that constraining this thread pool too much will result in a build up of event processing queues.
62+
On top of the HTTP client threads the Fabric8 client maintains a task thread pool for scheduled tasks and for tasks that are called from WebSocket operations, such as handling input and output streams and ResourceEventHandler callbacks. This thread pool defaults to an unlimited number of cached threads, which will be shut down when the client is closed - that is a sensible default as the number of concurrently running async tasks will typically be low. If you would rather take full control over the threading use KubernetesClientBuilder.withExecutor or KubernetesClientBuilder.withExecutorSupplier - however note that constraining this thread pool too much will result in a build up of event processing queues.
6363

6464
Finally the fabric8 client will use 1 thread per PortForward and an additional thread per socket connection - this may be improved upon in the future.
6565
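
As a rough illustration of the FAQ text on taking control of the task thread pool, the following sketch builds a fixed-size pool using only JDK classes. The class name `CallbackExecutors` and the method `bounded` are hypothetical and not part of the Fabric8 client; the pool size is an assumption you would tune for your own callback load.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the Fabric8 client.
final class CallbackExecutors {
  private CallbackExecutors() {
  }

  // Creates a pool with at most `threads` workers and an unbounded task queue.
  // Too few workers means Watcher / ResourceEventHandler events pile up in the queue.
  static ExecutorService bounded(int threads) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        threads, threads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    // Let idle workers exit so an unused pool does not pin threads.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}
```

Assuming the client library is on the classpath, such an executor could be handed to the builder method named above, for example `new KubernetesClientBuilder().withExecutor(CallbackExecutors.bounded(4)).build()`. Whether the client shuts down a caller-supplied executor is not stated in this diff, so plan to shut it down yourself.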

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/Config.java

Lines changed: 2 additions & 1 deletion
@@ -134,6 +134,7 @@ public class Config {
134134

135135
public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc";
136136
public static final Long DEFAULT_SCALE_TIMEOUT = 10 * 60 * 1000L;
137+
public static final int DEFAULT_REQUEST_TIMEOUT = 10 * 1000;
137138
public static final int DEFAULT_LOGGING_INTERVAL = 20 * 1000;
138139
public static final Long DEFAULT_WEBSOCKET_TIMEOUT = 5 * 1000L;
139140
public static final Long DEFAULT_WEBSOCKET_PING_INTERVAL = 30 * 1000L;
@@ -193,7 +194,7 @@ public class Config {
193194
private int uploadRequestTimeout = DEFAULT_UPLOAD_REQUEST_TIMEOUT;
194195
private int requestRetryBackoffLimit;
195196
private int requestRetryBackoffInterval;
196-
private int requestTimeout = 10 * 1000;
197+
private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
197198
private long scaleTimeout = DEFAULT_SCALE_TIMEOUT;
198199
private int loggingInterval = DEFAULT_LOGGING_INTERVAL;
199200
private long websocketTimeout = DEFAULT_WEBSOCKET_TIMEOUT;

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/RequestConfig.java

Lines changed: 2 additions & 1 deletion
@@ -26,6 +26,7 @@
2626
import static io.fabric8.kubernetes.client.Config.DEFAULT_LOGGING_INTERVAL;
2727
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
2828
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
29+
import static io.fabric8.kubernetes.client.Config.DEFAULT_REQUEST_TIMEOUT;
2930
import static io.fabric8.kubernetes.client.Config.DEFAULT_SCALE_TIMEOUT;
3031
import static io.fabric8.kubernetes.client.Config.DEFAULT_UPLOAD_REQUEST_TIMEOUT;
3132
import static io.fabric8.kubernetes.client.Config.DEFAULT_WEBSOCKET_TIMEOUT;
@@ -42,7 +43,7 @@ public class RequestConfig {
4243
private int uploadRequestTimeout = DEFAULT_UPLOAD_REQUEST_TIMEOUT;
4344
private int requestRetryBackoffLimit = DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
4445
private int requestRetryBackoffInterval = DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
45-
private int requestTimeout = 10 * 1000;
46+
private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
4647
private long scaleTimeout = DEFAULT_SCALE_TIMEOUT;
4748
private int loggingInterval = DEFAULT_LOGGING_INTERVAL;
4849
private long websocketTimeout = DEFAULT_WEBSOCKET_TIMEOUT;

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/Watcher.java

Lines changed: 6 additions & 6 deletions
@@ -21,7 +21,9 @@ public interface Watcher<T> {
2121

2222
/**
2323
* If the Watcher can reconnect itself after an error
24-
*
24+
* <p>
25+
* Currently only used to indicate if the Watch should ignore the watch reconnect limit
26+
*
2527
* @return
2628
*/
2729
default boolean reconnecting() {
@@ -31,8 +33,7 @@ default boolean reconnecting() {
3133
/**
3234
* Handle the given event.
3335
* <p>
34-
* Should not be implemented with long-running logic as this method is called directly from
35-
* an IO thread.
36+
* Should not be implemented with long-running logic as that may lead to memory issues.
3637
*/
3738
void eventReceived(Action action, T resource);
3839

@@ -46,9 +47,8 @@ default void onClose() {
4647
/**
4748
* Invoked when the watcher closes due to an Exception.
4849
* <p>
49-
* Should not be implemented with long-running logic as this method is called directly from
50-
* an IO thread.
51-
*
50+
* Should not be implemented with long-running logic as that may lead to memory issues.
51+
*
5252
* @param cause What caused the watcher to be closed.
5353
*/
5454
void onClose(WatcherException cause);

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java

Lines changed: 8 additions & 0 deletions
@@ -28,13 +28,17 @@ public interface ResourceEventHandler<T> {
2828

2929
/**
3030
* Called after an empty list is retrieved on start or after an HTTP GONE when the {@link Store} is empty
31+
* <p>
32+
* Should not be implemented with long-running logic as that may lead to memory issues.
3133
*/
3234
default void onNothing() {
3335

3436
}
3537

3638
/**
3739
* Called when an object is added.
40+
* <p>
41+
* Should not be implemented with long-running logic as that may lead to memory issues.
3842
*
3943
* @param obj object
4044
*/
@@ -46,6 +50,8 @@ default void onNothing() {
4650
* were combined together, so you can't use this to see every single
4751
* change. It is also called when a sync happens - oldObj will be
4852
* the same as newObj.
53+
* <p>
54+
* Should not be implemented with long-running logic as that may lead to memory issues.
4955
*
5056
* @param oldObj old object
5157
* @param newObj new object
@@ -57,6 +63,8 @@ default void onNothing() {
5763
* it would get an object of the DeletedFinalStateUnknown. This can
5864
* happen if the watch is closed and misses the delete event and
5965
* we don't notice the deletion until the subsequent re-list.
66+
* <p>
67+
* Should not be implemented with long-running logic as that may lead to memory issues.
6068
*
6169
* @param obj object to delete
6270
* @param deletedFinalStateUnknown get final state of item if it is known or not.

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java

Lines changed: 4 additions & 0 deletions
@@ -121,6 +121,10 @@ default boolean hasSynced() {
121121
* The resource version observed when last synced with the underlying store.
122122
* The value returned is not synchronized with access to the underlying store
123123
* and is not thread-safe.
124+
* <p>
125+
* Since the store processes events asynchronously this value should not be
126+
* used as an indication of the last resourceVersion seen. Also after an
127+
* informer is stopped any pending event processing may not happen.
124128
*
125129
* @return string value or null if never synced
126130
*/

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java

Lines changed: 39 additions & 1 deletion
@@ -32,6 +32,7 @@
3232
import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator;
3333
import io.fabric8.kubernetes.client.utils.Serialization;
3434
import io.fabric8.kubernetes.client.utils.Utils;
35+
import io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

@@ -50,6 +51,42 @@
5051

5152
public abstract class AbstractWatchManager<T extends HasMetadata> implements Watch {
5253

54+
private static final class SerialWatcher<T> implements Watcher<T> {
55+
private final Watcher<T> watcher;
56+
SerialExecutor serialExecutor;
57+
58+
private SerialWatcher(Watcher<T> watcher, SerialExecutor serialExecutor) {
59+
this.watcher = watcher;
60+
this.serialExecutor = serialExecutor;
61+
}
62+
63+
@Override
64+
public void eventReceived(Action action, T resource) {
65+
serialExecutor.execute(() -> watcher.eventReceived(action, resource));
66+
}
67+
68+
@Override
69+
public void onClose(WatcherException cause) {
70+
serialExecutor.execute(() -> {
71+
watcher.onClose(cause);
72+
serialExecutor.shutdownNow();
73+
});
74+
}
75+
76+
@Override
77+
public void onClose() {
78+
serialExecutor.execute(() -> {
79+
watcher.onClose();
80+
serialExecutor.shutdownNow();
81+
});
82+
}
83+
84+
@Override
85+
public boolean reconnecting() {
86+
return watcher.reconnecting();
87+
}
88+
}
89+
5390
public static class WatchRequestState {
5491

5592
private final AtomicBoolean reconnected = new AtomicBoolean();
@@ -79,7 +116,8 @@ public static class WatchRequestState {
79116
AbstractWatchManager(
80117
Watcher<T> watcher, BaseOperation<T, ?, ?> baseOperation, ListOptions listOptions, int reconnectLimit,
81118
int reconnectInterval, Supplier<HttpClient> clientSupplier) throws MalformedURLException {
82-
this.watcher = watcher;
119+
// prevent the callbacks from happening in the httpclient thread
120+
this.watcher = new SerialWatcher<>(watcher, new SerialExecutor(baseOperation.getOperationContext().getExecutor()));
83121
this.reconnectLimit = reconnectLimit;
84122
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, reconnectLimit);
85123
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
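
The `SerialWatcher` wrapper above relies on a serial executor layered over the client's shared executor, which is what lets callbacks leave the HTTP client thread while keeping their order. The sketch below shows that general pattern with JDK classes only. `SimpleSerialExecutor` is a hypothetical stand-in that follows the well-known example in the `java.util.concurrent.Executor` Javadoc; it is not the Fabric8 `SerialExecutor` and omits its shutdown handling.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical stand-in: runs tasks one at a time, in submission order,
// on whatever threads the delegate executor provides.
final class SimpleSerialExecutor implements Executor {
  private final Queue<Runnable> tasks = new ArrayDeque<>();
  private final Executor delegate;
  private Runnable active;

  SimpleSerialExecutor(Executor delegate) {
    this.delegate = delegate;
  }

  @Override
  public synchronized void execute(Runnable task) {
    // Wrap the task so that finishing it triggers the next queued task.
    tasks.add(() -> {
      try {
        task.run();
      } finally {
        scheduleNext();
      }
    });
    if (active == null) {
      scheduleNext();
    }
  }

  private synchronized void scheduleNext() {
    active = tasks.poll();
    if (active != null) {
      delegate.execute(active);
    }
  }
}
```

Because only one wrapped task is ever handed to the delegate at a time, a slow callback delays later events for the same watch rather than reordering them, which is why long-running callbacks lead to queue growth.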

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/SerialExecutor.java

Lines changed: 6 additions & 1 deletion
@@ -77,11 +77,16 @@ protected synchronized void scheduleNext() {
7777
}
7878
}
7979

80+
/**
81+
* Shutdown the executor without executing any more tasks.
82+
* <p>
83+
* The current task will be interrupted if it is not running on the thread that initiated the shutdown.
84+
*/
8085
public void shutdownNow() {
8186
this.shutdown = true;
8287
tasks.clear();
8388
synchronized (threadLock) {
84-
if (thread != null) {
89+
if (thread != null && thread != Thread.currentThread()) {
8590
thread.interrupt();
8691
}
8792
}
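
The added `thread != Thread.currentThread()` check matters because `SerialWatcher.onClose` calls `shutdownNow()` from inside a task that the executor itself is running. A minimal sketch of that guard, assuming a simplified hypothetical class `InterruptGuard` rather than the real `SerialExecutor`:

```java
// Hypothetical simplification: tracks the thread running the current task
// and avoids interrupting it when that same thread requests shutdown.
final class InterruptGuard {
  private final Object threadLock = new Object();
  private Thread thread; // thread currently running a task, or null

  void runTask(Runnable task) {
    synchronized (threadLock) {
      thread = Thread.currentThread();
    }
    try {
      task.run();
    } finally {
      synchronized (threadLock) {
        thread = null;
      }
    }
  }

  void shutdownNow() {
    synchronized (threadLock) {
      // Without the second condition a task that shuts down its own executor
      // would leave the interrupt flag set on the thread it is running on.
      if (thread != null && thread != Thread.currentThread()) {
        thread.interrupt();
      }
    }
  }
}
```

Since the delegate is a shared pool, an interrupt flag left set on the calling thread could otherwise leak into whatever unrelated work that thread picks up next.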

kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java

Lines changed: 7 additions & 1 deletion
@@ -239,13 +239,19 @@ public void onClose() {
239239
}
240240
}
241241

242+
static BaseOperation mockOperation() {
243+
BaseOperation operation = mock(BaseOperation.class, Mockito.RETURNS_DEEP_STUBS);
244+
Mockito.when(operation.getOperationContext().getExecutor()).thenReturn(Runnable::run);
245+
return operation;
246+
}
247+
242248
private static class WatchManager<T extends HasMetadata> extends AbstractWatchManager<T> {
243249

244250
private final AtomicInteger closeCount = new AtomicInteger(0);
245251

246252
public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval)
247253
throws MalformedURLException {
248-
super(watcher, mock(BaseOperation.class), listOptions, reconnectLimit, reconnectInterval,
254+
super(watcher, mockOperation(), listOptions, reconnectLimit, reconnectInterval,
249255
() -> null);
250256
}
251257
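
The test helper above stubs the executor with `Runnable::run`, which keeps the watch callbacks synchronous in unit tests. As a small sketch of why that works, the hypothetical class below (not part of the test suite) records which thread a task runs on:

```java
import java.util.concurrent.Executor;

// Hypothetical demo class: Runnable::run satisfies Executor.execute(Runnable)
// by invoking the task immediately on the calling thread.
final class DirectExecutorDemo {
  static final Executor DIRECT = Runnable::run;

  private DirectExecutorDemo() {
  }

  // Returns the name of the thread the executor used to run the task,
  // or null if the task has not run by the time execute() returns.
  static String runOn(Executor executor) {
    String[] name = new String[1];
    executor.execute(() -> name[0] = Thread.currentThread().getName());
    return name[0];
  }
}
```

With a direct executor the task has already completed when `execute` returns, so existing assertions that expect a callback right after an event do not need extra synchronization.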
