Skip to content

Commit 94232ca

Browse files
committed
simplified scheduling algorithm
1 parent 2ca3fa0 commit 94232ca

File tree

3 files changed

+2
-81
lines changed

3 files changed

+2
-81
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ class EventConsumer implements Runnable {
2020

2121
@Override
2222
public void run() {
23-
boolean stillScheduledForProcessing = eventScheduler.eventProcessingStarted(event);
24-
if (!stillScheduledForProcessing) {
25-
return;
26-
}
2723
if (processEvent()) {
2824
eventScheduler.eventProcessingFinishedSuccessfully(event);
2925
} else {

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.slf4j.LoggerFactory;
1010

1111
import java.util.Optional;
12-
import java.util.concurrent.ScheduledFuture;
1312
import java.util.concurrent.ScheduledThreadPoolExecutor;
1413
import java.util.concurrent.ThreadFactory;
1514
import java.util.concurrent.TimeUnit;
@@ -100,47 +99,17 @@ void scheduleEvent(CustomResourceEvent event) {
10099
eventStore.addOrReplaceEventAsNotScheduledYet(event);
101100
return;
102101
}
103-
if (eventStore.containsEventScheduledForProcessing(event.resourceUid())) {
104-
EventStore.ResourceScheduleHolder scheduleHolder = eventStore.getEventScheduledForProcessing(event.resourceUid());
105-
CustomResourceEvent scheduledEvent = scheduleHolder.getCustomResourceEvent();
106-
ScheduledFuture<?> scheduledFuture = scheduleHolder.getScheduledFuture();
107-
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
108-
if (event.isSameResourceAndNewerVersion(scheduledEvent)) {
109-
log.debug("Scheduled event canceled because incoming event is newer. Discarded: {}, New: {}", scheduledEvent, event);
110-
scheduledFuture.cancel(false);
111-
eventStore.removeEventScheduledForProcessingVersionAware(scheduledEvent.resourceUid());
112-
}
113-
}
114-
115102
Optional<Long> nextBackOff = event.nextBackOff();
116103
if (!nextBackOff.isPresent()) {
117104
log.warn("Event limited max retry limit ({}), will be discarded. {}", MAX_RETRY_COUNT, event);
118105
return;
119106
}
120107
log.debug("Creating scheduled task for event: {}", event);
121-
ScheduledFuture<?> scheduledTask = executor.schedule(new EventConsumer(event, eventDispatcher, this),
108+
executor.schedule(new EventConsumer(event, eventDispatcher, this),
122109
nextBackOff.get(), TimeUnit.MILLISECONDS);
123-
eventStore.addEventScheduledForProcessing(new EventStore.ResourceScheduleHolder(event, scheduledTask));
124-
} finally {
125-
log.info("Scheduling event finished: {}", event);
126-
lock.unlock();
127-
}
128-
}
129-
130-
boolean eventProcessingStarted(CustomResourceEvent event) {
131-
try {
132-
lock.lock();
133-
EventStore.ResourceScheduleHolder res = eventStore.removeEventScheduledForProcessingVersionAware(event);
134-
if (res == null) {
135-
// Double checking if the event is still scheduled. This is a corner case, but can actually happen.
136-
// In detail: it can happen that we scheduled an event for processing, it took some time that is was picked
137-
// by executor, and it was removed during that time from the schedule but not cancelled yet. So to be correct
138-
// this should be checked also here. In other word scheduleEvent function can run in parallel with eventDispatcher.
139-
return false;
140-
}
141110
eventStore.addEventUnderProcessing(event);
142-
return true;
143111
} finally {
112+
log.info("Scheduling event finished: {}", event);
144113
lock.unlock();
145114
}
146115
}

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import java.util.HashMap;
77
import java.util.Map;
8-
import java.util.concurrent.ScheduledFuture;
98

109
public class EventStore {
1110

@@ -14,7 +13,6 @@ public class EventStore {
1413
private final Map<String, Long> lastResourceVersion = new HashMap<>();
1514

1615
private final Map<String, CustomResourceEvent> eventsNotScheduledYet = new HashMap<>();
17-
private final Map<String, ResourceScheduleHolder> eventsScheduledForProcessing = new HashMap<>();
1816
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
1917

2018
public boolean containsOlderVersionOfNotScheduledEvent(CustomResourceEvent newEvent) {
@@ -35,35 +33,11 @@ public boolean containsOlderVersionOfEventUnderProcessing(CustomResourceEvent ne
3533
newEvent.isSameResourceAndNewerVersion(eventsUnderProcessing.get(newEvent.resourceUid()));
3634
}
3735

38-
public boolean containsEventScheduledForProcessing(String uid) {
39-
return eventsScheduledForProcessing.containsKey(uid);
40-
}
4136

4237
public void addEventUnderProcessing(CustomResourceEvent event) {
4338
eventsUnderProcessing.put(event.resourceUid(), event);
4439
}
4540

46-
public ResourceScheduleHolder getEventScheduledForProcessing(String uid) {
47-
return eventsScheduledForProcessing.get(uid);
48-
}
49-
50-
public ResourceScheduleHolder removeEventScheduledForProcessingVersionAware(String uid) {
51-
return eventsScheduledForProcessing.remove(uid);
52-
}
53-
54-
public ResourceScheduleHolder removeEventScheduledForProcessingVersionAware(CustomResourceEvent customResourceEvent) {
55-
ResourceScheduleHolder holder = eventsScheduledForProcessing.get(customResourceEvent.resourceUid());
56-
if (holder.getCustomResourceEvent().getResource().getMetadata().getResourceVersion().equals(customResourceEvent.getResource().getMetadata().getResourceVersion())) {
57-
return removeEventScheduledForProcessingVersionAware(customResourceEvent.resourceUid());
58-
} else {
59-
return null;
60-
}
61-
}
62-
63-
public void addEventScheduledForProcessing(ResourceScheduleHolder resourceScheduleHolder) {
64-
eventsScheduledForProcessing.put(resourceScheduleHolder.getCustomResourceEvent().resourceUid(), resourceScheduleHolder);
65-
}
66-
6741
public CustomResourceEvent removeEventUnderProcessing(String uid) {
6842
return eventsUnderProcessing.remove(uid);
6943
}
@@ -88,22 +62,4 @@ public boolean processedNewerVersionBefore(CustomResourceEvent customResourceEve
8862
.getMetadata().getResourceVersion());
8963
}
9064
}
91-
92-
public static class ResourceScheduleHolder {
93-
private CustomResourceEvent customResourceEvent;
94-
private ScheduledFuture<?> scheduledFuture;
95-
96-
public ResourceScheduleHolder(CustomResourceEvent customResourceEvent, ScheduledFuture<?> scheduledFuture) {
97-
this.customResourceEvent = customResourceEvent;
98-
this.scheduledFuture = scheduledFuture;
99-
}
100-
101-
public CustomResourceEvent getCustomResourceEvent() {
102-
return customResourceEvent;
103-
}
104-
105-
public ScheduledFuture<?> getScheduledFuture() {
106-
return scheduledFuture;
107-
}
108-
}
10965
}

0 commit comments

Comments
 (0)