@@ -54,8 +54,6 @@ public class EventScheduler<R extends CustomResource> implements Watcher<R> {
54
54
private final ScheduledThreadPoolExecutor executor ;
55
55
private final HashMap <CustomResourceEvent , BackOffExecution > backoffSchedulerCache = new HashMap <>();
56
56
57
- // todo check uid for key
58
- // note that these hash maps does not needs to be concurrent, since we are already locking all methods where are used
59
57
private final Map <String , CustomResourceEvent > eventsNotScheduledYet = new HashMap <>();
60
58
private final Map <String , ResourceScheduleHolder > eventsScheduledForProcessing = new HashMap <>();
61
59
private final Map <String , CustomResourceEvent > eventsUnderProcessing = new HashMap <>();
@@ -102,29 +100,29 @@ void scheduleEvent(CustomResourceEvent newEvent) {
102
100
lock .lock ();
103
101
104
102
// if there is an event waiting for to be scheduled we just replace that.
105
- if (eventsNotScheduledYet .containsKey (newEvent .resourceKey ()) &&
106
- newEvent .isSameResourceAndNewerVersion (eventsNotScheduledYet .get (newEvent .resourceKey ()))) {
103
+ if (eventsNotScheduledYet .containsKey (newEvent .resourceUid ()) &&
104
+ newEvent .isSameResourceAndNewerVersion (eventsNotScheduledYet .get (newEvent .resourceUid ()))) {
107
105
log .debug ("Replacing event which is not scheduled yet, since incoming event is more recent. new Event:{}"
108
106
, newEvent );
109
- eventsNotScheduledYet .put (newEvent .resourceKey (), newEvent );
107
+ eventsNotScheduledYet .put (newEvent .resourceUid (), newEvent );
110
108
return ;
111
- } else if (eventsUnderProcessing .containsKey (newEvent .resourceKey ()) &&
112
- newEvent .isSameResourceAndNewerVersion (eventsUnderProcessing .get (newEvent .resourceKey ()))) {
109
+ } else if (eventsUnderProcessing .containsKey (newEvent .resourceUid ()) &&
110
+ newEvent .isSameResourceAndNewerVersion (eventsUnderProcessing .get (newEvent .resourceUid ()))) {
113
111
log .debug ("Scheduling event for later processing since there is an event under processing for same kind." +
114
112
" New event: {}" , newEvent );
115
- eventsNotScheduledYet .put (newEvent .resourceKey (), newEvent );
113
+ eventsNotScheduledYet .put (newEvent .resourceUid (), newEvent );
116
114
return ;
117
115
}
118
116
119
- if (eventsScheduledForProcessing .containsKey (newEvent .resourceKey ())) {
120
- ResourceScheduleHolder scheduleHolder = eventsScheduledForProcessing .get (newEvent .resourceKey ());
117
+ if (eventsScheduledForProcessing .containsKey (newEvent .resourceUid ())) {
118
+ ResourceScheduleHolder scheduleHolder = eventsScheduledForProcessing .get (newEvent .resourceUid ());
121
119
CustomResourceEvent scheduledEvent = scheduleHolder .getCustomResourceEvent ();
122
120
ScheduledFuture <?> scheduledFuture = scheduleHolder .getScheduledFuture ();
123
121
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
124
122
if (newEvent .isSameResourceAndNewerVersion (scheduledEvent )) {
125
123
log .debug ("Queued event canceled because incoming event is newer. [{}]" , scheduledEvent );
126
124
scheduledFuture .cancel (false );
127
- eventsScheduledForProcessing .remove (scheduledEvent .resourceKey ());
125
+ eventsScheduledForProcessing .remove (scheduledEvent .resourceUid ());
128
126
}
129
127
// If newEvent is older than existing in queue, don't schedule and remove from cache
130
128
if (scheduledEvent .isSameResourceAndNewerVersion (newEvent )) {
@@ -135,7 +133,7 @@ void scheduleEvent(CustomResourceEvent newEvent) {
135
133
backoffSchedulerCache .put (newEvent , backOff .start ());
136
134
ScheduledFuture <?> scheduledTask = executor .schedule (new EventConsumer (newEvent , eventDispatcher , this ),
137
135
backoffSchedulerCache .get (newEvent ).nextBackOff (), TimeUnit .MILLISECONDS );
138
- eventsScheduledForProcessing .put (newEvent .resourceKey (), new ResourceScheduleHolder (newEvent , scheduledTask ));
136
+ eventsScheduledForProcessing .put (newEvent .resourceUid (), new ResourceScheduleHolder (newEvent , scheduledTask ));
139
137
} finally {
140
138
lock .unlock ();
141
139
}
@@ -144,15 +142,15 @@ void scheduleEvent(CustomResourceEvent newEvent) {
144
142
boolean eventProcessingStarted (CustomResourceEvent event ) {
145
143
try {
146
144
lock .lock ();
147
- ResourceScheduleHolder res = eventsScheduledForProcessing .remove (event .resourceKey ());
145
+ ResourceScheduleHolder res = eventsScheduledForProcessing .remove (event .resourceUid ());
148
146
if (res == null ) {
149
147
// if its still scheduled for processing.
150
148
// note that it can happen that we scheduled an event for processing, it took some time that is was picked
151
149
// by executor, and it was removed during that time from the schedule but not cancelled yet. So to be correct
152
150
// this should be checked also here. In other word scheduleEvent function can run in parallel with eventDispatcher.
153
151
return false ;
154
152
}
155
- eventsUnderProcessing .put (event .resourceKey (), event );
153
+ eventsUnderProcessing .put (event .resourceUid (), event );
156
154
return true ;
157
155
} finally {
158
156
lock .unlock ();
@@ -162,10 +160,10 @@ boolean eventProcessingStarted(CustomResourceEvent event) {
162
160
void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
163
161
try {
164
162
lock .lock ();
165
- eventsUnderProcessing .remove (event .resourceKey ());
163
+ eventsUnderProcessing .remove (event .resourceUid ());
166
164
backoffSchedulerCache .remove (event );
167
165
168
- CustomResourceEvent notScheduledYetEvent = eventsNotScheduledYet .remove (event .resourceKey ());
166
+ CustomResourceEvent notScheduledYetEvent = eventsNotScheduledYet .remove (event .resourceUid ());
169
167
if (notScheduledYetEvent != null ) {
170
168
scheduleEvent (notScheduledYetEvent );
171
169
}
0 commit comments