Skip to content

Commit 9f7bebb

Browse files
committed
scheduler algorithm improvements, unit tests
1 parent 586eb57 commit 9f7bebb

File tree

7 files changed

+240
-52
lines changed

7 files changed

+240
-52
lines changed

operator-framework/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,11 @@
6666
<version>2.11.2</version>
6767
<scope>test</scope>
6868
</dependency>
69+
<dependency>
70+
<groupId>org.assertj</groupId>
71+
<artifactId>assertj-core</artifactId>
72+
<version>3.4.1</version>
73+
<scope>test</scope>
74+
</dependency>
6975
</dependencies>
7076
</project>

operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@
99

1010
import java.util.Optional;
1111

12-
class CustomResourceEvent {
12+
public class CustomResourceEvent {
1313

14-
public static final int MAX_RETRY_COUNT = 5;
14+
public static final long INITIAL_BACK_OFF_INTERVAL = 2000L;
1515

16+
public static final int MAX_RETRY_COUNT = 5;
17+
public static final double BACK_OFF_MULTIPLIER = 1.5;
1618
private final static Logger log = LoggerFactory.getLogger(CustomResourceEvent.class);
17-
18-
private final static BackOffExecution backOff = new ExponentialBackOff(2000L, 1.5).start();
19+
private final static BackOffExecution backOff = new ExponentialBackOff(INITIAL_BACK_OFF_INTERVAL, BACK_OFF_MULTIPLIER).start();
1920

2021
private final Watcher.Action action;
2122
private final CustomResource resource;
@@ -49,9 +50,7 @@ public String resourceUid() {
4950
}
5051

5152
public Boolean sameResourceAs(CustomResourceEvent otherEvent) {
52-
return getResource().getKind().equals(otherEvent.getResource().getKind()) &&
53-
getResource().getApiVersion().equals(otherEvent.getResource().getApiVersion()) &&
54-
getResource().getMetadata().getName().equals(otherEvent.getResource().getMetadata().getName());
53+
return getResource().getMetadata().getUid().equals(otherEvent.getResource().getMetadata().getUid());
5554
}
5655

5756
public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,24 @@
2323
* <ul>
2424
* <li>Only 1 event should be processed at a time for same custom resource
2525
* (metadata.name is the id, but kind and api should be taken into account)</li>
26-
* <li>Done - If event processing fails it should be rescheduled with retry - with limited number of retried
26+
* <li>If event processing fails it should be rescheduled with retry - with limited number of retried
2727
* and exponential time slacks (pluggable reschedule strategy in future?)</li>
2828
* <li>if there are multiple events received for the same resource process only the last one. (Others can be discarded)
2929
* User resourceVersion to check which is the latest. Put the new one at the and of the queue?
3030
* </li>
3131
* <li>Done - Avoid starvation, so on retry put back resource at the end of the queue.</li>
3232
* <li>The selecting event from a queue should not be naive. So for example:
3333
* If we cannot pick the last event because an event for that resource is currently processing just gor for the next one.
34-
* (Maybe is good to represent this queue with a list.) Or if and event is rescheduled
35-
* (skip if there is not enough time left since last execution)
3634
* </li>
37-
* <li>Impossible, scheduled chosen - Threading approach thus thread pool size and/or implementation should be configurable</li>
38-
* <li>see also: https://github.com/ContainerSolutions/java-operator-sdk/issues/34</li>
35+
* <li>Threading approach thus thread pool size and/or implementation should be configurable</li>
3936
* </ul>
4037
*
4138
* @param <R>
4239
*/
4340

44-
41+
/**
42+
*
43+
**/
4544
public class EventScheduler<R extends CustomResource> implements Watcher<R> {
4645

4746
private final static Logger log = LoggerFactory.getLogger(EventScheduler.class);
@@ -77,12 +76,11 @@ void scheduleEvent(CustomResourceEvent newEvent) {
7776
// we have to lock since the fabric8 client event handling is multi-threaded,
7877
// so in the following part could be a race condition when multiple events are received for same resource.
7978
lock.lock();
79+
if (eventStore.processedNewerVersionBefore(newEvent)) {
80+
log.debug("Skipping event processing since was processed event with newer version before. {}", newEvent);
81+
return;
82+
}
8083
if (newEvent.getAction() == Action.DELETED) {
81-
// this is a tricky situation, do we want to process only events which are marked for deletion?
82-
// or just ignore the problem. Note that marked for deletion event should already be the last event either
83-
// under processing, or scheduled for it.
84-
// There could be some corner case when we do have a event which we received before marked for deletion,
85-
// and did not received the marked for deletion, but this is such corner case that for sake of simplicity will ignore this.
8684
return;
8785
}
8886
// if there is an event waiting for to be scheduled we just replace that.

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public class EventStore {
1111

1212
private final static Logger log = LoggerFactory.getLogger(EventScheduler.class);
1313

14+
private final Map<String, Long> lastResourceVersion = new HashMap<>();
15+
1416
private final Map<String, CustomResourceEvent> eventsNotScheduledYet = new HashMap<>();
1517
private final Map<String, ResourceScheduleHolder> eventsScheduledForProcessing = new HashMap<>();
1618
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
@@ -24,8 +26,9 @@ public CustomResourceEvent removeEventNotScheduledYet(String uid) {
2426
return eventsNotScheduledYet.remove(uid);
2527
}
2628

27-
public void addOrReplaceEventAsNotScheduledYet(CustomResourceEvent newEvent) {
28-
eventsNotScheduledYet.put(newEvent.resourceUid(), newEvent);
29+
public void addOrReplaceEventAsNotScheduledYet(CustomResourceEvent event) {
30+
eventsNotScheduledYet.put(event.resourceUid(), event);
31+
updateLatestResourceVersionProcessed(event);
2932
}
3033

3134
public boolean containsOlderVersionOfEventUnderProcessing(CustomResourceEvent newEvent) {
@@ -37,6 +40,11 @@ public boolean containsEventScheduledForProcessing(String uid) {
3740
return eventsScheduledForProcessing.containsKey(uid);
3841
}
3942

43+
public void addEventUnderProcessing(CustomResourceEvent event) {
44+
eventsUnderProcessing.put(event.resourceUid(), event);
45+
updateLatestResourceVersionProcessed(event);
46+
}
47+
4048
public ResourceScheduleHolder getEventScheduledForProcessing(String uid) {
4149
return eventsScheduledForProcessing.get(uid);
4250
}
@@ -47,16 +55,31 @@ public ResourceScheduleHolder removeEventScheduledForProcessing(String uid) {
4755

4856
public void addEventScheduledForProcessing(ResourceScheduleHolder resourceScheduleHolder) {
4957
eventsScheduledForProcessing.put(resourceScheduleHolder.getCustomResourceEvent().resourceUid(), resourceScheduleHolder);
50-
}
51-
52-
public void addEventUnderProcessing(CustomResourceEvent event) {
53-
eventsUnderProcessing.put(event.resourceUid(), event);
58+
updateLatestResourceVersionProcessed(resourceScheduleHolder.getCustomResourceEvent());
5459
}
5560

5661
public CustomResourceEvent removeEventUnderProcessing(String uid) {
5762
return eventsUnderProcessing.remove(uid);
5863
}
5964

65+
private void updateLatestResourceVersionProcessed(CustomResourceEvent event) {
66+
Long current = lastResourceVersion.get(event.resourceUid());
67+
long received = Long.parseLong(event.getResource().getMetadata().getResourceVersion());
68+
if (current == null || received > current) {
69+
lastResourceVersion.put(event.resourceUid(), received);
70+
}
71+
}
72+
73+
public boolean processedNewerVersionBefore(CustomResourceEvent customResourceEvent) {
74+
Long lastVersionProcessed = lastResourceVersion.get(customResourceEvent.resourceUid());
75+
if (lastVersionProcessed == null) {
76+
return false;
77+
} else {
78+
return lastVersionProcessed > Long.parseLong(customResourceEvent.getResource()
79+
.getMetadata().getResourceVersion());
80+
}
81+
}
82+
6083
public static class ResourceScheduleHolder {
6184
private CustomResourceEvent customResourceEvent;
6285
private ScheduledFuture<?> scheduledFuture;

operator-framework/src/main/resources/log4j2.xml

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)