Skip to content

Commit 94a0146

Browse files
authored
Merge pull request #76 from ContainerSolutions/scheduling-no-resource-version-compare
Scheduling no resource version compare
2 parents 7ef0302 + a6b31ed commit 94a0146

File tree

3 files changed

+27
-71
lines changed

3 files changed

+27
-71
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,7 @@ public String resourceUid() {
3232
return resource.getMetadata().getUid();
3333
}
3434

35-
public Boolean sameResourceAs(CustomResourceEvent otherEvent) {
36-
return getResource().getMetadata().getUid().equals(otherEvent.getResource().getMetadata().getUid());
37-
}
38-
39-
public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {
40-
return sameResourceAs(otherEvent) &&
41-
Long.parseLong(getResource().getMetadata().getResourceVersion()) >
42-
Long.parseLong(otherEvent.getResource().getMetadata().getResourceVersion());
4335

44-
}
4536

4637
public Optional<Long> nextBackOff() {
4738
retryCount++;

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -79,25 +79,20 @@ void scheduleEvent(CustomResourceEvent event) {
7979
// Note that we always use finalizers, we want to process delete event just in corner case,
8080
// when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
8181
// We want to skip in case of finalizer was there since we don't want to execute delete method always at least 2x,
82-
// which would be the result if we don't skip here. (If there is no deletion timestamp if resource deleted without finalizer.
82+
// which would be the result if we don't skip here. (there is no deletion timestamp if resource deleted without finalizer.)
8383
log.debug("Skipping delete event since deletion timestamp is present on resource, so finalizer was in place.");
8484
return;
8585
}
86-
if (eventStore.receivedMoreRecentEventBefore(event)) {
87-
log.debug("Skipping event processing since was processed event with newer version before. {}", event);
88-
return;
89-
}
90-
eventStore.updateLatestResourceVersionReceived(event);
91-
92-
if (eventStore.containsOlderVersionOfNotScheduledEvent(event)) {
93-
log.debug("Replacing event which is not scheduled yet, since incoming event is more recent. new Event:{}", event);
94-
eventStore.addOrReplaceEventAsNotScheduledYet(event);
86+
if (eventStore.containsNotScheduledEvent(event.resourceUid())) {
87+
log.debug("Replacing not scheduled event with actual event." +
88+
" New event: {}", event);
89+
eventStore.addOrReplaceEventAsNotScheduled(event);
9590
return;
9691
}
97-
if (eventStore.containsOlderVersionOfEventUnderProcessing(event)) {
92+
if (eventStore.containsEventUnderProcessing(event.resourceUid())) {
9893
log.debug("Scheduling event for later processing since there is an event under processing for same kind." +
9994
" New event: {}", event);
100-
eventStore.addOrReplaceEventAsNotScheduledYet(event);
95+
eventStore.addOrReplaceEventAsNotScheduled(event);
10196
return;
10297
}
10398

@@ -107,9 +102,9 @@ void scheduleEvent(CustomResourceEvent event) {
107102
return;
108103
}
109104
log.debug("Creating scheduled task for event: {}", event);
105+
eventStore.addEventUnderProcessing(event);
110106
executor.schedule(new EventConsumer(event, eventDispatcher, this),
111107
nextBackOff.get(), TimeUnit.MILLISECONDS);
112-
eventStore.addEventUnderProcessing(event);
113108
} finally {
114109
log.debug("Scheduling event finished: {}", event);
115110
lock.unlock();
@@ -119,10 +114,11 @@ void scheduleEvent(CustomResourceEvent event) {
119114
void eventProcessingFinishedSuccessfully(CustomResourceEvent event) {
120115
try {
121116
lock.lock();
117+
log.debug("Event processing successful for event: {}", event);
122118
eventStore.removeEventUnderProcessing(event.resourceUid());
123-
CustomResourceEvent notScheduledYetEvent = eventStore.removeEventNotScheduledYet(event.resourceUid());
124-
if (notScheduledYetEvent != null) {
125-
scheduleEvent(notScheduledYetEvent);
119+
if (eventStore.containsNotScheduledEvent(event.resourceUid())) {
120+
log.debug("Scheduling recent event for processing processing: {}", event);
121+
scheduleEvent(eventStore.removeEventNotScheduled(event.resourceUid()));
126122
}
127123
} finally {
128124
lock.unlock();
@@ -133,15 +129,13 @@ void eventProcessingFailed(CustomResourceEvent event) {
133129
try {
134130
lock.lock();
135131
eventStore.removeEventUnderProcessing(event.resourceUid());
136-
CustomResourceEvent notScheduledYetEvent = eventStore.removeEventNotScheduledYet(event.resourceUid());
137-
if (notScheduledYetEvent != null) {
138-
if (!notScheduledYetEvent.isSameResourceAndNewerVersion(event)) {
139-
log.warn("The not yet scheduled event has older version then actual event. This is probably a bug.");
140-
}
141-
// this is the case when we failed processing an event but we already received a new one.
142-
// Since since we process declarative resources it correct to schedule the new event.
143-
scheduleEvent(notScheduledYetEvent);
132+
if (eventStore.containsNotScheduledEvent(event.resourceUid())) {
133+
CustomResourceEvent notScheduledEvent = eventStore.removeEventNotScheduled(event.resourceUid());
134+
log.debug("Event processing failed. Scheduling the most recent event. Failed event: {}," +
135+
" Most recent event: {}", event, notScheduledEvent);
136+
scheduleEvent(notScheduledEvent);
144137
} else {
138+
log.debug("Event processing failed. Attempting to re-schedule the event: {}", event);
145139
scheduleEvent(event);
146140
}
147141
} finally {
Lines changed: 9 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,29 @@
11
package com.github.containersolutions.operator.processing;
22

3-
import org.slf4j.Logger;
4-
import org.slf4j.LoggerFactory;
5-
63
import java.util.HashMap;
74
import java.util.Map;
85

96
public class EventStore {
107

11-
private final static Logger log = LoggerFactory.getLogger(EventStore.class);
12-
13-
private final Map<String, Long> lastResourceVersion = new HashMap<>();
14-
private final Map<String, CustomResourceEvent> eventsNotScheduledYet = new HashMap<>();
8+
private final Map<String, CustomResourceEvent> eventsNotScheduled = new HashMap<>();
159
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
1610

17-
public boolean containsOlderVersionOfNotScheduledEvent(CustomResourceEvent newEvent) {
18-
return eventsNotScheduledYet.containsKey(newEvent.resourceUid()) &&
19-
newEvent.isSameResourceAndNewerVersion(eventsNotScheduledYet.get(newEvent.resourceUid()));
11+
public boolean containsNotScheduledEvent(String uuid) {
12+
return eventsNotScheduled.containsKey(uuid);
2013
}
2114

22-
public CustomResourceEvent removeEventNotScheduledYet(String uid) {
23-
return eventsNotScheduledYet.remove(uid);
15+
public CustomResourceEvent removeEventNotScheduled(String uid) {
16+
return eventsNotScheduled.remove(uid);
2417
}
2518

26-
public void addOrReplaceEventAsNotScheduledYet(CustomResourceEvent event) {
27-
eventsNotScheduledYet.put(event.resourceUid(), event);
19+
public void addOrReplaceEventAsNotScheduled(CustomResourceEvent event) {
20+
eventsNotScheduled.put(event.resourceUid(), event);
2821
}
2922

30-
public boolean containsOlderVersionOfEventUnderProcessing(CustomResourceEvent newEvent) {
31-
return eventsUnderProcessing.containsKey(newEvent.resourceUid()) &&
32-
newEvent.isSameResourceAndNewerVersion(eventsUnderProcessing.get(newEvent.resourceUid()));
23+
public boolean containsEventUnderProcessing(String uuid) {
24+
return eventsUnderProcessing.containsKey(uuid);
3325
}
3426

35-
3627
public void addEventUnderProcessing(CustomResourceEvent event) {
3728
eventsUnderProcessing.put(event.resourceUid(), event);
3829
}
@@ -41,24 +32,4 @@ public CustomResourceEvent removeEventUnderProcessing(String uid) {
4132
return eventsUnderProcessing.remove(uid);
4233
}
4334

44-
public void updateLatestResourceVersionReceived(CustomResourceEvent event) {
45-
Long current = lastResourceVersion.get(event.resourceUid());
46-
long received = Long.parseLong(event.getResource().getMetadata().getResourceVersion());
47-
if (current == null || received > current) {
48-
lastResourceVersion.put(event.resourceUid(), received);
49-
log.debug("Resource version for {} updated from {} to {}", event.getResource().getMetadata().getName(), current, received);
50-
} else {
51-
log.debug("Resource version for {} not updated from {}", event.getResource().getMetadata().getName(), current);
52-
}
53-
}
54-
55-
public boolean receivedMoreRecentEventBefore(CustomResourceEvent customResourceEvent) {
56-
Long lastVersionProcessed = lastResourceVersion.get(customResourceEvent.resourceUid());
57-
if (lastVersionProcessed == null) {
58-
return false;
59-
} else {
60-
return lastVersionProcessed > Long.parseLong(customResourceEvent.getResource()
61-
.getMetadata().getResourceVersion());
62-
}
63-
}
6435
}

0 commit comments

Comments
 (0)