Skip to content

Commit 0e13dd0

Browse files
authored
Merge pull request #1352 from mingjie-li/master
unable to use autowired in the reconciler
2 parents fb88703 + 15d5ab2 commit 0e13dd0

File tree

4 files changed

+330
-271
lines changed

4 files changed

+330
-271
lines changed

examples/src/main/java/io/kubernetes/client/examples/SpringControllerExample.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,20 @@
2525
import io.kubernetes.client.openapi.models.V1Pod;
2626
import io.kubernetes.client.openapi.models.V1PodList;
2727
import io.kubernetes.client.spring.extended.controller.KubernetesInformerConfigurer;
28-
import io.kubernetes.client.spring.extended.controller.KubernetesReconcilerConfigurer;
2928
import io.kubernetes.client.spring.extended.controller.annotation.*;
29+
import io.kubernetes.client.spring.extended.controller.factory.KubernetesControllerFactory;
3030
import io.kubernetes.client.util.ClientBuilder;
3131
import java.io.IOException;
3232
import java.time.Duration;
33+
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.beans.factory.annotation.Qualifier;
35+
import org.springframework.beans.factory.annotation.Value;
3436
import org.springframework.boot.CommandLineRunner;
3537
import org.springframework.boot.SpringApplication;
3638
import org.springframework.boot.autoconfigure.SpringBootApplication;
3739
import org.springframework.context.annotation.Bean;
3840
import org.springframework.context.annotation.Configuration;
41+
import org.springframework.stereotype.Component;
3942

4043
@SpringBootApplication
4144
public class SpringControllerExample {
@@ -68,11 +71,11 @@ public KubernetesInformerConfigurer kubernetesInformerConfigurer(ApiClient apiCl
6871
}
6972

7073
// *REQUIRED*
71-
// Configurer components that registers reconciler to the controller-manager in the context.
72-
@Bean
73-
public KubernetesReconcilerConfigurer kubernetesReconcilerConfigurer(
74-
SharedInformerFactory sharedInformerFactory) {
75-
return new KubernetesReconcilerConfigurer(sharedInformerFactory);
74+
// factorybean to crete controller
75+
@Bean("node-printing-controller")
76+
public KubernetesControllerFactory kubernetesReconcilerConfigurer(
77+
SharedInformerFactory sharedInformerFactory, Reconciler reconciler) {
78+
return new KubernetesControllerFactory(sharedInformerFactory, reconciler);
7679
}
7780

7881
// *OPTIONAL*
@@ -91,12 +94,6 @@ public ApiClient myApiClient() throws IOException {
9194
public SharedInformerFactory sharedInformerFactory() {
9295
return new MySharedInformerFactory();
9396
}
94-
95-
@Bean
96-
public NodePrintingReconciler nodePrintingReconciler(
97-
Lister<V1Pod> podLister, Lister<V1Node> nodeLister, SharedInformer<V1Node> nodeInformer) {
98-
return new NodePrintingReconciler(podLister, nodeLister, nodeInformer);
99-
}
10097
}
10198

10299
@KubernetesInformers({ // Defining what resources is the informer-factory actually watching.
@@ -128,31 +125,33 @@ public static class MySharedInformerFactory extends SharedInformerFactory {}
128125
resyncPeriodMillis = 60 * 1000L // fully resync every 1 minute
129126
),
130127
}))
128+
@Component
131129
public static class NodePrintingReconciler implements Reconciler {
132130

133-
public NodePrintingReconciler(
134-
Lister<V1Pod> podLister, Lister<V1Node> nodeLister, SharedInformer<V1Node> nodeInformer) {
135-
this.nodeLister = nodeLister;
136-
this.podLister = podLister;
137-
this.nodeInformer = nodeInformer;
138-
}
131+
@Value("${namespace}")
132+
private String namespace;
139133

140-
private SharedInformer<V1Node> nodeInformer;
141-
142-
private Lister<V1Node> nodeLister;
143-
144-
private Lister<V1Pod> podLister;
134+
@Autowired private SharedInformer<V1Node> nodeInformer;
135+
@Autowired private SharedInformer<V1Pod> podInformer;
136+
@Autowired private Lister<V1Node> nodeLister;
137+
@Autowired private Lister<V1Pod> podLister;
145138

146139
// *OPTIONAL*
147140
// If you feed like hold the controller from running util some condition..
148141
@KubernetesReconcilerReadyFunc
149-
boolean informerReady() {
150-
return nodeInformer.hasSynced();
142+
public boolean informerReady() {
143+
return podInformer.hasSynced() && nodeInformer.hasSynced();
151144
}
152145

153146
@Override
154147
public Result reconcile(Request request) {
155148
V1Node node = nodeLister.get(request.getName());
149+
150+
System.out.println("get all pods in namespace " + namespace);
151+
podLister.namespace(namespace).list().stream()
152+
.map(pod -> pod.getMetadata().getName())
153+
.forEach(System.out::println);
154+
156155
System.out.println("triggered reconciling " + node.getMetadata().getName());
157156
return new Result(false);
158157
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
namespace=airflow

spring/src/main/java/io/kubernetes/client/spring/extended/controller/KubernetesReconcilerProcessor.java

Lines changed: 5 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -12,38 +12,17 @@
1212
*/
1313
package io.kubernetes.client.spring.extended.controller;
1414

15-
import io.kubernetes.client.common.KubernetesObject;
1615
import io.kubernetes.client.extended.controller.Controller;
1716
import io.kubernetes.client.extended.controller.ControllerManager;
18-
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
19-
import io.kubernetes.client.extended.controller.builder.DefaultControllerBuilder;
2017
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
21-
import io.kubernetes.client.extended.controller.reconciler.Request;
22-
import io.kubernetes.client.extended.workqueue.DefaultRateLimitingQueue;
23-
import io.kubernetes.client.extended.workqueue.RateLimitingQueue;
24-
import io.kubernetes.client.extended.workqueue.WorkQueue;
2518
import io.kubernetes.client.informer.SharedInformerFactory;
2619
import io.kubernetes.client.spring.extended.controller.annotation.*;
27-
import java.lang.reflect.InvocationTargetException;
28-
import java.lang.reflect.Method;
29-
import java.time.Duration;
30-
import java.util.ArrayList;
31-
import java.util.HashMap;
32-
import java.util.HashSet;
33-
import java.util.List;
34-
import java.util.Map;
35-
import java.util.Set;
20+
import io.kubernetes.client.spring.extended.controller.factory.KubernetesControllerFactory;
3621
import java.util.concurrent.ExecutorService;
3722
import java.util.concurrent.Executors;
38-
import java.util.function.BiPredicate;
39-
import java.util.function.Function;
40-
import java.util.function.Predicate;
41-
import java.util.function.Supplier;
42-
import org.apache.commons.lang3.StringUtils;
4323
import org.slf4j.Logger;
4424
import org.slf4j.LoggerFactory;
4525
import org.springframework.beans.BeansException;
46-
import org.springframework.beans.factory.BeanCreationException;
4726
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
4827
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
4928
import org.springframework.core.Ordered;
@@ -88,232 +67,12 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
8867
KubernetesReconciler kubernetesReconciler =
8968
reconciler.getClass().getAnnotation(KubernetesReconciler.class);
9069
String reconcilerName = kubernetesReconciler.value();
91-
Controller controller = buildController(this.sharedInformerFactory, reconciler);
92-
beanFactory.registerSingleton(reconcilerName, controller);
93-
}
94-
}
95-
96-
private Controller buildController(SharedInformerFactory sharedInformerFactory, Reconciler r)
97-
throws BeansException {
98-
KubernetesReconciler kubernetesReconciler =
99-
r.getClass().getAnnotation(KubernetesReconciler.class);
100-
String reconcilerName = kubernetesReconciler.value();
101-
102-
KubernetesReconcilerWatches watches = kubernetesReconciler.watches();
103-
DefaultControllerBuilder builder = ControllerBuilder.defaultBuilder(sharedInformerFactory);
104-
RateLimitingQueue<Request> workQueue = new DefaultRateLimitingQueue<>();
105-
builder = builder.withWorkQueue(workQueue);
106-
Map<Class, AddFilterAdaptor> addFilters = getAddFilters(watches, r);
107-
Map<Class, UpdateFilterAdaptor> updateFilters = getUpdateFilters(watches, r);
108-
Map<Class, DeleteFilterAdaptor> deleteFilters = getDeleteFilters(watches, r);
109-
List<ReadyFuncAdaptor> readyFuncs = getReadyFuncs(r);
110-
for (KubernetesReconcilerWatch watch : watches.value()) {
111-
try {
112-
113-
Function<? extends KubernetesObject, Request> workQueueKeyFunc;
114-
try {
115-
workQueueKeyFunc =
116-
watch.workQueueKeyFunc().getConstructor(WorkQueue.class).newInstance(workQueue);
117-
} catch (NoSuchMethodException e) {
118-
workQueueKeyFunc = watch.workQueueKeyFunc().newInstance();
119-
} catch (InvocationTargetException e) {
120-
throw new BeanCreationException(
121-
"Failed instantiating controller watch: " + e.getMessage());
122-
}
123-
124-
final Function<? extends KubernetesObject, Request> finalWorkQueueKeyFunc =
125-
workQueueKeyFunc;
126-
builder =
127-
builder.watch(
128-
(q) -> {
129-
return ControllerBuilder.controllerWatchBuilder(watch.apiTypeClass(), q)
130-
.withOnAddFilter(addFilters.get(watch.apiTypeClass()))
131-
.withOnUpdateFilter(updateFilters.get(watch.apiTypeClass()))
132-
.withOnDeleteFilter(deleteFilters.get(watch.apiTypeClass()))
133-
.withWorkQueueKeyFunc(finalWorkQueueKeyFunc)
134-
.withResyncPeriod(Duration.ofMillis(watch.resyncPeriodMillis()))
135-
.build();
136-
});
137-
for (Supplier<Boolean> readyFunc : readyFuncs) {
138-
builder = builder.withReadyFunc(readyFunc);
139-
}
140-
} catch (IllegalAccessException | InstantiationException e) {
141-
throw new BeanCreationException("Failed instantiating controller: " + e.getMessage());
142-
}
143-
}
144-
145-
builder = builder.withWorkerCount(kubernetesReconciler.workerCount());
146-
147-
return builder.withReconciler(r).withName(reconcilerName).build();
148-
}
149-
150-
private Map<Class, AddFilterAdaptor> getAddFilters(
151-
KubernetesReconcilerWatches watches, Reconciler reconciler) {
152-
Map<Class, AddFilterAdaptor> filters = new HashMap<>();
153-
Set<Method> allAnnotatedMethods = new HashSet<>();
154-
Set<Method> adoptedMethods = new HashSet<>();
155-
for (KubernetesReconcilerWatch watch : watches.value()) {
156-
for (Method method : reconciler.getClass().getMethods()) {
157-
AddWatchEventFilter annotation = method.getAnnotation(AddWatchEventFilter.class);
158-
if (annotation != null && watch.apiTypeClass().equals(annotation.apiTypeClass())) {
159-
if (filters.containsKey(watch.apiTypeClass())) {
160-
log.warn(
161-
"Duplicated watch ADD event filter upon apiType {}", annotation.apiTypeClass());
162-
}
163-
filters.put(watch.apiTypeClass(), new AddFilterAdaptor(reconciler, method));
164-
adoptedMethods.add(method);
165-
}
166-
allAnnotatedMethods.add(method);
167-
}
168-
}
169-
allAnnotatedMethods.removeAll(adoptedMethods);
170-
if (allAnnotatedMethods.size() > 0) {
171-
log.warn("Dangling watch ADD event filters {}", StringUtils.join(allAnnotatedMethods, ","));
172-
}
173-
return filters;
174-
}
175-
176-
private Map<Class, UpdateFilterAdaptor> getUpdateFilters(
177-
KubernetesReconcilerWatches watches, Reconciler reconciler) {
178-
Map<Class, UpdateFilterAdaptor> filters = new HashMap<>();
179-
Set<Method> allAnnotatedMethods = new HashSet<>();
180-
Set<Method> adoptedMethods = new HashSet<>();
181-
for (KubernetesReconcilerWatch watch : watches.value()) {
182-
for (Method method : reconciler.getClass().getMethods()) {
183-
UpdateWatchEventFilter annotation = method.getAnnotation(UpdateWatchEventFilter.class);
184-
if (annotation != null && watch.apiTypeClass().equals(annotation.apiTypeClass())) {
185-
if (filters.containsKey(watch.apiTypeClass())) {
186-
log.warn(
187-
"Duplicated watch UPDATE event filter upon apiType {}", annotation.apiTypeClass());
188-
}
189-
filters.put(watch.apiTypeClass(), new UpdateFilterAdaptor(reconciler, method));
190-
adoptedMethods.add(method);
191-
}
192-
allAnnotatedMethods.add(method);
193-
}
194-
}
195-
allAnnotatedMethods.removeAll(adoptedMethods);
196-
if (allAnnotatedMethods.size() > 0) {
197-
log.warn(
198-
"Dangling watch UPDATE event filters {}", StringUtils.join(allAnnotatedMethods, ","));
199-
}
200-
return filters;
201-
}
202-
203-
private Map<Class, DeleteFilterAdaptor> getDeleteFilters(
204-
KubernetesReconcilerWatches watches, Reconciler reconciler) {
205-
Map<Class, DeleteFilterAdaptor> filters = new HashMap<>();
206-
Set<Method> allAnnotatedMethods = new HashSet<>();
207-
Set<Method> adoptedMethods = new HashSet<>();
208-
for (KubernetesReconcilerWatch watch : watches.value()) {
209-
for (Method method : reconciler.getClass().getMethods()) {
210-
DeleteWatchEventFilter annotation = method.getAnnotation(DeleteWatchEventFilter.class);
211-
if (annotation != null && watch.apiTypeClass().equals(annotation.apiTypeClass())) {
212-
if (filters.containsKey(watch.apiTypeClass())) {
213-
log.warn(
214-
"Duplicated watch DELETE event filter upon apiType {}", annotation.apiTypeClass());
215-
}
216-
filters.put(watch.apiTypeClass(), new DeleteFilterAdaptor(reconciler, method));
217-
adoptedMethods.add(method);
218-
}
219-
allAnnotatedMethods.add(method);
220-
}
221-
}
222-
allAnnotatedMethods.removeAll(adoptedMethods);
223-
if (allAnnotatedMethods.size() > 0) {
224-
log.warn(
225-
"Dangling watch DELETE event filters {}", StringUtils.join(allAnnotatedMethods, ","));
226-
}
227-
return filters;
228-
}
22970

230-
private List<ReadyFuncAdaptor> getReadyFuncs(Reconciler reconciler) {
231-
List<ReadyFuncAdaptor> readyFuncs = new ArrayList<>();
232-
for (Method method : reconciler.getClass().getMethods()) {
233-
if (method.isAnnotationPresent(KubernetesReconcilerReadyFunc.class)) {
234-
readyFuncs.add(new ReadyFuncAdaptor(reconciler, method));
235-
}
236-
}
237-
return readyFuncs;
238-
}
239-
240-
private static class AddFilterAdaptor implements Predicate {
241-
private Method method;
242-
private Object target;
243-
244-
private AddFilterAdaptor(Object target, Method method) {
245-
this.method = method;
246-
this.target = target;
247-
}
248-
249-
@Override
250-
public boolean test(Object addedObj) {
251-
try {
252-
return (boolean) method.invoke(target, addedObj);
253-
} catch (IllegalAccessException | InvocationTargetException e) {
254-
log.error("invalid EventAddFilter method signature", e);
255-
return true;
256-
}
257-
}
258-
}
259-
260-
private static class UpdateFilterAdaptor implements BiPredicate {
261-
private Method method;
262-
private Object target;
263-
264-
private UpdateFilterAdaptor(Object target, Method method) {
265-
this.method = method;
266-
this.target = target;
267-
}
268-
269-
@Override
270-
public boolean test(Object oldObj, Object newObj) {
271-
try {
272-
return (boolean) method.invoke(target, oldObj, newObj);
273-
} catch (IllegalAccessException | InvocationTargetException e) {
274-
log.error("invalid EventUpdateFilter method signature", e);
275-
return true;
276-
}
277-
}
278-
}
279-
280-
private static class DeleteFilterAdaptor implements BiPredicate {
281-
private Method method;
282-
private Object target;
283-
284-
private DeleteFilterAdaptor(Object target, Method method) {
285-
this.method = method;
286-
this.target = target;
287-
}
288-
289-
@Override
290-
public boolean test(Object deleteObj, Object cacheStatusUnknown) {
291-
try {
292-
return (boolean) method.invoke(target, deleteObj, cacheStatusUnknown);
293-
} catch (IllegalAccessException | InvocationTargetException e) {
294-
log.error("invalid EventDeleteFilter method signature", e);
295-
return true;
296-
}
297-
}
298-
}
299-
300-
private static class ReadyFuncAdaptor implements Supplier<Boolean> {
301-
private Method method;
302-
private Object target;
71+
KubernetesControllerFactory controllerFactory =
72+
new KubernetesControllerFactory(sharedInformerFactory, reconciler);
30373

304-
private ReadyFuncAdaptor(Object target, Method method) {
305-
this.method = method;
306-
this.target = target;
307-
}
308-
309-
@Override
310-
public Boolean get() {
311-
try {
312-
return (boolean) method.invoke(target);
313-
} catch (IllegalAccessException | InvocationTargetException e) {
314-
log.error("invalid ReadyFunc method signature", e);
315-
return false;
316-
}
74+
Controller controller = controllerFactory.getObject();
75+
beanFactory.registerSingleton(reconcilerName, controller);
31776
}
31877
}
31978
}

0 commit comments

Comments
 (0)