8
8
import io .javaoperatorsdk .operator .processing .event .DefaultEventSourceManager ;
9
9
import io .javaoperatorsdk .operator .processing .event .Event ;
10
10
import io .javaoperatorsdk .operator .processing .event .EventHandler ;
11
- import java .util .HashSet ;
12
- import java .util .Optional ;
13
- import java .util .Set ;
14
11
import io .javaoperatorsdk .operator .processing .retry .Retry ;
15
12
import io .javaoperatorsdk .operator .processing .retry .RetryExecution ;
16
- import org .slf4j .Logger ;
17
- import org .slf4j .LoggerFactory ;
18
-
19
13
import java .util .*;
14
+ import java .util .HashSet ;
15
+ import java .util .Optional ;
16
+ import java .util .Set ;
20
17
import java .util .concurrent .ScheduledThreadPoolExecutor ;
21
18
import java .util .concurrent .ThreadFactory ;
22
19
import java .util .concurrent .locks .ReentrantLock ;
23
20
import org .slf4j .Logger ;
24
21
import org .slf4j .LoggerFactory ;
25
22
26
- import static io .javaoperatorsdk .operator .EventListUtils .containsCustomResourceDeletedEvent ;
27
- import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getUID ;
28
- import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getVersion ;
29
-
30
23
/**
31
24
* Event handler that makes sure that events are processed in a "single threaded" way per resource
32
25
* UID, while buffering events which are received during an execution.
@@ -35,26 +28,32 @@ public class DefaultEventHandler implements EventHandler {
35
28
36
29
private static final Logger log = LoggerFactory .getLogger (DefaultEventHandler .class );
37
30
38
- private final CustomResourceCache customResourceCache ;
39
- private final EventBuffer eventBuffer ;
40
- private final Set <String > underProcessing = new HashSet <>();
41
- private final ScheduledThreadPoolExecutor executor ;
42
- private final EventDispatcher eventDispatcher ;
43
- private final Retry retry ;
44
- private final Map <String , RetryExecution > retryState = new HashMap <>();
45
- private DefaultEventSourceManager defaultEventSourceManager ;
31
+ private final CustomResourceCache customResourceCache ;
32
+ private final EventBuffer eventBuffer ;
33
+ private final Set <String > underProcessing = new HashSet <>();
34
+ private final ScheduledThreadPoolExecutor executor ;
35
+ private final EventDispatcher eventDispatcher ;
36
+ private final Retry retry ;
37
+ private final Map <String , RetryExecution > retryState = new HashMap <>();
38
+ private DefaultEventSourceManager defaultEventSourceManager ;
46
39
47
40
private final ReentrantLock lock = new ReentrantLock ();
48
41
49
- public DefaultEventHandler (CustomResourceCache customResourceCache , EventDispatcher eventDispatcher , String relatedControllerName ,
50
- Retry retry ) {
51
- this .customResourceCache = customResourceCache ;
52
- this .eventDispatcher = eventDispatcher ;
53
- this .retry = retry ;
54
- eventBuffer = new EventBuffer ();
55
- executor = new ScheduledThreadPoolExecutor (5 , new ThreadFactory () {
56
- @ Override
57
- public Thread newThread (Runnable runnable ) {
42
+ public DefaultEventHandler (
43
+ CustomResourceCache customResourceCache ,
44
+ EventDispatcher eventDispatcher ,
45
+ String relatedControllerName ,
46
+ Retry retry ) {
47
+ this .customResourceCache = customResourceCache ;
48
+ this .eventDispatcher = eventDispatcher ;
49
+ this .retry = retry ;
50
+ eventBuffer = new EventBuffer ();
51
+ executor =
52
+ new ScheduledThreadPoolExecutor (
53
+ 5 ,
54
+ new ThreadFactory () {
55
+ @ Override
56
+ public Thread newThread (Runnable runnable ) {
58
57
return new Thread (runnable , "EventHandler-" + relatedControllerName );
59
58
}
60
59
});
@@ -101,80 +100,90 @@ private void executeBufferedEvents(String customResourceUid) {
101
100
}
102
101
}
103
102
104
- void eventProcessingFinished (ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
105
- try {
106
- lock . lock ();
107
- log . debug ( "Event processing finished. Scope: {}" , executionScope );
108
- unsetUnderExecution ( executionScope . getCustomResourceUid () );
109
-
110
- if ( retry != null && postExecutionControl . exceptionDuringExecution ()) {
111
- handleRetryOnException ( executionScope , postExecutionControl );
112
- } else if ( retry != null ) {
113
- handleSuccessfulExecutionRegardingRetry ( executionScope );
114
- }
115
-
116
- if ( containsCustomResourceDeletedEvent ( executionScope . getEvents ())) {
117
- cleanupAfterDeletedEvent ( executionScope .getCustomResourceUid ());
118
- } else {
119
- cacheUpdatedResourceIfChanged ( executionScope , postExecutionControl );
120
- executeBufferedEvents (executionScope . getCustomResourceUid () );
121
- }
122
- } finally {
123
- lock . unlock ();
124
- }
103
+ void eventProcessingFinished (
104
+ ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
105
+ try {
106
+ lock . lock ( );
107
+ log . debug ( "Event processing finished. Scope: {}" , executionScope );
108
+ unsetUnderExecution ( executionScope . getCustomResourceUid ());
109
+
110
+ if ( retry != null && postExecutionControl . exceptionDuringExecution ()) {
111
+ handleRetryOnException ( executionScope , postExecutionControl );
112
+ } else if ( retry != null ) {
113
+ handleSuccessfulExecutionRegardingRetry ( executionScope );
114
+ }
115
+
116
+ if ( containsCustomResourceDeletedEvent ( executionScope .getEvents ())) {
117
+ cleanupAfterDeletedEvent ( executionScope . getCustomResourceUid ());
118
+ } else {
119
+ cacheUpdatedResourceIfChanged (executionScope , postExecutionControl );
120
+ executeBufferedEvents ( executionScope . getCustomResourceUid ());
121
+ }
122
+ } finally {
123
+ lock . unlock ();
125
124
}
125
+ }
126
126
127
- /**
128
- * Regarding the events there are 2 approaches we can take. Either retry always when there are new events (received meanwhile retry
129
- * is in place or already in buffer) instantly or always wait according to the retry timing if there was an exception.
130
- */
131
- private void handleRetryOnException (ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
132
- RetryExecution execution = getOrInitRetryExecution (executionScope );
133
- boolean newEventsExists = eventBuffer .newEventsExists (executionScope .getCustomResourceUid ());
134
- eventBuffer .putBackEvents (executionScope .getCustomResourceUid (), executionScope .getEvents ());
135
-
136
- Optional <Long > nextDelay = execution .nextDelay ();
137
- if (newEventsExists ) {
138
- executeBufferedEvents (executionScope .getCustomResourceUid ());
139
- return ;
140
- }
141
- nextDelay .ifPresent (delay ->
142
- defaultEventSourceManager .getRetryTimerEventSource ()
143
- .scheduleOnce (executionScope .getCustomResource (), delay ));
127
+ /**
128
+ * Regarding the events there are 2 approaches we can take. Either retry always when there are new
129
+ * events (received meanwhile retry is in place or already in buffer) instantly or always wait
130
+ * according to the retry timing if there was an exception.
131
+ */
132
+ private void handleRetryOnException (
133
+ ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
134
+ RetryExecution execution = getOrInitRetryExecution (executionScope );
135
+ boolean newEventsExists = eventBuffer .newEventsExists (executionScope .getCustomResourceUid ());
136
+ eventBuffer .putBackEvents (executionScope .getCustomResourceUid (), executionScope .getEvents ());
137
+
138
+ Optional <Long > nextDelay = execution .nextDelay ();
139
+ if (newEventsExists ) {
140
+ executeBufferedEvents (executionScope .getCustomResourceUid ());
141
+ return ;
144
142
}
143
+ nextDelay .ifPresent (
144
+ delay ->
145
+ defaultEventSourceManager
146
+ .getRetryTimerEventSource ()
147
+ .scheduleOnce (executionScope .getCustomResource (), delay ));
148
+ }
145
149
146
- private void handleSuccessfulExecutionRegardingRetry (ExecutionScope executionScope ) {
147
- retryState .remove (executionScope .getCustomResourceUid ());
148
- defaultEventSourceManager .getRetryTimerEventSource ().cancelOnceSchedule (executionScope .getCustomResourceUid ());
149
- }
150
+ private void handleSuccessfulExecutionRegardingRetry (ExecutionScope executionScope ) {
151
+ retryState .remove (executionScope .getCustomResourceUid ());
152
+ defaultEventSourceManager
153
+ .getRetryTimerEventSource ()
154
+ .cancelOnceSchedule (executionScope .getCustomResourceUid ());
155
+ }
150
156
151
- private RetryExecution getOrInitRetryExecution (ExecutionScope executionScope ) {
152
- RetryExecution retryExecution = retryState .get (executionScope .getCustomResourceUid ());
153
- if (retryExecution == null ) {
154
- retryExecution = retry .initExecution ();
155
- retryState .put (executionScope .getCustomResourceUid (), retryExecution );
156
- }
157
- return retryExecution ;
157
+ private RetryExecution getOrInitRetryExecution (ExecutionScope executionScope ) {
158
+ RetryExecution retryExecution = retryState .get (executionScope .getCustomResourceUid ());
159
+ if (retryExecution == null ) {
160
+ retryExecution = retry .initExecution ();
161
+ retryState .put (executionScope .getCustomResourceUid (), retryExecution );
158
162
}
163
+ return retryExecution ;
164
+ }
159
165
160
- /**
161
- * Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we've seen:
162
- * If an execution is finished, where we updated a custom resource, but there are other events already buffered for next
163
- * execution, we might not get the newest custom resource from CustomResource event source in time. Thus we execute
164
- * the next batch of events but with a non up to date CR. Here we cache the latest CustomResource from the update
165
- * execution so we make sure its already used in the up-coming execution.
166
- * <p>
167
- * Note that this is an improvement, not a bug fix. This situation can happen naturally, we just make the execution more
168
- * efficient, and avoid questions about conflicts.
169
- * <p>
170
- * Note that without the conditional locking in the cache, there is a very minor chance that we would override an
171
- * additional change coming from a different client.
172
- */
173
- private void cacheUpdatedResourceIfChanged (ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
174
- if (postExecutionControl .customResourceUpdatedDuringExecution ()) {
175
- CustomResource originalCustomResource = executionScope .getCustomResource ();
176
- CustomResource customResourceAfterExecution = postExecutionControl .getUpdatedCustomResource ().get ();
177
- String originalResourceVersion = getVersion (originalCustomResource );
166
+ /**
167
+ * Here we try to cache the latest resource after an update. The goal is to solve a concurrency
168
+ * issue we've seen: If an execution is finished, where we updated a custom resource, but there
169
+ * are other events already buffered for next execution, we might not get the newest custom
170
+ * resource from CustomResource event source in time. Thus we execute the next batch of events but
171
+ * with a non up to date CR. Here we cache the latest CustomResource from the update execution so
172
+ * we make sure its already used in the up-coming execution.
173
+ *
174
+ * <p>Note that this is an improvement, not a bug fix. This situation can happen naturally, we
175
+ * just make the execution more efficient, and avoid questions about conflicts.
176
+ *
177
+ * <p>Note that without the conditional locking in the cache, there is a very minor chance that we
178
+ * would override an additional change coming from a different client.
179
+ */
180
+ private void cacheUpdatedResourceIfChanged (
181
+ ExecutionScope executionScope , PostExecutionControl postExecutionControl ) {
182
+ if (postExecutionControl .customResourceUpdatedDuringExecution ()) {
183
+ CustomResource originalCustomResource = executionScope .getCustomResource ();
184
+ CustomResource customResourceAfterExecution =
185
+ postExecutionControl .getUpdatedCustomResource ().get ();
186
+ String originalResourceVersion = getVersion (originalCustomResource );
178
187
179
188
log .debug (
180
189
"Trying to update resource cache from update response for resource uid: {} new version: {} old version: {}" ,
0 commit comments