Skip to content

Commit 1a920b5

Browse files
committed
improved algorithm, processing package, logging improvements
1 parent 0b5dae5 commit 1a920b5

File tree

16 files changed

+130
-89
lines changed

16 files changed

+130
-89
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/Operator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.github.containersolutions.operator;
22

33
import com.github.containersolutions.operator.api.ResourceController;
4+
import com.github.containersolutions.operator.processing.EventDispatcher;
5+
import com.github.containersolutions.operator.processing.EventScheduler;
46
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinition;
57
import io.fabric8.kubernetes.api.model.apiextensions.CustomResourceDefinitionList;
68
import io.fabric8.kubernetes.client.CustomResource;

operator-framework/src/main/java/com/github/containersolutions/operator/Context.java renamed to operator-framework/src/main/java/com/github/containersolutions/operator/api/Context.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.containersolutions.operator;
1+
package com.github.containersolutions.operator.api;
22

33
import io.fabric8.kubernetes.client.CustomResource;
44
import io.fabric8.kubernetes.client.CustomResourceDoneable;

operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceController.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.github.containersolutions.operator.api;
22

3-
import com.github.containersolutions.operator.Context;
43
import io.fabric8.kubernetes.client.CustomResource;
54

65
import java.util.Optional;

operator-framework/src/main/java/com/github/containersolutions/operator/api/ResourceControllerAdapter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.github.containersolutions.operator.api;
22

3-
import com.github.containersolutions.operator.Context;
43
import io.fabric8.kubernetes.client.CustomResource;
54

65
import java.util.Optional;

operator-framework/src/main/java/com/github/containersolutions/operator/CustomResourceEvent.java renamed to operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
package com.github.containersolutions.operator;
1+
package com.github.containersolutions.operator.processing;
22

33
import io.fabric8.kubernetes.client.CustomResource;
44
import io.fabric8.kubernetes.client.Watcher;
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
77

8-
import java.util.Objects;
98
import java.util.Optional;
109

1110
class CustomResourceEvent {
1211
private final static Logger log = LoggerFactory.getLogger(CustomResourceEvent.class);
1312

1413
private final Watcher.Action action;
1514
private final CustomResource resource;
16-
private Integer retriesCount = 0;
15+
private Integer retriesIndex = 0;
1716

1817
CustomResourceEvent(Watcher.Action action, CustomResource resource) {
1918
this.action = action;
@@ -38,20 +37,8 @@ public String getEventInfo() {
3837

3938
}
4039

41-
@Override
42-
public boolean equals(Object o) {
43-
if (this == o) return true;
44-
if (o == null || getClass() != o.getClass()) return false;
45-
CustomResourceEvent that = (CustomResourceEvent) o;
46-
// note that action is not interesting for us since, we schedule only modified and added events, and we decide
47-
// between those inside dispatcher not based on fabric8 client
48-
return sameResourceAs(that);
49-
}
50-
51-
@Override
52-
public int hashCode() {
53-
// todo this probably needs to be improved use the api version + kind + name
54-
return Objects.hash(resource);
40+
public String resourceKey() {
41+
return resource.getKind() + "_" + resource.getMetadata().getName();
5542
}
5643

5744
public Boolean sameResourceAs(CustomResourceEvent otherEvent) {
@@ -66,5 +53,20 @@ public Boolean isSameResourceAndNewerGeneration(CustomResourceEvent otherEvent)
6653

6754
}
6855

56+
public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {
57+
return sameResourceAs(otherEvent) &&
58+
Long.parseLong(getResource().getMetadata().getResourceVersion()) >
59+
Long.parseLong(otherEvent.getResource().getMetadata().getResourceVersion());
6960

61+
}
62+
63+
@Override
64+
public String toString() {
65+
return "CustomResourceEvent{" +
66+
"action=" + action +
67+
", resource=[ name=" + resource.getMetadata().getName() + ", kind=" + resource.getKind() +
68+
", apiVersion=" + resource.getApiVersion() + "]" +
69+
", retriesIndex=" + retriesIndex +
70+
'}';
71+
}
7072
}

operator-framework/src/main/java/com/github/containersolutions/operator/EventConsumer.java renamed to operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventConsumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.containersolutions.operator;
1+
package com.github.containersolutions.operator.processing;
22

33
import io.fabric8.kubernetes.client.CustomResource;
44
import io.fabric8.kubernetes.client.Watcher;
@@ -20,7 +20,10 @@ class EventConsumer implements Runnable {
2020

2121
@Override
2222
public void run() {
23-
eventScheduler.eventProcessingStarted(event);
23+
boolean stillScheduledForProcessing = eventScheduler.eventProcessingStarted(event);
24+
if (!stillScheduledForProcessing) {
25+
return;
26+
}
2427
if (processEvent()) {
2528
eventScheduler.eventProcessingFinishedSuccessfully(event);
2629
} else {

operator-framework/src/main/java/com/github/containersolutions/operator/EventDispatcher.java renamed to operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package com.github.containersolutions.operator;
1+
package com.github.containersolutions.operator.processing;
22

3+
import com.github.containersolutions.operator.api.Context;
34
import com.github.containersolutions.operator.api.ResourceController;
45
import io.fabric8.kubernetes.client.*;
56
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
@@ -14,7 +15,7 @@
1415
/**
1516
* Dispatches events to the Controller and handles Finalizers for a single type of Custom Resource.
1617
*/
17-
class EventDispatcher<R extends CustomResource> {
18+
public class EventDispatcher<R extends CustomResource> {
1819

1920
private final static Logger log = LoggerFactory.getLogger(EventDispatcher.class);
2021

@@ -25,11 +26,11 @@ class EventDispatcher<R extends CustomResource> {
2526
Resource<R, CustomResourceDoneable<R>>> resourceClient;
2627
private final KubernetesClient k8sClient;
2728

28-
EventDispatcher(ResourceController<R> controller,
29-
CustomResourceOperationsImpl<R, CustomResourceList<R>, CustomResourceDoneable<R>> resourceOperation,
30-
NonNamespaceOperation<R, CustomResourceList<R>, CustomResourceDoneable<R>,
29+
public EventDispatcher(ResourceController<R> controller,
30+
CustomResourceOperationsImpl<R, CustomResourceList<R>, CustomResourceDoneable<R>> resourceOperation,
31+
NonNamespaceOperation<R, CustomResourceList<R>, CustomResourceDoneable<R>,
3132
Resource<R, CustomResourceDoneable<R>>> resourceClient, KubernetesClient k8sClient,
32-
String defaultFinalizer
33+
String defaultFinalizer
3334

3435
) {
3536
this.controller = controller;

operator-framework/src/main/java/com/github/containersolutions/operator/EventScheduler.java renamed to operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.containersolutions.operator;
1+
package com.github.containersolutions.operator.processing;
22

33

44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -10,8 +10,12 @@
1010
import org.springframework.util.backoff.BackOffExecution;
1111
import org.springframework.util.backoff.ExponentialBackOff;
1212

13-
import java.util.*;
14-
import java.util.concurrent.*;
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import java.util.concurrent.ScheduledFuture;
16+
import java.util.concurrent.ScheduledThreadPoolExecutor;
17+
import java.util.concurrent.ThreadFactory;
18+
import java.util.concurrent.TimeUnit;
1519
import java.util.concurrent.atomic.AtomicBoolean;
1620
import java.util.concurrent.locks.ReentrantLock;
1721

@@ -50,14 +54,15 @@ public class EventScheduler<R extends CustomResource> implements Watcher<R> {
5054
private final ScheduledThreadPoolExecutor executor;
5155
private final HashMap<CustomResourceEvent, BackOffExecution> backoffSchedulerCache = new HashMap<>();
5256

53-
private final Set<CustomResourceEvent> eventsNotScheduledYet = Collections.synchronizedSet(new HashSet<>());
54-
private final Map<CustomResourceEvent, ScheduledFuture<?>> eventsScheduledForProcessing = new ConcurrentHashMap<>();
55-
private final Set<CustomResourceEvent> eventsUnderProcessing = Collections.synchronizedSet(new HashSet<>());
57+
// note that these hash maps does not needs to be concurrent, since we are already locking all methods where are used
58+
private final Map<String, CustomResourceEvent> eventsNotScheduledYet = new HashMap<>();
59+
private final Map<String, ResourceScheduleHolder> eventsScheduledForProcessing = new HashMap<>();
60+
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
5661

5762
private AtomicBoolean processingEnabled = new AtomicBoolean(false);
5863
private ReentrantLock lock = new ReentrantLock();
5964

60-
EventScheduler(EventDispatcher<R> eventDispatcher) {
65+
public EventScheduler(EventDispatcher<R> eventDispatcher) {
6166
this.eventDispatcher = eventDispatcher;
6267
ThreadFactory threadFactory = new ThreadFactoryBuilder()
6368
.setNameFormat("event-consumer-%d")
@@ -67,7 +72,7 @@ public class EventScheduler<R extends CustomResource> implements Watcher<R> {
6772
executor.setRemoveOnCancelPolicy(true);
6873
}
6974

70-
void startProcessing() {
75+
public void startProcessing() {
7176
processingEnabled.set(true);
7277
}
7378

@@ -86,6 +91,7 @@ public void eventReceived(Watcher.Action action, R resource) {
8691
// with the incoming event to be scheduled/executed until that one is not finished
8792

8893
// todo handle delete event: cleanup when a real delete arrived
94+
// todo discuss new version vs new generation comparison
8995
void scheduleEvent(CustomResourceEvent newEvent) {
9096
log.debug("Current queue size {}", executor.getQueue().size());
9197
log.info("Scheduling event: {}", newEvent.getEventInfo());
@@ -95,58 +101,58 @@ void scheduleEvent(CustomResourceEvent newEvent) {
95101
lock.lock();
96102

97103
// if there is an event waiting for to be scheduled we just replace that.
98-
if (eventsNotScheduledYet.contains(newEvent)) {
99-
// although the objects equal in name and metadata the data itself can be different
100-
eventsNotScheduledYet.add(newEvent);
101-
} else if (eventsUnderProcessing.contains(newEvent)) {
102-
// we add new event that will be scheduled when previous processing finished
103-
eventsNotScheduledYet.add(newEvent);
104-
} else {
105-
AtomicBoolean scheduleEvent = new AtomicBoolean(true);
106-
if (eventsScheduledForProcessing.containsKey(newEvent)) {
107-
eventsScheduledForProcessing
108-
.entrySet()
109-
.parallelStream()
110-
.forEach(entry -> {
111-
CustomResourceEvent queuedEvent = entry.getKey();
112-
ScheduledFuture<?> scheduledFuture = entry.getValue();
113-
// Cleaning cache
114-
if (scheduledFuture.isDone() || scheduledFuture.isCancelled()) {
115-
log.debug("Event dropped from cache because is done or cancelled. [{}]", queuedEvent.getEventInfo());
116-
eventsScheduledForProcessing.remove(queuedEvent, scheduledFuture);
117-
}
118-
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
119-
if (newEvent.isSameResourceAndNewerGeneration(queuedEvent)) {
120-
log.debug("Queued event canceled because incoming event is newer. [{}]", queuedEvent.getEventInfo());
121-
scheduledFuture.cancel(false);
122-
eventsScheduledForProcessing.remove(queuedEvent, scheduledFuture);
123-
}
124-
// If newEvent is older than existing in queue, don't schedule and remove from cache
125-
if (queuedEvent.isSameResourceAndNewerGeneration(newEvent)) {
126-
log.debug("Incoming event canceled because queued event is newer. [{}]", newEvent.getEventInfo());
127-
// todo this is not in cache at this point, or? (ask Marek)
128-
eventsScheduledForProcessing.remove(newEvent);
129-
scheduleEvent.set(false);
130-
}
131-
});
132-
}
133-
if (!scheduleEvent.get()) return;
104+
if (eventsNotScheduledYet.containsKey(newEvent.resourceKey()) &&
105+
newEvent.isSameResourceAndNewerVersion(eventsNotScheduledYet.get(newEvent.resourceKey()))) {
106+
log.debug("Replacing event which is not scheduled yet, since incoming event is more recent. new Event:{}"
107+
, newEvent);
108+
eventsNotScheduledYet.put(newEvent.resourceKey(), newEvent);
109+
return;
110+
} else if (eventsUnderProcessing.containsKey(newEvent.resourceKey()) &&
111+
newEvent.isSameResourceAndNewerVersion(eventsUnderProcessing.get(newEvent.resourceKey()))) {
112+
log.debug("Scheduling event for later processing since there is an event under processing for same kind." +
113+
" New event: {}", newEvent);
114+
eventsNotScheduledYet.put(newEvent.resourceKey(), newEvent);
115+
return;
134116
}
135117

118+
if (eventsScheduledForProcessing.containsKey(newEvent.resourceKey())) {
119+
ResourceScheduleHolder scheduleHolder = eventsScheduledForProcessing.get(newEvent.resourceKey());
120+
CustomResourceEvent queuedEvent = scheduleHolder.getCustomResourceEvent();
121+
ScheduledFuture<?> scheduledFuture = scheduleHolder.getScheduledFuture();
122+
// If newEvent is newer than existing in queue, cancel and remove queuedEvent
123+
if (newEvent.isSameResourceAndNewerVersion(queuedEvent)) {
124+
log.debug("Queued event canceled because incoming event is newer. [{}]", queuedEvent);
125+
scheduledFuture.cancel(false);
126+
eventsScheduledForProcessing.remove(queuedEvent.resourceKey());
127+
}
128+
// If newEvent is older than existing in queue, don't schedule and remove from cache
129+
if (queuedEvent.isSameResourceAndNewerVersion(newEvent)) {
130+
log.debug("Incoming event discarded because queued event is newer. [{}]", newEvent);
131+
return;
132+
}
133+
}
136134
backoffSchedulerCache.put(newEvent, backOff.start());
137135
ScheduledFuture<?> scheduledTask = executor.schedule(new EventConsumer(newEvent, eventDispatcher, this),
138136
backoffSchedulerCache.get(newEvent).nextBackOff(), TimeUnit.MILLISECONDS);
139-
eventsScheduledForProcessing.put(newEvent, scheduledTask);
137+
eventsScheduledForProcessing.put(newEvent.resourceKey(), new ResourceScheduleHolder(newEvent, scheduledTask));
140138
} finally {
141139
lock.unlock();
142140
}
143141
}
144142

145-
void eventProcessingStarted(CustomResourceEvent event) {
143+
boolean eventProcessingStarted(CustomResourceEvent event) {
146144
try {
147145
lock.lock();
148-
eventsScheduledForProcessing.remove(event);
149-
eventsUnderProcessing.add(event);
146+
ResourceScheduleHolder res = eventsScheduledForProcessing.remove(event.resourceKey());
147+
if (res == null) {
148+
// if its still scheduled for processing.
149+
// note that it can happen that we scheduled an event for processing, it took some time that is was picked
150+
// by executor, and it was removed during that time from the schedule but not cancelled yet. So to be correct
151+
// this should be checked also here. In other word scheduleEvent function can run in parallel with eventDispatcher.
152+
return false;
153+
}
154+
eventsUnderProcessing.put(event.resourceKey(), event);
155+
return true;
150156
} finally {
151157
lock.unlock();
152158
}
@@ -155,9 +161,13 @@ void eventProcessingStarted(CustomResourceEvent event) {
155161
void eventProcessingFinishedSuccessfully(CustomResourceEvent event) {
156162
try {
157163
lock.lock();
158-
eventsUnderProcessing.remove(event);
164+
eventsUnderProcessing.remove(event.resourceKey());
159165
backoffSchedulerCache.remove(event);
160-
// todo schedule from not processed yet if such
166+
167+
CustomResourceEvent notScheduledYetEvent = eventsNotScheduledYet.remove(event.resourceKey());
168+
if (notScheduledYetEvent != null) {
169+
scheduleEvent(notScheduledYetEvent);
170+
}
161171
} finally {
162172
lock.unlock();
163173
}
@@ -167,13 +177,13 @@ void eventProcessingFailed(CustomResourceEvent event) {
167177
try {
168178
lock.lock();
169179
eventsUnderProcessing.remove(event);
170-
// retry
171180
scheduleEvent(event);
172181
} finally {
173182
lock.unlock();
174183
}
175184
}
176185

186+
// todo review this in light of new restart functionality from master
177187
@Override
178188
public void onClose(KubernetesClientException e) {
179189
processingEnabled.set(false);
@@ -187,6 +197,23 @@ public void onClose(KubernetesClientException e) {
187197
}
188198
}
189199

200+
private static class ResourceScheduleHolder {
201+
private CustomResourceEvent customResourceEvent;
202+
private ScheduledFuture<?> scheduledFuture;
203+
204+
public ResourceScheduleHolder(CustomResourceEvent customResourceEvent, ScheduledFuture<?> scheduledFuture) {
205+
this.customResourceEvent = customResourceEvent;
206+
this.scheduledFuture = scheduledFuture;
207+
}
208+
209+
public CustomResourceEvent getCustomResourceEvent() {
210+
return customResourceEvent;
211+
}
212+
213+
public ScheduledFuture<?> getScheduledFuture() {
214+
return scheduledFuture;
215+
}
216+
}
190217
}
191218

192219

operator-framework/src/test/java/com/github/containersolutions/operator/EventConsumerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package com.github.containersolutions.operator;
22

3+
import com.github.containersolutions.operator.processing.CustomResourceEvent;
4+
import com.github.containersolutions.operator.processing.EventConsumer;
5+
import com.github.containersolutions.operator.processing.EventDispatcher;
6+
import com.github.containersolutions.operator.processing.EventScheduler;
37
import org.junit.jupiter.api.Test;
48

59
import static org.mockito.ArgumentMatchers.any;

operator-framework/src/test/java/com/github/containersolutions/operator/EventDispatcherTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import com.github.containersolutions.operator.api.Controller;
55
import com.github.containersolutions.operator.api.ResourceController;
6+
import com.github.containersolutions.operator.processing.EventDispatcher;
67
import com.github.containersolutions.operator.sample.TestCustomResource;
78
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
89
import io.fabric8.kubernetes.client.*;
@@ -181,4 +182,4 @@ CustomResource getResource() {
181182
HashMap getRawResource() {
182183
return new ObjectMapper().convertValue(getResource(), HashMap.class);
183184
}
184-
}
185+
}

0 commit comments

Comments
 (0)