Skip to content

Commit 3582751

Browse files
committed
first unit test, minor tweaks
1 parent 1ef73b0 commit 3582751

File tree

5 files changed

+32
-36
lines changed

5 files changed

+32
-36
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/Operator.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ private <R extends CustomResource> void registerController(ResourceController<R>
6262

6363
eventScheduler = new EventScheduler(eventDispatcher);
6464

65-
eventScheduler.startProcessing();
66-
6765
registerWatches(controller, client, resClass, watchAllNamespaces, targetNamespaces);
6866
} else {
6967
throw new OperatorException("CRD '" + resClass.getSimpleName() + "' with version '"

operator-framework/src/main/java/com/github/containersolutions/operator/processing/CustomResourceEvent.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class CustomResourceEvent {
1212

1313
private final Watcher.Action action;
1414
private final CustomResource resource;
15-
private Integer retriesIndex = 0;
15+
private Integer retryIndex = 0;
1616

1717
CustomResourceEvent(Watcher.Action action, CustomResource resource) {
1818
this.action = action;
@@ -54,13 +54,17 @@ public Boolean isSameResourceAndNewerVersion(CustomResourceEvent otherEvent) {
5454

5555
}
5656

57+
public Integer getRetryIndex() {
58+
return retryIndex;
59+
}
60+
5761
@Override
5862
public String toString() {
5963
return "CustomResourceEvent{" +
6064
"action=" + action +
6165
", resource=[ name=" + resource.getMetadata().getName() + ", kind=" + resource.getKind() +
6266
", apiVersion=" + resource.getApiVersion() + "]" +
63-
", retriesIndex=" + retriesIndex +
67+
", retriesIndex=" + retryIndex +
6468
'}';
6569
}
6670
}

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public EventDispatcher(ResourceController<R> controller,
4040
this.k8sClient = k8sClient;
4141
}
4242

43-
void handleEvent(Watcher.Action action, R resource) {
43+
public void handleEvent(Watcher.Action action, R resource) {
4444
if (action == Watcher.Action.MODIFIED || action == Watcher.Action.ADDED) {
4545
// we don't want to call delete resource if it not contains our finalizer,
4646
// since the resource still can be updates when marked for deletion and contains other finalizers

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.util.concurrent.ScheduledThreadPoolExecutor;
1717
import java.util.concurrent.ThreadFactory;
1818
import java.util.concurrent.TimeUnit;
19-
import java.util.concurrent.atomic.AtomicBoolean;
2019
import java.util.concurrent.locks.ReentrantLock;
2120

2221

@@ -58,7 +57,6 @@ public class EventScheduler<R extends CustomResource> implements Watcher<R> {
5857
private final Map<String, ResourceScheduleHolder> eventsScheduledForProcessing = new HashMap<>();
5958
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
6059

61-
private AtomicBoolean processingEnabled = new AtomicBoolean(false);
6260
private ReentrantLock lock = new ReentrantLock();
6361

6462
public EventScheduler(EventDispatcher<R> eventDispatcher) {
@@ -71,34 +69,26 @@ public EventScheduler(EventDispatcher<R> eventDispatcher) {
7169
executor.setRemoveOnCancelPolicy(true);
7270
}
7371

74-
public void startProcessing() {
75-
processingEnabled.set(true);
76-
}
77-
7872
@Override
7973
public void eventReceived(Watcher.Action action, R resource) {
80-
if (!processingEnabled.get()) return;
81-
8274
log.debug("Event received for action: {}, {}: {}", action.toString().toLowerCase(), resource.getClass().getSimpleName(),
8375
resource.getMetadata().getName());
84-
8576
CustomResourceEvent event = new CustomResourceEvent(action, resource);
8677
scheduleEvent(event);
8778
}
8879

89-
// todo we want to strictly scheduler resources for execution, so if an event is under processing we should wait
90-
// with the incoming event to be scheduled/executed until that one is not finished
91-
92-
// todo handle delete event: cleanup when a real delete arrived
93-
// todo discuss new version vs new generation comparison
9480
void scheduleEvent(CustomResourceEvent newEvent) {
9581
log.debug("Current queue size {}", executor.getQueue().size());
9682
log.info("Scheduling event: {}", newEvent.getEventInfo());
9783
try {
9884
// we have to lock since the fabric8 client event handling is multi-threaded,
9985
// so in the following part could be a race condition when multiple events are received for same resource.
10086
lock.lock();
101-
87+
if (newEvent.getAction() == Action.DELETED) {
88+
// this is a tricky situation, do we want to process only events which are marked for deletion?
89+
// or just ignore the problem
90+
return;
91+
}
10292
// if there is an event waiting for to be scheduled we just replace that.
10393
if (eventsNotScheduledYet.containsKey(newEvent.resourceUid()) &&
10494
newEvent.isSameResourceAndNewerVersion(eventsNotScheduledYet.get(newEvent.resourceUid()))) {
@@ -130,9 +120,11 @@ void scheduleEvent(CustomResourceEvent newEvent) {
130120
return;
131121
}
132122
}
123+
124+
// todo handle backoff instances
133125
backoffSchedulerCache.put(newEvent, backOff.start());
134126
ScheduledFuture<?> scheduledTask = executor.schedule(new EventConsumer(newEvent, eventDispatcher, this),
135-
backoffSchedulerCache.get(newEvent).nextBackOff(), TimeUnit.MILLISECONDS);
127+
newEvent.getRetryIndex() < 1 ? 0 : backoffSchedulerCache.get(newEvent).nextBackOff(), TimeUnit.MILLISECONDS);
136128
eventsScheduledForProcessing.put(newEvent.resourceUid(), new ResourceScheduleHolder(newEvent, scheduledTask));
137129
} finally {
138130
lock.unlock();

operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,39 @@
88
import io.fabric8.kubernetes.client.Watcher;
99
import org.junit.jupiter.api.Test;
1010

11-
import static org.mockito.ArgumentMatchers.any;
12-
import static org.mockito.Mockito.mock;
13-
import static org.mockito.Mockito.spy;
11+
import static org.mockito.Mockito.*;
1412

1513
class EventSchedulerTest {
1614

1715
@SuppressWarnings("unchecked")
1816
private EventDispatcher<CustomResource> eventDispatcher = mock(EventDispatcher.class);
1917

18+
private EventScheduler<CustomResource> eventScheduler = new EventScheduler(eventDispatcher);
19+
2020
@Test
21-
void dontScheduleReceivedEventIfProcessingNotStarted() {
22-
EventScheduler<CustomResource> eventScheduler = spy(new EventScheduler<>(eventDispatcher));
21+
public void schedulesEvent() {
22+
CustomResource resource = sampleResource();
2323

24-
eventScheduler.eventReceived(any(), any());
24+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource);
2525

26-
// verify(eventScheduler, times(0)).scheduleEvent(any());
26+
waitMinimalTimeForExecution();
27+
verify(eventDispatcher, times(1)).handleEvent(Watcher.Action.MODIFIED, resource);
2728
}
2829

2930
@Test
30-
void scheduleReceivedEventIfProcessingStarted() {
31-
EventScheduler<CustomResource> eventScheduler = spy(new EventScheduler<>(eventDispatcher));
32-
33-
eventScheduler.eventReceived(Watcher.Action.ADDED, getResource());
34-
eventScheduler.startProcessing();
35-
eventScheduler.eventReceived(Watcher.Action.ADDED, getResource());
31+
public void eventsAreNotExecutedConcurrentlyForSameResource() {
3632

37-
// verify(eventScheduler, times(1)).scheduleEvent(any());
3833
}
3934

35+
private void waitMinimalTimeForExecution() {
36+
try {
37+
Thread.sleep(300);
38+
} catch (InterruptedException e) {
39+
throw new IllegalStateException(e);
40+
}
41+
}
4042

41-
CustomResource getResource() {
43+
CustomResource sampleResource() {
4244
TestCustomResource resource = new TestCustomResource();
4345
resource.setMetadata(new ObjectMetaBuilder()
4446
.withCreationTimestamp("creationTimestamp")

0 commit comments

Comments
 (0)