Skip to content

Commit 3c92091

Browse files
committed
event store, test case enumeration
1 parent 3582751 commit 3c92091

File tree

4 files changed

+167
-63
lines changed

4 files changed

+167
-63
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@
44
import io.fabric8.kubernetes.client.Watcher;
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
7+
import org.springframework.util.backoff.BackOffExecution;
8+
import org.springframework.util.backoff.ExponentialBackOff;
79

810
import java.util.Optional;
911

1012
class CustomResourceEvent {
13+
public static final int MAX_RETRY_COUNT = 5;
1114
private final static Logger log = LoggerFactory.getLogger(CustomResourceEvent.class);
1215

16+
private final static BackOffExecution backOff = new ExponentialBackOff(2000L, 1.5).start();
17+
1318
private final Watcher.Action action;
1419
private final CustomResource resource;
15-
private Integer retryIndex = 0;
20+
private Integer retryIndex = -1;
1621

1722
CustomResourceEvent(Watcher.Action action, CustomResource resource) {
1823
this.action = action;
@@ -54,8 +59,19 @@ public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {
5459

5560
}
5661

57-
public Integer getRetryIndex() {
58-
return retryIndex;
62+
63+
public Optional<Long> nextBackOff() {
64+
if (retryIndex == -1) {
65+
retryIndex = 0;
66+
return Optional.of(0l);
67+
} else {
68+
if (retryIndex >= MAX_RETRY_COUNT - 1) {
69+
return Optional.empty();
70+
} else {
71+
retryIndex++;
72+
return Optional.of(backOff.nextBackOff());
73+
}
74+
}
5975
}
6076

6177
@Override

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 36 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,16 @@
77
import io.fabric8.kubernetes.client.Watcher;
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
10-
import org.springframework.util.backoff.BackOffExecution;
11-
import org.springframework.util.backoff.ExponentialBackOff;
1210

13-
import java.util.HashMap;
14-
import java.util.Map;
11+
import java.util.Optional;
1512
import java.util.concurrent.ScheduledFuture;
1613
import java.util.concurrent.ScheduledThreadPoolExecutor;
1714
import java.util.concurrent.ThreadFactory;
1815
import java.util.concurrent.TimeUnit;
1916
import java.util.concurrent.locks.ReentrantLock;
2017

18+
import static com.github.containersolutions.operator.processing.CustomResourceEvent.MAX_RETRY_COUNT;
19+
2120

2221
/**
2322
* Requirements:
@@ -45,17 +44,11 @@
4544

4645
public class EventScheduler<R extends CustomResource> implements Watcher<R> {
4746

48-
// todo limit number of back offs
49-
private final static ExponentialBackOff backOff = new ExponentialBackOff(2000L, 1.5);
50-
5147
private final static Logger log = LoggerFactory.getLogger(EventScheduler.class);
48+
5249
private final EventDispatcher eventDispatcher;
5350
private final ScheduledThreadPoolExecutor executor;
54-
private final HashMap<CustomResourceEvent, BackOffExecution> backoffSchedulerCache = new HashMap<>();
55-
56-
private final Map<String, CustomResourceEvent> eventsNotScheduledYet = new HashMap<>();
57-
private final Map<String, ResourceScheduleHolder> eventsScheduledForProcessing = new HashMap<>();
58-
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
51+
private final EventStore eventStore = new EventStore();
5952

6053
private ReentrantLock lock = new ReentrantLock();
6154

@@ -86,46 +79,49 @@ void scheduleEvent(CustomResourceEvent newEvent) {
8679
lock.lock();
8780
if (newEvent.getAction() == Action.DELETED) {
8881
// this is a tricky situation, do we want to process only events which are marked for deletion?
89-
// or just ignore the problem
82+
// or just ignore the problem. Note that marked for deletion event should already be the last event either
83+
// under processing, or scheduled for it.
84+
// There could be some corner case when we do have a event which we received before marked for deletion,
85+
// and did not received the marked for deletion, but this is such corner case that for sake of simplicity will ignore this.
9086
return;
9187
}
9288
// if there is an event waiting for to be scheduled we just replace that.
93-
if (eventsNotScheduledYet.containsKey(newEvent.resourceUid()) &&
94-
newEvent.isSameResourceAndNewerVersion(eventsNotScheduledYet.get(newEvent.resourceUid()))) {
95-
log.debug("Replacing event which is not scheduled yet, since incoming event is more recent. new Event:{}"
96-
, newEvent);
97-
eventsNotScheduledYet.put(newEvent.resourceUid(), newEvent);
89+
if (eventStore.containsOlderVersionOfNotScheduledEvent(newEvent)) {
90+
log.debug("Replacing event which is not scheduled yet, since incoming event is more recent. new Event:{}", newEvent);
91+
eventStore.addOrReplaceEventAsNotScheduledYet(newEvent);
9892
return;
99-
} else if (eventsUnderProcessing.containsKey(newEvent.resourceUid()) &&
100-
newEvent.isSameResourceAndNewerVersion(eventsUnderProcessing.get(newEvent.resourceUid()))) {
93+
}
94+
if (eventStore.containsOlderVersionOfEventUnderProcessing(newEvent)) {
10195
log.debug("Scheduling event for later processing since there is an event under processing for same kind." +
10296
" New event: {}", newEvent);
103-
eventsNotScheduledYet.put(newEvent.resourceUid(), newEvent);
97+
eventStore.addOrReplaceEventAsNotScheduledYet(newEvent);
10498
return;
10599
}
106-
107-
if (eventsScheduledForProcessing.containsKey(newEvent.resourceUid())) {
108-
ResourceScheduleHolder scheduleHolder = eventsScheduledForProcessing.get(newEvent.resourceUid());
100+
if (eventStore.containsEventScheduledForProcessing(newEvent.resourceUid())) {
101+
EventStore.ResourceScheduleHolder scheduleHolder = eventStore.getEventScheduledForProcessing(newEvent.resourceUid());
109102
CustomResourceEvent scheduledEvent = scheduleHolder.getCustomResourceEvent();
110103
ScheduledFuture<?> scheduledFuture = scheduleHolder.getScheduledFuture();
111-
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
112-
if (newEvent.isSameResourceAndNewerVersion(scheduledEvent)) {
113-
log.debug("Queued event canceled because incoming event is newer. [{}]", scheduledEvent);
114-
scheduledFuture.cancel(false);
115-
eventsScheduledForProcessing.remove(scheduledEvent.resourceUid());
116-
}
117104
// If newEvent is older than existing in queue, don't schedule and remove from cache
118105
if (scheduledEvent.isSameResourceAndNewerVersion(newEvent)) {
119-
log.debug("Incoming event discarded because queued event is newer. [{}]", newEvent);
106+
log.debug("Incoming event discarded because queued event is newer. {}", newEvent);
120107
return;
121108
}
109+
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
110+
if (newEvent.isSameResourceAndNewerVersion(scheduledEvent)) {
111+
log.debug("Queued event canceled because incoming event is newer. {}", scheduledEvent);
112+
scheduledFuture.cancel(false);
113+
eventStore.removeEventScheduledForProcessing(scheduledEvent.resourceUid());
114+
}
122115
}
123116

124-
// todo handle backoff instances
125-
backoffSchedulerCache.put(newEvent, backOff.start());
117+
Optional<Long> nextBackOff = newEvent.nextBackOff();
118+
if (!nextBackOff.isPresent()) {
119+
log.warn("Event limited max retry limit ({}), will be discarded. {}", MAX_RETRY_COUNT, newEvent);
120+
return;
121+
}
126122
ScheduledFuture<?> scheduledTask = executor.schedule(new EventConsumer(newEvent, eventDispatcher, this),
127-
newEvent.getRetryIndex() < 1 ? 0 : backoffSchedulerCache.get(newEvent).nextBackOff(), TimeUnit.MILLISECONDS);
128-
eventsScheduledForProcessing.put(newEvent.resourceUid(), new ResourceScheduleHolder(newEvent, scheduledTask));
123+
nextBackOff.get(), TimeUnit.MILLISECONDS);
124+
eventStore.addEventScheduledForProcessing(new EventStore.ResourceScheduleHolder(newEvent, scheduledTask));
129125
} finally {
130126
lock.unlock();
131127
}
@@ -134,15 +130,15 @@ void scheduleEvent(CustomResourceEvent newEvent) {
134130
boolean eventProcessingStarted(CustomResourceEvent event) {
135131
try {
136132
lock.lock();
137-
ResourceScheduleHolder res = eventsScheduledForProcessing.remove(event.resourceUid());
133+
EventStore.ResourceScheduleHolder res = eventStore.removeEventScheduledForProcessing(event.resourceUid());
138134
if (res == null) {
139135
// if its still scheduled for processing.
140136
// note that it can happen that we scheduled an event for processing, it took some time that is was picked
141137
// by executor, and it was removed during that time from the schedule but not cancelled yet. So to be correct
142138
// this should be checked also here. In other word scheduleEvent function can run in parallel with eventDispatcher.
143139
return false;
144140
}
145-
eventsUnderProcessing.put(event.resourceUid(), event);
141+
eventStore.addEventUnderProcessing(event);
146142
return true;
147143
} finally {
148144
lock.unlock();
@@ -152,10 +148,8 @@ boolean eventProcessingStarted(CustomResourceEvent event) {
152148
void eventProcessingFinishedSuccessfully(CustomResourceEvent event) {
153149
try {
154150
lock.lock();
155-
eventsUnderProcessing.remove(event.resourceUid());
156-
backoffSchedulerCache.remove(event);
157-
158-
CustomResourceEvent notScheduledYetEvent = eventsNotScheduledYet.remove(event.resourceUid());
151+
eventStore.removeEventUnderProcessing(event.resourceUid());
152+
CustomResourceEvent notScheduledYetEvent = eventStore.removeEventNotScheduledYet(event.resourceUid());
159153
if (notScheduledYetEvent != null) {
160154
scheduleEvent(notScheduledYetEvent);
161155
}
@@ -167,7 +161,7 @@ void eventProcessingFinishedSuccessfully(CustomResourceEvent event) {
167161
void eventProcessingFailed(CustomResourceEvent event) {
168162
try {
169163
lock.lock();
170-
eventsUnderProcessing.remove(event);
164+
eventStore.removeEventUnderProcessing(event.resourceUid());
171165
scheduleEvent(event);
172166
} finally {
173167
lock.unlock();
@@ -179,24 +173,6 @@ void eventProcessingFailed(CustomResourceEvent event) {
179173
public void onClose(KubernetesClientException e) {
180174
// todo re apply the watch
181175
}
182-
183-
private static class ResourceScheduleHolder {
184-
private CustomResourceEvent customResourceEvent;
185-
private ScheduledFuture<?> scheduledFuture;
186-
187-
public ResourceScheduleHolder(CustomResourceEvent customResourceEvent, ScheduledFuture<?> scheduledFuture) {
188-
this.customResourceEvent = customResourceEvent;
189-
this.scheduledFuture = scheduledFuture;
190-
}
191-
192-
public CustomResourceEvent getCustomResourceEvent() {
193-
return customResourceEvent;
194-
}
195-
196-
public ScheduledFuture<?> getScheduledFuture() {
197-
return scheduledFuture;
198-
}
199-
}
200176
}
201177

202178

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.github.containersolutions.operator.processing;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.concurrent.ScheduledFuture;
9+
10+
public class EventStore {
11+
12+
private final static Logger log = LoggerFactory.getLogger(EventScheduler.class);
13+
14+
private final Map<String, CustomResourceEvent> eventsNotScheduledYet = new HashMap<>();
15+
private final Map<String, ResourceScheduleHolder> eventsScheduledForProcessing = new HashMap<>();
16+
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
17+
18+
public boolean containsOlderVersionOfNotScheduledEvent(CustomResourceEvent newEvent) {
19+
return eventsNotScheduledYet.containsKey(newEvent.resourceUid()) &&
20+
newEvent.isSameResourceAndNewerVersion(eventsNotScheduledYet.get(newEvent.resourceUid()));
21+
}
22+
23+
public CustomResourceEvent removeEventNotScheduledYet(String uid) {
24+
return eventsNotScheduledYet.remove(uid);
25+
}
26+
27+
public void addOrReplaceEventAsNotScheduledYet(CustomResourceEvent newEvent) {
28+
eventsNotScheduledYet.put(newEvent.resourceUid(), newEvent);
29+
}
30+
31+
public boolean containsOlderVersionOfEventUnderProcessing(CustomResourceEvent newEvent) {
32+
return eventsUnderProcessing.containsKey(newEvent.resourceUid()) &&
33+
newEvent.isSameResourceAndNewerVersion(eventsUnderProcessing.get(newEvent.resourceUid()));
34+
}
35+
36+
public boolean containsEventScheduledForProcessing(String uid) {
37+
return eventsScheduledForProcessing.containsKey(uid);
38+
}
39+
40+
public ResourceScheduleHolder getEventScheduledForProcessing(String uid) {
41+
return eventsScheduledForProcessing.get(uid);
42+
}
43+
44+
public ResourceScheduleHolder removeEventScheduledForProcessing(String uid) {
45+
return eventsScheduledForProcessing.remove(uid);
46+
}
47+
48+
public void addEventScheduledForProcessing(ResourceScheduleHolder resourceScheduleHolder) {
49+
eventsScheduledForProcessing.put(resourceScheduleHolder.getCustomResourceEvent().resourceUid(), resourceScheduleHolder);
50+
}
51+
52+
public void addEventUnderProcessing(CustomResourceEvent event) {
53+
eventsUnderProcessing.put(event.resourceUid(), event);
54+
}
55+
56+
public CustomResourceEvent removeEventUnderProcessing(String uid) {
57+
return eventsUnderProcessing.remove(uid);
58+
}
59+
60+
public static class ResourceScheduleHolder {
61+
private CustomResourceEvent customResourceEvent;
62+
private ScheduledFuture<?> scheduledFuture;
63+
64+
public ResourceScheduleHolder(CustomResourceEvent customResourceEvent, ScheduledFuture<?> scheduledFuture) {
65+
this.customResourceEvent = customResourceEvent;
66+
this.scheduledFuture = scheduledFuture;
67+
}
68+
69+
public CustomResourceEvent getCustomResourceEvent() {
70+
return customResourceEvent;
71+
}
72+
73+
public ScheduledFuture<?> getScheduledFuture() {
74+
return scheduledFuture;
75+
}
76+
}
77+
}

operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,41 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() {
3232

3333
}
3434

35+
@Test
36+
public void retriesEventsWithErrors() {
37+
38+
}
39+
40+
@Test
41+
public void replacesEventWhichIsScheduledAfterCurrentProcessingEventIfNewerVersion() {
42+
43+
}
44+
45+
@Test
46+
public void discardsEventIfThereIsAlreadyNewerEventWaitingForExecution() {
47+
48+
}
49+
50+
@Test
51+
public void discardsEventIfNewerVersionIsAlreadyUnderProcessing() {
52+
53+
}
54+
55+
@Test
56+
public void schedulesEventIfOlderVersionIsAlreadyUnderProcessing() {
57+
58+
}
59+
60+
@Test
61+
public void retryDelayIncreasesExponentially() {
62+
63+
}
64+
65+
@Test
66+
public void numberOfRetriesIsLimited() {
67+
68+
}
69+
3570
private void waitMinimalTimeForExecution() {
3671
try {
3772
Thread.sleep(300);

0 commit comments

Comments
 (0)