Skip to content

Commit 0b5dae5

Browse files
committed
new approach of scheduling
1 parent a66e463 commit 0b5dae5

File tree

4 files changed

+115
-54
lines changed

4 files changed

+115
-54
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/CustomResourceEvent.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
77

8+
import java.util.Objects;
89
import java.util.Optional;
910

1011
class CustomResourceEvent {
@@ -27,14 +28,6 @@ public CustomResource getResource() {
2728
return resource;
2829
}
2930

30-
void incrementRetryCounter() {
31-
this.retriesCount++;
32-
}
33-
34-
Integer getRetriesCount() {
35-
return retriesCount;
36-
}
37-
3831
public String getEventInfo() {
3932
CustomResource resource = this.getResource();
4033
return new StringBuilder().
@@ -45,13 +38,31 @@ public String getEventInfo() {
4538

4639
}
4740

41+
@Override
42+
public boolean equals(Object o) {
43+
if (this == o) return true;
44+
if (o == null || getClass() != o.getClass()) return false;
45+
CustomResourceEvent that = (CustomResourceEvent) o;
46+
// note that action is not interesting for us since, we schedule only modified and added events, and we decide
47+
// between those inside dispatcher not based on fabric8 client
48+
return sameResourceAs(that);
49+
}
50+
51+
@Override
52+
public int hashCode() {
53+
// todo this probably needs to be improved use the api version + kind + name
54+
return Objects.hash(resource);
55+
}
56+
57+
public Boolean sameResourceAs(CustomResourceEvent otherEvent) {
58+
return getResource().getKind().equals(otherEvent.getResource().getKind()) &&
59+
getResource().getApiVersion().equals(otherEvent.getResource().getApiVersion()) &&
60+
getResource().getMetadata().getName().equals(otherEvent.getResource().getMetadata().getName());
61+
}
62+
4863
public Boolean isSameResourceAndNewerGeneration(CustomResourceEvent otherEvent) {
49-
return (
50-
getResource().getKind().equals(otherEvent.getResource().getKind()) &&
51-
getResource().getApiVersion().equals(otherEvent.getResource().getApiVersion()) &&
52-
getResource().getMetadata().getName().equals(otherEvent.getResource().getMetadata().getName()) &&
53-
getResource().getMetadata().getGeneration() > otherEvent.getResource().getMetadata().getGeneration()
54-
);
64+
return sameResourceAs(otherEvent) &&
65+
getResource().getMetadata().getGeneration() > otherEvent.getResource().getMetadata().getGeneration();
5566

5667
}
5768

operator-framework/src/main/java/com/github/containersolutions/operator/EventConsumer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@ class EventConsumer implements Runnable {
2020

2121
@Override
2222
public void run() {
23-
if (processEvent()) return;
24-
this.eventScheduler.retryFailedEvent(this.event);
23+
eventScheduler.eventProcessingStarted(event);
24+
if (processEvent()) {
25+
eventScheduler.eventProcessingFinishedSuccessfully(event);
26+
} else {
27+
this.eventScheduler.eventProcessingFailed(event);
28+
}
2529
}
2630

2731
@SuppressWarnings("unchecked")
2832
private boolean processEvent() {
33+
2934
Watcher.Action action = event.getAction();
3035
CustomResource resource = event.getResource();
3136
log.info("Processing event {}", event.getEventInfo());
@@ -35,6 +40,7 @@ private boolean processEvent() {
3540
log.error("Processing event {} failed.", event.getEventInfo(), e);
3641
return false;
3742
}
43+
3844
return true;
3945
}
4046
}

operator-framework/src/main/java/com/github/containersolutions/operator/EventScheduler.java

Lines changed: 79 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
import org.springframework.util.backoff.BackOffExecution;
1111
import org.springframework.util.backoff.ExponentialBackOff;
1212

13-
import java.util.HashMap;
14-
import java.util.Map;
13+
import java.util.*;
1514
import java.util.concurrent.*;
1615
import java.util.concurrent.atomic.AtomicBoolean;
1716
import java.util.concurrent.locks.ReentrantLock;
@@ -43,14 +42,18 @@
4342

4443
public class EventScheduler<R extends CustomResource> implements Watcher<R> {
4544

46-
// todo this if this is 0 its just works? We want also limit the number of retries etc
47-
private final static ExponentialBackOff backOff = new ExponentialBackOff(0L, 1.5);
45+
// todo limit number of back offs
46+
private final static ExponentialBackOff backOff = new ExponentialBackOff(2000L, 1.5);
4847

4948
private final static Logger log = LoggerFactory.getLogger(EventScheduler.class);
5049
private final EventDispatcher eventDispatcher;
5150
private final ScheduledThreadPoolExecutor executor;
5251
private final HashMap<CustomResourceEvent, BackOffExecution> backoffSchedulerCache = new HashMap<>();
53-
private final Map<CustomResourceEvent, ScheduledFuture<?>> eventCache = new ConcurrentHashMap<>();
52+
53+
private final Set<CustomResourceEvent> eventsNotScheduledYet = Collections.synchronizedSet(new HashSet<>());
54+
private final Map<CustomResourceEvent, ScheduledFuture<?>> eventsScheduledForProcessing = new ConcurrentHashMap<>();
55+
private final Set<CustomResourceEvent> eventsUnderProcessing = Collections.synchronizedSet(new HashSet<>());
56+
5457
private AtomicBoolean processingEnabled = new AtomicBoolean(false);
5558
private ReentrantLock lock = new ReentrantLock();
5659

@@ -90,44 +93,85 @@ void scheduleEvent(CustomResourceEvent newEvent) {
9093
// we have to lock since the fabric8 client event handling is multi-threaded,
9194
// so in the following part could be a race condition when multiple events are received for same resource.
9295
lock.lock();
93-
AtomicBoolean scheduleEvent = new AtomicBoolean(true);
94-
eventCache
95-
.entrySet()
96-
.parallelStream()
97-
.forEach(entry -> {
98-
CustomResourceEvent queuedEvent = entry.getKey();
99-
ScheduledFuture<?> scheduledFuture = entry.getValue();
100-
// Cleaning cache
101-
if (scheduledFuture.isDone() || scheduledFuture.isCancelled()) {
102-
log.debug("Event dropped from cache because is done or cancelled. [{}]", queuedEvent.getEventInfo());
103-
eventCache.remove(queuedEvent, scheduledFuture);
104-
}
105-
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
106-
if (newEvent.isSameResourceAndNewerGeneration(queuedEvent)) {
107-
log.debug("Queued event canceled because incoming event is newer. [{}]", queuedEvent.getEventInfo());
108-
scheduledFuture.cancel(false);
109-
eventCache.remove(queuedEvent, scheduledFuture);
110-
}
111-
// If newEvent is older than existing in queue, don't schedule and remove from cache
112-
if (queuedEvent.isSameResourceAndNewerGeneration(newEvent)) {
113-
log.debug("Incoming event canceled because queued event is newer. [{}]", newEvent.getEventInfo());
114-
// todo this is not in cache at this point, or? (ask Marek)
115-
eventCache.remove(newEvent);
116-
scheduleEvent.set(false);
117-
}
118-
});
119-
if (!scheduleEvent.get()) return;
96+
97+
// if there is an event waiting for to be scheduled we just replace that.
98+
if (eventsNotScheduledYet.contains(newEvent)) {
99+
// although the objects equal in name and metadata the data itself can be different
100+
eventsNotScheduledYet.add(newEvent);
101+
} else if (eventsUnderProcessing.contains(newEvent)) {
102+
// we add new event that will be scheduled when previous processing finished
103+
eventsNotScheduledYet.add(newEvent);
104+
} else {
105+
AtomicBoolean scheduleEvent = new AtomicBoolean(true);
106+
if (eventsScheduledForProcessing.containsKey(newEvent)) {
107+
eventsScheduledForProcessing
108+
.entrySet()
109+
.parallelStream()
110+
.forEach(entry -> {
111+
CustomResourceEvent queuedEvent = entry.getKey();
112+
ScheduledFuture<?> scheduledFuture = entry.getValue();
113+
// Cleaning cache
114+
if (scheduledFuture.isDone() || scheduledFuture.isCancelled()) {
115+
log.debug("Event dropped from cache because is done or cancelled. [{}]", queuedEvent.getEventInfo());
116+
eventsScheduledForProcessing.remove(queuedEvent, scheduledFuture);
117+
}
118+
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
119+
if (newEvent.isSameResourceAndNewerGeneration(queuedEvent)) {
120+
log.debug("Queued event canceled because incoming event is newer. [{}]", queuedEvent.getEventInfo());
121+
scheduledFuture.cancel(false);
122+
eventsScheduledForProcessing.remove(queuedEvent, scheduledFuture);
123+
}
124+
// If newEvent is older than existing in queue, don't schedule and remove from cache
125+
if (queuedEvent.isSameResourceAndNewerGeneration(newEvent)) {
126+
log.debug("Incoming event canceled because queued event is newer. [{}]", newEvent.getEventInfo());
127+
// todo this is not in cache at this point, or? (ask Marek)
128+
eventsScheduledForProcessing.remove(newEvent);
129+
scheduleEvent.set(false);
130+
}
131+
});
132+
}
133+
if (!scheduleEvent.get()) return;
134+
}
135+
120136
backoffSchedulerCache.put(newEvent, backOff.start());
121137
ScheduledFuture<?> scheduledTask = executor.schedule(new EventConsumer(newEvent, eventDispatcher, this),
122138
backoffSchedulerCache.get(newEvent).nextBackOff(), TimeUnit.MILLISECONDS);
123-
eventCache.put(newEvent, scheduledTask);
139+
eventsScheduledForProcessing.put(newEvent, scheduledTask);
124140
} finally {
125141
lock.unlock();
126142
}
127143
}
128144

129-
void retryFailedEvent(CustomResourceEvent event) {
130-
scheduleEvent(event);
145+
void eventProcessingStarted(CustomResourceEvent event) {
146+
try {
147+
lock.lock();
148+
eventsScheduledForProcessing.remove(event);
149+
eventsUnderProcessing.add(event);
150+
} finally {
151+
lock.unlock();
152+
}
153+
}
154+
155+
void eventProcessingFinishedSuccessfully(CustomResourceEvent event) {
156+
try {
157+
lock.lock();
158+
eventsUnderProcessing.remove(event);
159+
backoffSchedulerCache.remove(event);
160+
// todo schedule from not processed yet if such
161+
} finally {
162+
lock.unlock();
163+
}
164+
}
165+
166+
void eventProcessingFailed(CustomResourceEvent event) {
167+
try {
168+
lock.lock();
169+
eventsUnderProcessing.remove(event);
170+
// retry
171+
scheduleEvent(event);
172+
} finally {
173+
lock.unlock();
174+
}
131175
}
132176

133177
@Override

operator-framework/src/test/java/com/github/containersolutions/operator/EventConsumerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ void noRetryOnSuccess() {
1919
eventConsumer.run();
2020

2121
verify(eventDispatcher, times(1)).handleEvent(any(), any());
22-
verify(eventScheduler, times(0)).retryFailedEvent(customResourceEvent);
22+
verify(eventScheduler, times(0)).eventProcessingFailed(customResourceEvent);
2323
verify(customResourceEvent, times(1)).getResource();
2424
verify(customResourceEvent, times(1)).getAction();
2525

@@ -35,9 +35,9 @@ void retryOnFailure() {
3535
eventConsumer.run();
3636

3737
verify(eventDispatcher, times(1)).handleEvent(any(), any());
38-
verify(eventScheduler, times(1)).retryFailedEvent(customResourceEvent);
38+
verify(eventScheduler, times(1)).eventProcessingFailed(customResourceEvent);
3939
verify(customResourceEvent, times(1)).getResource();
4040
verify(customResourceEvent, times(1)).getAction();
4141

4242
}
43-
}
43+
}

0 commit comments

Comments
 (0)