14
14
import java .lang .reflect .Method ;
15
15
import java .time .Duration ;
16
16
import java .util .ArrayList ;
17
+ import java .util .HashMap ;
18
+ import java .util .HashSet ;
17
19
import java .util .List ;
20
+ import java .util .Map ;
21
+ import java .util .Set ;
18
22
import java .util .concurrent .ExecutorService ;
19
23
import java .util .concurrent .Executors ;
20
24
import java .util .function .BiPredicate ;
21
25
import java .util .function .Function ;
22
26
import java .util .function .Predicate ;
23
27
import java .util .function .Supplier ;
28
+ import org .apache .commons .lang3 .StringUtils ;
24
29
import org .slf4j .Logger ;
25
30
import org .slf4j .LoggerFactory ;
26
31
import org .springframework .beans .BeansException ;
@@ -76,38 +81,20 @@ private Controller buildController(SharedInformerFactory sharedInformerFactory,
76
81
DefaultControllerBuilder builder = ControllerBuilder .defaultBuilder (sharedInformerFactory );
77
82
RateLimitingQueue <Request > workQueue = new DefaultRateLimitingQueue <>();
78
83
builder = builder .withWorkQueue (workQueue );
84
+ Map <Class , AddFilterAdaptor > addFilters = getAddFilters (watches , r );
85
+ Map <Class , UpdateFilterAdaptor > updateFilters = getUpdateFilters (watches , r );
86
+ Map <Class , DeleteFilterAdaptor > deleteFilters = getDeleteFilters (watches , r );
87
+ List <ReadyFuncAdaptor > readyFuncs = getReadyFuncs (r );
79
88
for (KubernetesReconcilerWatch watch : watches .value ()) {
80
89
try {
81
- Predicate addFilter = null ;
82
- BiPredicate updateFilter = null ;
83
- BiPredicate deleteFilter = null ;
84
- final List <Supplier <Boolean >> readyFuncs = new ArrayList <>();
85
90
Function <?, Request > workQueueKeyFunc = watch .workQueueKeyFunc ().newInstance ();
86
- for (Method method : r .getClass ().getMethods ()) {
87
- if (method .isAnnotationPresent (AddWatchEventFilter .class )) {
88
- addFilter = new AddFilterAdaptor (r , method );
89
- }
90
- if (method .isAnnotationPresent (UpdateWatchEventFilter .class )) {
91
- updateFilter = new UpdateFilterAdaptor (r , method );
92
- }
93
- if (method .isAnnotationPresent (DeleteWatchEventFilter .class )) {
94
- deleteFilter = new DeleteFilterAdaptor (r , method );
95
- }
96
- if (method .isAnnotationPresent (KubernetesReconcilerReadyFunc .class )) {
97
- readyFuncs .add (new ReadyFuncAdaptor (r , method ));
98
- }
99
- }
100
-
101
- final Predicate finalAddFilter = addFilter ;
102
- final BiPredicate finalUpdateFilter = updateFilter ;
103
- final BiPredicate finalDeleteFilter = deleteFilter ;
104
91
builder =
105
92
builder .watch (
106
93
(q ) -> {
107
94
return ControllerBuilder .controllerWatchBuilder (watch .apiTypeClass (), q )
108
- .withOnAddFilter (finalAddFilter )
109
- .withOnUpdateFilter (finalUpdateFilter )
110
- .withOnDeleteFilter (finalDeleteFilter )
95
+ .withOnAddFilter (addFilters . get ( watch . apiTypeClass ()) )
96
+ .withOnUpdateFilter (updateFilters . get ( watch . apiTypeClass ()) )
97
+ .withOnDeleteFilter (deleteFilters . get ( watch . apiTypeClass ()) )
111
98
.withWorkQueueKeyFunc (workQueueKeyFunc )
112
99
.withResyncPeriod (Duration .ofMillis (watch .resyncPeriodMillis ()))
113
100
.build ();
@@ -120,15 +107,101 @@ private Controller buildController(SharedInformerFactory sharedInformerFactory,
120
107
}
121
108
}
122
109
123
- if (r .getClass ().isAnnotationPresent (KubernetesReconcilerWorkerCount .class )) {
124
- KubernetesReconcilerWorkerCount workerCount =
125
- r .getClass ().getAnnotation (KubernetesReconcilerWorkerCount .class );
126
- builder = builder .withWorkerCount (workerCount .value ());
127
- }
110
+ builder = builder .withWorkerCount (kubernetesReconciler .workerCount ());
128
111
129
112
return builder .withReconciler (r ).withName (reconcilerName ).build ();
130
113
}
131
114
115
+ private Map <Class , AddFilterAdaptor > getAddFilters (
116
+ KubernetesReconcilerWatches watches , Reconciler reconciler ) {
117
+ Map <Class , AddFilterAdaptor > filters = new HashMap <>();
118
+ Set <Method > allAnnotatedMethods = new HashSet <>();
119
+ Set <Method > adoptedMethods = new HashSet <>();
120
+ for (KubernetesReconcilerWatch watch : watches .value ()) {
121
+ for (Method method : reconciler .getClass ().getMethods ()) {
122
+ AddWatchEventFilter annotation = method .getAnnotation (AddWatchEventFilter .class );
123
+ if (watch .apiTypeClass ().equals (annotation .apiTypeClass ())) {
124
+ if (filters .containsKey (watch .apiTypeClass ())) {
125
+ log .warn (
126
+ "Duplicated watch ADD event filter upon apiType {}" , annotation .apiTypeClass ());
127
+ }
128
+ filters .put (watch .apiTypeClass (), new AddFilterAdaptor (reconciler , method ));
129
+ adoptedMethods .add (method );
130
+ }
131
+ allAnnotatedMethods .add (method );
132
+ }
133
+ }
134
+ allAnnotatedMethods .removeAll (adoptedMethods );
135
+ if (allAnnotatedMethods .size () > 0 ) {
136
+ log .warn ("Dangling watch ADD event filters {}" , StringUtils .join (allAnnotatedMethods , "," ));
137
+ }
138
+ return filters ;
139
+ }
140
+
141
+ private Map <Class , UpdateFilterAdaptor > getUpdateFilters (
142
+ KubernetesReconcilerWatches watches , Reconciler reconciler ) {
143
+ Map <Class , UpdateFilterAdaptor > filters = new HashMap <>();
144
+ Set <Method > allAnnotatedMethods = new HashSet <>();
145
+ Set <Method > adoptedMethods = new HashSet <>();
146
+ for (KubernetesReconcilerWatch watch : watches .value ()) {
147
+ for (Method method : reconciler .getClass ().getMethods ()) {
148
+ UpdateWatchEventFilter annotation = method .getAnnotation (UpdateWatchEventFilter .class );
149
+ if (watch .apiTypeClass ().equals (annotation .apiTypeClass ())) {
150
+ if (filters .containsKey (watch .apiTypeClass ())) {
151
+ log .warn (
152
+ "Duplicated watch UPDATE event filter upon apiType {}" , annotation .apiTypeClass ());
153
+ }
154
+ filters .put (watch .apiTypeClass (), new UpdateFilterAdaptor (reconciler , method ));
155
+ adoptedMethods .add (method );
156
+ }
157
+ allAnnotatedMethods .add (method );
158
+ }
159
+ }
160
+ allAnnotatedMethods .removeAll (adoptedMethods );
161
+ if (allAnnotatedMethods .size () > 0 ) {
162
+ log .warn (
163
+ "Dangling watch UPDATE event filters {}" , StringUtils .join (allAnnotatedMethods , "," ));
164
+ }
165
+ return filters ;
166
+ }
167
+
168
+ private Map <Class , DeleteFilterAdaptor > getDeleteFilters (
169
+ KubernetesReconcilerWatches watches , Reconciler reconciler ) {
170
+ Map <Class , DeleteFilterAdaptor > filters = new HashMap <>();
171
+ Set <Method > allAnnotatedMethods = new HashSet <>();
172
+ Set <Method > adoptedMethods = new HashSet <>();
173
+ for (KubernetesReconcilerWatch watch : watches .value ()) {
174
+ for (Method method : reconciler .getClass ().getMethods ()) {
175
+ DeleteWatchEventFilter annotation = method .getAnnotation (DeleteWatchEventFilter .class );
176
+ if (watch .apiTypeClass ().equals (annotation .apiTypeClass ())) {
177
+ if (filters .containsKey (watch .apiTypeClass ())) {
178
+ log .warn (
179
+ "Duplicated watch DELETE event filter upon apiType {}" , annotation .apiTypeClass ());
180
+ }
181
+ filters .put (watch .apiTypeClass (), new DeleteFilterAdaptor (reconciler , method ));
182
+ adoptedMethods .add (method );
183
+ }
184
+ allAnnotatedMethods .add (method );
185
+ }
186
+ }
187
+ allAnnotatedMethods .removeAll (adoptedMethods );
188
+ if (allAnnotatedMethods .size () > 0 ) {
189
+ log .warn (
190
+ "Dangling watch DELETE event filters {}" , StringUtils .join (allAnnotatedMethods , "," ));
191
+ }
192
+ return filters ;
193
+ }
194
+
195
+ private List <ReadyFuncAdaptor > getReadyFuncs (Reconciler reconciler ) {
196
+ List <ReadyFuncAdaptor > readyFuncs = new ArrayList <>();
197
+ for (Method method : reconciler .getClass ().getMethods ()) {
198
+ if (method .isAnnotationPresent (KubernetesReconcilerReadyFunc .class )) {
199
+ readyFuncs .add (new ReadyFuncAdaptor (reconciler , method ));
200
+ }
201
+ }
202
+ return readyFuncs ;
203
+ }
204
+
132
205
private static class AddFilterAdaptor implements Predicate {
133
206
private Method method ;
134
207
private Object target ;
0 commit comments