Skip to content

Commit c3ff48f

Browse files
committed
java controller runtime
1 parent 79fbdcf commit c3ff48f

20 files changed

+1624
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ We prepared a few examples for common use-cases which are shown below:
9292
- [WebSocketExample](https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/WebSocketExample.java):
9393
Establish an arbitrary web-socket session to certain resources.
9494
- __Advanced__:
95+
- [ControllerExample](https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/ControllerExample.java):
96+
Build a controller reconciling the state of world by list-watching one or multiple resources.
9597
- [InformerExample](https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/InformerExample.java):
9698
Build an informer which list-watches resources and reflects the notifications to a local cache.
9799
- [LeaderElectionExample](https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java):
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package io.kubernetes.client.examples;
2+
3+
import io.kubernetes.client.apis.CoreV1Api;
4+
import io.kubernetes.client.extended.controller.Controller;
5+
import io.kubernetes.client.extended.controller.ControllerManager;
6+
import io.kubernetes.client.extended.controller.LeaderElectingController;
7+
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
8+
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
9+
import io.kubernetes.client.extended.controller.reconciler.Request;
10+
import io.kubernetes.client.extended.controller.reconciler.Result;
11+
import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
12+
import io.kubernetes.client.extended.leaderelection.LeaderElector;
13+
import io.kubernetes.client.extended.leaderelection.resourcelock.EndpointsLock;
14+
import io.kubernetes.client.informer.SharedIndexInformer;
15+
import io.kubernetes.client.informer.SharedInformerFactory;
16+
import io.kubernetes.client.informer.cache.Lister;
17+
import io.kubernetes.client.models.V1Node;
18+
import io.kubernetes.client.models.V1NodeList;
19+
import io.kubernetes.client.util.CallGeneratorParams;
20+
import java.io.IOException;
21+
import java.time.Duration;
22+
23+
public class ControllerExample {
24+
public static void main(String[] args) throws IOException {
25+
26+
CoreV1Api coreV1Api = new CoreV1Api();
27+
28+
// instantiating an informer-facotry, and there should be only one informer-factory globally.
29+
SharedInformerFactory informerFactory = new SharedInformerFactory();
30+
// registering node-informer into the informer-facotry.
31+
SharedIndexInformer<V1Node> nodeInformer =
32+
informerFactory.sharedIndexInformerFor(
33+
(CallGeneratorParams params) -> {
34+
return coreV1Api.listNodeCall(
35+
null,
36+
null,
37+
null,
38+
null,
39+
null,
40+
params.resourceVersion,
41+
params.timeoutSeconds,
42+
params.watch,
43+
null,
44+
null);
45+
},
46+
V1Node.class,
47+
V1NodeList.class);
48+
informerFactory.startAllRegisteredInformers();
49+
50+
// nodeReconciler prints node information on events
51+
NodePrintingReconciler nodeReconciler = new NodePrintingReconciler(nodeInformer);
52+
53+
// Use builder library to construct a default controller.
54+
Controller controller =
55+
ControllerBuilder.defaultBuilder(informerFactory)
56+
.watch(V1Node.class)
57+
.withWorkQueueKeyFunc(
58+
(V1Node node) -> new Request(node.getMetadata().getName())) // optional, default to
59+
// Controllers#defaultReflectiveKeyFunc
60+
.withOnAddFilter(
61+
(V1Node createdNode) ->
62+
createdNode
63+
.getMetadata()
64+
.getName()
65+
.startsWith("docker-")) // optional, set onAdd filter
66+
.withOnUpdateFilter(
67+
(V1Node oldNode, V1Node newNode) ->
68+
newNode
69+
.getMetadata()
70+
.getName()
71+
.startsWith("docker-")) // optional, set onUpdate filter
72+
.withOnDeleteFilter(
73+
(V1Node deletedNode, Boolean stateUnknown) ->
74+
deletedNode
75+
.getMetadata()
76+
.getName()
77+
.startsWith("docker-")) // optional, set onDelete filter
78+
.endWatch()
79+
.withReconciler(nodeReconciler) // required, set the actual reconciler
80+
.withName("node-printing-controller") // optional, set name for controller
81+
.withWorkerCount(4) // optional, set worker thread count
82+
.withReadyFunc(
83+
nodeInformer
84+
::hasSynced) // optional, only starts controller when the cache has synced up
85+
.build();
86+
87+
// Use builder library to manage one or multiple controllers.
88+
ControllerManager controllerManager =
89+
ControllerBuilder.controllerManagerBuilder().addController(controller).build();
90+
91+
LeaderElectingController leaderElectingController =
92+
new LeaderElectingController(
93+
new LeaderElector(
94+
new LeaderElectionConfig(
95+
new EndpointsLock("kube-system", "leader-election", "foo"),
96+
Duration.ofMillis(10000),
97+
null,
98+
Duration.ofMillis(5000))),
99+
controllerManager);
100+
101+
leaderElectingController.run();
102+
}
103+
104+
static class NodePrintingReconciler implements Reconciler {
105+
106+
private Lister<V1Node> nodeLister;
107+
108+
public NodePrintingReconciler(SharedIndexInformer<V1Node> nodeInformer) {
109+
this.nodeLister = new Lister<>(nodeInformer.getIndexer());
110+
}
111+
112+
@Override
113+
public Result reconcile(Request request) {
114+
V1Node node = this.nodeLister.get(request.getName());
115+
System.out.println("triggered reconciling " + node.getMetadata().getName());
116+
return new Result(false);
117+
}
118+
}
119+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.kubernetes.client.extended.controller;
2+
3+
/** The interface Controller defines the interface for operating a controller. */
4+
public interface Controller extends Runnable {
5+
/** Shutdown the controller. */
6+
void shutdown();
7+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.kubernetes.client.extended.controller;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
/** The type Controller manager manages a set of controllers' lifecycle. */
10+
public class ControllerManager implements Controller {
11+
private static final Logger log = LoggerFactory.getLogger(DefaultController.class);
12+
private Controller[] controllers;
13+
private ExecutorService controllerThreadPool;
14+
15+
/**
16+
* Instantiates a new Controller manager.
17+
*
18+
* @param controllers the controllers to be managed.
19+
*/
20+
public ControllerManager(Controller... controllers) {
21+
this.controllers = controllers;
22+
}
23+
24+
@Override
25+
public void shutdown() {
26+
for (Controller controller : this.controllers) {
27+
controller.shutdown();
28+
}
29+
if (controllerThreadPool != null) {
30+
this.controllerThreadPool.shutdown();
31+
}
32+
}
33+
34+
@Override
35+
public void run() {
36+
if (controllers.length == 0) {
37+
throw new RuntimeException("no controller registered in the manager..");
38+
}
39+
CountDownLatch latch = new CountDownLatch(controllers.length);
40+
this.controllerThreadPool = Executors.newFixedThreadPool(controllers.length);
41+
for (Controller controller : this.controllers) {
42+
controllerThreadPool.submit(
43+
() -> {
44+
controller.run();
45+
latch.countDown();
46+
});
47+
}
48+
try {
49+
log.debug("Controller-Manager {} bootstrapping..");
50+
latch.await();
51+
} catch (InterruptedException e) {
52+
log.error("Aborting controller-manager.", e);
53+
} finally {
54+
log.info("Controller-Manager {} exited");
55+
}
56+
}
57+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kubernetes.client.extended.controller;
2+
3+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4+
import io.kubernetes.client.extended.controller.reconciler.Request;
5+
import io.kubernetes.client.models.V1ObjectMeta;
6+
import io.kubernetes.client.util.Reflect;
7+
import io.kubernetes.client.util.exception.ObjectMetaReflectException;
8+
import java.util.concurrent.ThreadFactory;
9+
import java.util.function.Function;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
/** The Controllers is a set of commonly used utility functions for constructing controller. */
14+
public class Controllers {
15+
16+
private static final Logger log = LoggerFactory.getLogger(Controllers.class);
17+
18+
/**
19+
* The Default key func function works for work-queue, which extracts namespace and name via
20+
* reflection from the objects.
21+
*
22+
* @param <ApiType> the type parameter
23+
* @return the function
24+
*/
25+
public static <ApiType> Function<ApiType, Request> defaultReflectiveKeyFunc() {
26+
return (ApiType obj) -> {
27+
try {
28+
V1ObjectMeta objectMeta = Reflect.objectMetadata(obj);
29+
return new Request(objectMeta.getNamespace(), objectMeta.getName());
30+
} catch (ObjectMetaReflectException e) {
31+
log.error("Fail to access object-meta from {}..", obj.getClass());
32+
return null;
33+
}
34+
};
35+
}
36+
37+
/**
38+
* Named thread factory for constructing controller, useful when debugging dumping status of
39+
* controller worker threads. e.g. for a controller named `foo`, its threads will be named
40+
* `foo-1`, `foo-2`...
41+
*
42+
* @param controllerName the controller name
43+
* @return the thread factory
44+
*/
45+
public static ThreadFactory namedControllerThreadFactory(String controllerName) {
46+
return new ThreadFactoryBuilder().setNameFormat(controllerName + "-%d").build();
47+
}
48+
}

0 commit comments

Comments
 (0)