4
4
import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getName ;
5
5
import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getVersion ;
6
6
7
- import io .fabric8 .kubernetes .client .CustomResource ;
8
- import io .fabric8 .kubernetes .client .dsl .MixedOperation ;
9
- import io .javaoperatorsdk .operator .Metrics ;
10
- import io .javaoperatorsdk .operator .api .ResourceController ;
11
- import io .javaoperatorsdk .operator .api .RetryInfo ;
12
- import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
13
7
import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
14
- import io .javaoperatorsdk .operator .processing .event .DefaultEventSourceManager ;
15
- import io .javaoperatorsdk .operator .processing .event .Event ;
16
- import io .javaoperatorsdk .operator .processing .event .EventHandler ;
17
- import io .javaoperatorsdk .operator .processing .retry .GenericRetry ;
18
- import io .javaoperatorsdk .operator .processing .retry .Retry ;
19
- import io .javaoperatorsdk .operator .processing .retry .RetryExecution ;
20
- import io .micrometer .core .instrument .Clock ;
21
8
import java .util .HashMap ;
22
9
import java .util .HashSet ;
23
10
import java .util .Map ;
27
14
import java .util .concurrent .TimeUnit ;
28
15
import java .util .concurrent .locks .ReentrantLock ;
29
16
import java .util .function .Predicate ;
17
+
30
18
import org .slf4j .Logger ;
31
19
import org .slf4j .LoggerFactory ;
32
20
21
+ import io .fabric8 .kubernetes .client .CustomResource ;
22
+ import io .javaoperatorsdk .operator .api .RetryInfo ;
23
+ import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
24
+ import io .javaoperatorsdk .operator .processing .event .DefaultEventSourceManager ;
25
+ import io .javaoperatorsdk .operator .processing .event .Event ;
26
+ import io .javaoperatorsdk .operator .processing .event .EventHandler ;
27
+ import io .javaoperatorsdk .operator .processing .retry .GenericRetry ;
28
+ import io .javaoperatorsdk .operator .processing .retry .Retry ;
29
+ import io .javaoperatorsdk .operator .processing .retry .RetryExecution ;
30
+
33
31
/**
34
32
* Event handler that makes sure that events are processed in a "single threaded" way per resource
35
33
* UID, while buffering events which are received during an execution.
36
34
*/
37
- public class DefaultEventHandler implements EventHandler {
35
+ public class DefaultEventHandler < R extends CustomResource <?, ?>> implements EventHandler {
38
36
39
37
private static final Logger log = LoggerFactory .getLogger (DefaultEventHandler .class );
40
38
41
39
private final EventBuffer eventBuffer ;
42
40
private final Set <String > underProcessing = new HashSet <>();
43
41
private final ScheduledThreadPoolExecutor executor ;
44
- private final EventDispatcher eventDispatcher ;
42
+ private final EventDispatcher < R > eventDispatcher ;
45
43
private final Retry retry ;
46
44
private final Map <String , RetryExecution > retryState = new HashMap <>();
47
45
private final String controllerName ;
48
46
private final int terminationTimeout ;
49
47
private final ReentrantLock lock = new ReentrantLock ();
50
- private DefaultEventSourceManager eventSourceManager ;
51
- private ControllerConfiguration configuration ;
48
+ private DefaultEventSourceManager < R > eventSourceManager ;
49
+ private final ControllerConfiguration configuration ;
52
50
53
- public DefaultEventHandler (
54
- ResourceController controller , ControllerConfiguration configuration , MixedOperation client ) {
51
+ public DefaultEventHandler (ConfiguredController <R > controller ) {
55
52
this (
56
- new EventDispatcher (controller , configuration , client ),
57
- configuration .getName (),
58
- GenericRetry .fromConfiguration (configuration .getRetryConfiguration ()),
59
- configuration .getConfigurationService ().concurrentReconciliationThreads (),
60
- configuration .getConfigurationService ().getTerminationTimeoutSeconds (), configuration );
53
+ new EventDispatcher <>(controller ),
54
+ controller .getConfiguration ().getName (),
55
+ GenericRetry .fromConfiguration (controller .getConfiguration ().getRetryConfiguration ()),
56
+ controller .getConfiguration ().getConfigurationService ().concurrentReconciliationThreads (),
57
+ controller .getConfiguration ().getConfigurationService ().getTerminationTimeoutSeconds (),
58
+ controller .getConfiguration ());
61
59
}
62
60
63
61
DefaultEventHandler (
64
- EventDispatcher eventDispatcher ,
62
+ EventDispatcher < R > eventDispatcher ,
65
63
String relatedControllerName ,
66
64
Retry retry ,
67
65
int concurrentReconciliationThreads , ControllerConfiguration configuration ) {
@@ -74,7 +72,7 @@ public DefaultEventHandler(
74
72
}
75
73
76
74
private DefaultEventHandler (
77
- EventDispatcher eventDispatcher ,
75
+ EventDispatcher < R > eventDispatcher ,
78
76
String relatedControllerName ,
79
77
Retry retry ,
80
78
int concurrentReconciliationThreads ,
@@ -104,7 +102,7 @@ public void close() {
104
102
}
105
103
}
106
104
107
- public void setEventSourceManager (DefaultEventSourceManager eventSourceManager ) {
105
+ public void setEventSourceManager (DefaultEventSourceManager < R > eventSourceManager ) {
108
106
this .eventSourceManager = eventSourceManager ;
109
107
}
110
108
@@ -159,7 +157,7 @@ private RetryInfo retryInfo(String customResourceUid) {
159
157
}
160
158
161
159
void eventProcessingFinished (
162
- ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
160
+ ExecutionScope < R > executionScope , PostExecutionControl postExecutionControl ) {
163
161
try {
164
162
lock .lock ();
165
163
log .debug (
@@ -223,7 +221,7 @@ private void handleRetryOnException(ExecutionScope executionScope) {
223
221
});
224
222
}
225
223
226
- private void markSuccessfulExecutionRegardingRetry (ExecutionScope executionScope ) {
224
+ private void markSuccessfulExecutionRegardingRetry (ExecutionScope < R > executionScope ) {
227
225
log .debug (
228
226
"Marking successful execution for resource: {}" ,
229
227
getName (executionScope .getCustomResource ()));
@@ -233,7 +231,7 @@ private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope
233
231
.cancelOnceSchedule (executionScope .getCustomResourceUid ());
234
232
}
235
233
236
- private RetryExecution getOrInitRetryExecution (ExecutionScope executionScope ) {
234
+ private RetryExecution getOrInitRetryExecution (ExecutionScope < R > executionScope ) {
237
235
RetryExecution retryExecution = retryState .get (executionScope .getCustomResourceUid ());
238
236
if (retryExecution == null ) {
239
237
retryExecution = retry .initExecution ();
@@ -259,9 +257,9 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) {
259
257
* would override an additional change coming from a different client.
260
258
*/
261
259
private void cacheUpdatedResourceIfChanged (
262
- ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
260
+ ExecutionScope < R > executionScope , PostExecutionControl postExecutionControl ) {
263
261
if (postExecutionControl .customResourceUpdatedDuringExecution ()) {
264
- CustomResource originalCustomResource = executionScope .getCustomResource ();
262
+ R originalCustomResource = executionScope .getCustomResource ();
265
263
CustomResource customResourceAfterExecution =
266
264
postExecutionControl .getUpdatedCustomResource ().get ();
267
265
String originalResourceVersion = getVersion (originalCustomResource );
0 commit comments