2
2
3
3
4
4
import com .github .containersolutions .operator .processing .retry .Retry ;
5
- import com .google .common .util .concurrent .ThreadFactoryBuilder ;
6
5
import io .fabric8 .kubernetes .client .CustomResource ;
7
6
import io .fabric8 .kubernetes .client .KubernetesClientException ;
8
7
import io .fabric8 .kubernetes .client .Watcher ;
11
10
12
11
import java .util .Optional ;
13
12
import java .util .concurrent .ScheduledThreadPoolExecutor ;
14
- import java .util .concurrent .ThreadFactory ;
15
13
import java .util .concurrent .TimeUnit ;
16
14
import java .util .concurrent .locks .ReentrantLock ;
17
15
32
30
* <li>Threading approach thus thread pool size and/or implementation should be configurable</li>
33
31
* </ul>
34
32
* <p>
35
- * Notes:
36
- * <ul>
37
- * <li> In implementation we have to lock since the fabric8 client event handling is multi-threaded, we can receive multiple events
38
- * for same resource. Also we do callback from other threads.
39
- * </li>
40
- * </ul>
41
33
*/
42
34
43
35
public class EventScheduler implements Watcher <CustomResource > {
@@ -54,11 +46,7 @@ public class EventScheduler implements Watcher<CustomResource> {
54
46
public EventScheduler (EventDispatcher eventDispatcher , Retry retry ) {
55
47
this .eventDispatcher = eventDispatcher ;
56
48
this .retry = retry ;
57
- ThreadFactory threadFactory = new ThreadFactoryBuilder ()
58
- .setNameFormat ("event-consumer-%d" )
59
- .setDaemon (false )
60
- .build ();
61
- executor = new ScheduledThreadPoolExecutor (1 , threadFactory );
49
+ executor = new ScheduledThreadPoolExecutor (1 );
62
50
executor .setRemoveOnCancelPolicy (true );
63
51
}
64
52
@@ -67,14 +55,13 @@ public void eventReceived(Watcher.Action action, CustomResource resource) {
67
55
log .debug ("Event received for action: {}, {}: {}" , action .toString ().toLowerCase (), resource .getClass ().getSimpleName (),
68
56
resource .getMetadata ().getName ());
69
57
CustomResourceEvent event = new CustomResourceEvent (action , resource , retry );
70
- scheduleEvent (event );
58
+ scheduleEventFromApi (event );
71
59
}
72
60
73
- void scheduleEvent (CustomResourceEvent event ) {
74
- log .trace ("Current queue size {}" , executor .getQueue ().size ());
75
- log .debug ("Scheduling event: {}" , event );
61
+ void scheduleEventFromApi (CustomResourceEvent event ) {
76
62
try {
77
63
lock .lock ();
64
+ log .debug ("Scheduling event from Api: {}" , event );
78
65
if (event .getResource ().getMetadata ().getDeletionTimestamp () != null && event .getAction () == Action .DELETED ) {
79
66
// Note that we always use finalizers, we want to process delete event just in corner case,
80
67
// when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
@@ -95,30 +82,39 @@ void scheduleEvent(CustomResourceEvent event) {
95
82
eventStore .addOrReplaceEventAsNotScheduled (event );
96
83
return ;
97
84
}
85
+ scheduleEventForExecution (event );
86
+ log .trace ("Scheduling event from API finished: {}" , event );
87
+ } finally {
88
+ lock .unlock ();
89
+ }
90
+ }
98
91
92
+ private void scheduleEventForExecution (CustomResourceEvent event ) {
93
+ try {
94
+ lock .lock ();
95
+ log .trace ("Current queue size {}" , executor .getQueue ().size ());
96
+ log .debug ("Scheduling event for execution: {}" , event );
99
97
Optional <Long > nextBackOff = event .nextBackOff ();
100
98
if (!nextBackOff .isPresent ()) {
101
99
log .warn ("Event max retry limit reached. Will be discarded. {}" , event );
102
100
return ;
103
101
}
104
- log .debug ("Creating scheduled task for event: {}" , event );
105
102
eventStore .addEventUnderProcessing (event );
106
103
executor .schedule (new EventConsumer (event , eventDispatcher , this ),
107
104
nextBackOff .get (), TimeUnit .MILLISECONDS );
105
+ log .trace ("Scheduled task for event: {}" , event );
108
106
} finally {
109
- log .debug ("Scheduling event finished: {}" , event );
110
107
lock .unlock ();
111
108
}
112
109
}
113
110
114
111
void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
115
112
try {
116
113
lock .lock ();
117
- log .debug ("Event processing successful for event: {}" , event );
118
114
eventStore .removeEventUnderProcessing (event .resourceUid ());
119
115
if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
120
- log .debug ("Scheduling recent event for processing processing : {}" , event );
121
- scheduleEvent ( eventStore . removeEventNotScheduled ( event .resourceUid () ));
116
+ log .debug ("Scheduling recent event for processing: {}" , event );
117
+ scheduleNotYetScheduledEventForExecution ( event .resourceUid ());
122
118
}
123
119
} finally {
124
120
lock .unlock ();
@@ -130,19 +126,22 @@ void eventProcessingFailed(CustomResourceEvent event) {
130
126
lock .lock ();
131
127
eventStore .removeEventUnderProcessing (event .resourceUid ());
132
128
if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
133
- CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (event .resourceUid ());
134
- log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}," +
135
- " Most recent event: {}" , event , notScheduledEvent );
136
- scheduleEvent (notScheduledEvent );
129
+ log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}" , event );
130
+ scheduleNotYetScheduledEventForExecution (event .resourceUid ());
137
131
} else {
138
132
log .debug ("Event processing failed. Attempting to re-schedule the event: {}" , event );
139
- scheduleEvent (event );
133
+ scheduleEventForExecution (event );
140
134
}
141
135
} finally {
142
136
lock .unlock ();
143
137
}
144
138
}
145
139
140
+ private void scheduleNotYetScheduledEventForExecution (String uuid ) {
141
+ CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (uuid );
142
+ scheduleEventForExecution (notScheduledEvent );
143
+ }
144
+
146
145
@ Override
147
146
public void onClose (KubernetesClientException e ) {
148
147
log .error ("Error: " , e );
0 commit comments