Skip to content

Commit e22b8d2

Browse files
authored
Merge pull request #1118 from yue9944882/custom-work-queue-key-func
Support custom workqueue-keyfunc to directly manipulate work-queue instance
2 parents 41f0220 + d13c134 commit e22b8d2

File tree

6 files changed

+131
-17
lines changed

6 files changed

+131
-17
lines changed

extended/src/main/java/io/kubernetes/client/extended/controller/DefaultController.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,15 @@ private void worker() {
199199
}
200200
}
201201

202+
public RateLimitingQueue<Request> getWorkQueue() {
203+
return workQueue;
204+
}
205+
206+
public DefaultController setWorkQueue(RateLimitingQueue<Request> workQueue) {
207+
this.workQueue = workQueue;
208+
return this;
209+
}
210+
202211
public String getName() {
203212
return name;
204213
}

extended/src/main/java/io/kubernetes/client/extended/controller/DefaultControllerWatch.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,22 +94,31 @@ public ResourceEventHandler<ApiType> getResourceEventHandler() {
9494
@Override
9595
public void onAdd(ApiType obj) {
9696
if (onAddFilterPredicate == null || onAddFilterPredicate.test(obj)) {
97-
workQueue.add(workKeyGenerator.apply(obj));
97+
Request req = workKeyGenerator.apply(obj);
98+
if (null != req) {
99+
workQueue.add(req);
100+
}
98101
}
99102
}
100103

101104
@Override
102105
public void onUpdate(ApiType oldObj, ApiType newObj) {
103106
if (onUpdateFilterPredicate == null || onUpdateFilterPredicate.test(oldObj, newObj)) {
104-
workQueue.add(workKeyGenerator.apply(newObj));
107+
Request req = workKeyGenerator.apply(newObj);
108+
if (null != req) {
109+
workQueue.add(req);
110+
}
105111
}
106112
}
107113

108114
@Override
109115
public void onDelete(ApiType obj, boolean deletedFinalStateUnknown) {
110116
if (onDeleteFilterPredicate == null
111117
|| onDeleteFilterPredicate.test(obj, deletedFinalStateUnknown)) {
112-
workQueue.add(workKeyGenerator.apply(obj));
118+
Request req = workKeyGenerator.apply(obj);
119+
if (null != req) {
120+
workQueue.add(req);
121+
}
113122
}
114123
}
115124
};

spring/src/main/java/io/kubernetes/client/spring/extended/controller/KubernetesReconcilerProcessor.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package io.kubernetes.client.spring.extended.controller;
1414

15+
import io.kubernetes.client.common.KubernetesObject;
1516
import io.kubernetes.client.extended.controller.Controller;
1617
import io.kubernetes.client.extended.controller.ControllerManager;
1718
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
@@ -20,6 +21,7 @@
2021
import io.kubernetes.client.extended.controller.reconciler.Request;
2122
import io.kubernetes.client.extended.workqueue.DefaultRateLimitingQueue;
2223
import io.kubernetes.client.extended.workqueue.RateLimitingQueue;
24+
import io.kubernetes.client.extended.workqueue.WorkQueue;
2325
import io.kubernetes.client.informer.SharedInformerFactory;
2426
import io.kubernetes.client.spring.extended.controller.annotation.*;
2527
import java.lang.reflect.InvocationTargetException;
@@ -99,15 +101,28 @@ private Controller buildController(SharedInformerFactory sharedInformerFactory,
99101
List<ReadyFuncAdaptor> readyFuncs = getReadyFuncs(r);
100102
for (KubernetesReconcilerWatch watch : watches.value()) {
101103
try {
102-
Function<?, Request> workQueueKeyFunc = watch.workQueueKeyFunc().newInstance();
104+
105+
Function<? extends KubernetesObject, Request> workQueueKeyFunc;
106+
try {
107+
workQueueKeyFunc =
108+
watch.workQueueKeyFunc().getConstructor(WorkQueue.class).newInstance(workQueue);
109+
} catch (NoSuchMethodException e) {
110+
workQueueKeyFunc = watch.workQueueKeyFunc().newInstance();
111+
} catch (InvocationTargetException e) {
112+
throw new BeanCreationException(
113+
"Failed instantiating controller watch: " + e.getMessage());
114+
}
115+
116+
final Function<? extends KubernetesObject, Request> finalWorkQueueKeyFunc =
117+
workQueueKeyFunc;
103118
builder =
104119
builder.watch(
105120
(q) -> {
106121
return ControllerBuilder.controllerWatchBuilder(watch.apiTypeClass(), q)
107122
.withOnAddFilter(addFilters.get(watch.apiTypeClass()))
108123
.withOnUpdateFilter(updateFilters.get(watch.apiTypeClass()))
109124
.withOnDeleteFilter(deleteFilters.get(watch.apiTypeClass()))
110-
.withWorkQueueKeyFunc(workQueueKeyFunc)
125+
.withWorkQueueKeyFunc(finalWorkQueueKeyFunc)
111126
.withResyncPeriod(Duration.ofMillis(watch.resyncPeriodMillis()))
112127
.build();
113128
});

spring/src/main/java/io/kubernetes/client/spring/extended/controller/annotation/KubernetesReconcilerWatch.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,14 @@
3838
* Work queue key func class maps the source resource of the watch event to a standard reconciler
3939
* request.
4040
*
41+
* <p>Optionally you can declare the constructor of the class to receive a {@link
42+
* io.kubernetes.client.extended.workqueue.WorkQueue} in the parameter in order to customize the
43+
* work-queue key-func.
44+
*
4145
* @return the class
4246
*/
43-
Class<? extends Function<?, Request>> workQueueKeyFunc() default DefaultReflectiveKeyFunc.class;
47+
Class<? extends Function<? extends KubernetesObject, Request>> workQueueKeyFunc() default
48+
DefaultReflectiveKeyFunc.class;
4449

4550
/**
4651
* Resync period in milliseconds .

spring/src/test/java/io/kubernetes/client/spring/extended/controller/KubernetesReconcilerCreatorTest.java

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,41 @@
1212
*/
1313
package io.kubernetes.client.spring.extended.controller;
1414

15+
import static org.junit.Assert.assertEquals;
1516
import static org.junit.Assert.assertNotNull;
17+
import static org.junit.Assert.fail;
1618

1719
import com.github.tomakehurst.wiremock.junit.WireMockRule;
20+
import io.kubernetes.client.common.KubernetesObject;
1821
import io.kubernetes.client.extended.controller.Controller;
22+
import io.kubernetes.client.extended.controller.DefaultController;
1923
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
2024
import io.kubernetes.client.extended.controller.reconciler.Request;
2125
import io.kubernetes.client.extended.controller.reconciler.Result;
26+
import io.kubernetes.client.extended.workqueue.WorkQueue;
2227
import io.kubernetes.client.informer.SharedInformer;
28+
import io.kubernetes.client.informer.SharedInformerFactory;
29+
import io.kubernetes.client.informer.cache.DeltaFIFO;
2330
import io.kubernetes.client.informer.cache.Lister;
31+
import io.kubernetes.client.informer.impl.DefaultSharedIndexInformer;
32+
import io.kubernetes.client.openapi.models.V1ConfigMap;
33+
import io.kubernetes.client.openapi.models.V1ObjectMeta;
2434
import io.kubernetes.client.openapi.models.V1Pod;
25-
import io.kubernetes.client.spring.extended.controller.annotation.*;
35+
import io.kubernetes.client.openapi.models.V1PodList;
2636
import io.kubernetes.client.spring.extended.controller.annotation.AddWatchEventFilter;
2737
import io.kubernetes.client.spring.extended.controller.annotation.DeleteWatchEventFilter;
38+
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconciler;
39+
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerReadyFunc;
40+
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatch;
41+
import io.kubernetes.client.spring.extended.controller.annotation.KubernetesReconcilerWatches;
2842
import io.kubernetes.client.spring.extended.controller.annotation.UpdateWatchEventFilter;
43+
import java.util.LinkedList;
44+
import java.util.function.Function;
45+
import javax.annotation.Resource;
46+
import org.apache.commons.lang3.tuple.MutablePair;
2947
import org.junit.Rule;
3048
import org.junit.Test;
3149
import org.junit.runner.RunWith;
32-
import org.springframework.beans.factory.annotation.Autowired;
3350
import org.springframework.boot.test.context.SpringBootTest;
3451
import org.springframework.boot.test.context.TestConfiguration;
3552
import org.springframework.context.annotation.Bean;
@@ -47,8 +64,12 @@ public class KubernetesReconcilerCreatorTest {
4764
static class TestConfig {
4865

4966
@Bean
50-
public TestReconciler podReconciler() {
51-
return new TestReconciler();
67+
public TestReconciler podReconciler(
68+
SharedInformer<V1Pod> podInformer,
69+
Lister<V1Pod> podLister,
70+
SharedInformer<V1ConfigMap> configMapInformer,
71+
Lister<V1ConfigMap> configMapLister) {
72+
return new TestReconciler(podInformer, podLister, configMapLister);
5273
}
5374
}
5475

@@ -58,20 +79,32 @@ public TestReconciler podReconciler() {
5879
@KubernetesReconcilerWatches({
5980
@KubernetesReconcilerWatch(
6081
apiTypeClass = V1Pod.class,
82+
workQueueKeyFunc = CustomWorkQueueKeyFunc.class,
6183
resyncPeriodMillis = 60 * 1000L // resync every 60s
6284
)
6385
}))
6486
static class TestReconciler implements Reconciler {
6587

66-
private int observedPodCount;
88+
private Request receivedRequest;
6789

68-
@Autowired private SharedInformer<V1Pod> podInformer;
90+
public TestReconciler(
91+
SharedInformer<V1Pod> podInformer,
92+
Lister<V1Pod> podLister,
93+
Lister<V1ConfigMap> configMapLister) {
94+
this.podInformer = podInformer;
95+
this.podLister = podLister;
96+
this.configMapLister = configMapLister;
97+
}
98+
99+
private final SharedInformer<V1Pod> podInformer;
69100

70-
@Autowired private Lister<V1Pod> podLister;
101+
private final Lister<V1Pod> podLister;
102+
103+
private final Lister<V1ConfigMap> configMapLister;
71104

72105
@Override
73106
public Result reconcile(Request request) {
74-
observedPodCount = podLister.list().size();
107+
receivedRequest = request;
75108
return new Result(false);
76109
}
77110

@@ -96,10 +129,53 @@ private boolean podInformerCacheReady() {
96129
}
97130
}
98131

99-
@Autowired private Controller testController;
132+
@Resource(name = "test-reconcile")
133+
private Controller testController;
134+
135+
@Resource private TestReconciler testReconciler;
136+
137+
@Resource private SharedInformerFactory sharedInformerFactory;
138+
139+
public static class CustomWorkQueueKeyFunc implements Function<KubernetesObject, Request> {
140+
141+
public CustomWorkQueueKeyFunc(WorkQueue<Request> workQueue) {
142+
this.workQueue = workQueue;
143+
}
144+
145+
private final WorkQueue<Request> workQueue;
146+
147+
@Override
148+
public Request apply(KubernetesObject item) {
149+
workQueue.add(new Request("foo"));
150+
return null;
151+
}
152+
}
100153

101154
@Test
102-
public void testSimplePodController() {
155+
public void testSimplePodController() throws InterruptedException {
103156
assertNotNull(testController);
157+
assertNotNull(testReconciler);
158+
159+
sharedInformerFactory.startAllRegisteredInformers();
160+
161+
((DefaultSharedIndexInformer<V1Pod, V1PodList>) testReconciler.podInformer)
162+
.handleDeltas(
163+
new LinkedList<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>() {
164+
{
165+
add(
166+
new MutablePair<>(
167+
DeltaFIFO.DeltaType.Added,
168+
new V1Pod().metadata(new V1ObjectMeta().namespace("a").name("b"))));
169+
}
170+
});
171+
172+
Thread.sleep(500);
173+
174+
WorkQueue<Request> workQueue = ((DefaultController) testController).getWorkQueue();
175+
assertEquals(1, workQueue.length());
176+
if (workQueue.length() != 1) {
177+
fail();
178+
}
179+
assertEquals("foo", workQueue.get().getName());
104180
}
105181
}

util/src/main/java/io/kubernetes/client/informer/impl/DefaultSharedIndexInformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public boolean hasSynced() {
216216
*
217217
* @param deltas deltas
218218
*/
219-
private void handleDeltas(Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>> deltas) {
219+
public void handleDeltas(Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>> deltas) {
220220
if (CollectionUtils.isEmpty(deltas)) {
221221
return;
222222
}

0 commit comments

Comments
 (0)