Skip to content

Commit c873db4

Browse files
committed
generation aware scheduling
1 parent a25ef48 commit c873db4

File tree

7 files changed

+98
-6
lines changed

7 files changed

+98
-6
lines changed

operator-framework/src/main/java/com/github/containersolutions/operator/ControllerUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ static String getDefaultFinalizer(ResourceController controller) {
2828
return getAnnotation(controller).finalizerName();
2929
}
3030

31+
static boolean getGenerationEventProcessing(ResourceController controller) {
32+
return getAnnotation(controller).generationAwareEventProcessing();
33+
}
34+
3135
static <R extends CustomResource> Class<R> getCustomResourceClass(ResourceController controller) {
3236
return (Class<R>) getAnnotation(controller).customResourceClass();
3337
}

operator-framework/src/main/java/com/github/containersolutions/operator/Operator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private <R extends CustomResource> void registerController(ResourceController<R>
6060
MixedOperation client = k8sClient.customResources(crd, resClass, CustomResourceList.class, getCustomResourceDoneableClass(controller));
6161
EventDispatcher eventDispatcher = new EventDispatcher(controller,
6262
getDefaultFinalizer(controller), new EventDispatcher.CustomResourceReplaceFacade(client));
63-
EventScheduler eventScheduler = new EventScheduler(eventDispatcher, retry);
63+
EventScheduler eventScheduler = new EventScheduler(eventDispatcher, retry, ControllerUtils.getGenerationEventProcessing(controller));
6464
registerWatches(controller, client, resClass, watchAllNamespaces, targetNamespaces, eventScheduler);
6565
}
6666

operator-framework/src/main/java/com/github/containersolutions/operator/api/Controller.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,11 @@
1818
Class<? extends CustomResource> customResourceClass();
1919

2020
String finalizerName() default DEFAULT_FINALIZER;
21-
}
21+
22+
/**
23+
* If true, will schedule new event only if generation increased since the last processing, otherwise will
24+
* process all events.
25+
* See: https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#status-subresource
26+
*/
27+
boolean generationAwareEventProcessing() default true;
28+
}

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventScheduler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ public class EventScheduler implements Watcher<CustomResource> {
4040
private final ScheduledThreadPoolExecutor executor;
4141
private final EventStore eventStore = new EventStore();
4242
private final Retry retry;
43+
private final boolean generationAware;
4344

4445
private ReentrantLock lock = new ReentrantLock();
4546

46-
public EventScheduler(EventDispatcher eventDispatcher, Retry retry) {
47+
public EventScheduler(EventDispatcher eventDispatcher, Retry retry, boolean generationAware) {
4748
this.eventDispatcher = eventDispatcher;
4849
this.retry = retry;
50+
this.generationAware = generationAware;
4951
executor = new ScheduledThreadPoolExecutor(1);
5052
executor.setRemoveOnCancelPolicy(true);
5153
}
@@ -70,12 +72,19 @@ void scheduleEventFromApi(CustomResourceEvent event) {
7072
log.debug("Skipping delete event since deletion timestamp is present on resource, so finalizer was in place.");
7173
return;
7274
}
75+
// In case of generation aware processing, we want to replace this even if generation not increased,
76+
// to have the most recent copy of the event.
7377
if (eventStore.containsNotScheduledEvent(event.resourceUid())) {
7478
log.debug("Replacing not scheduled event with actual event." +
7579
" New event: {}", event);
7680
eventStore.addOrReplaceEventAsNotScheduled(event);
7781
return;
7882
}
83+
if (generationAware && !eventStore.hasLargerGenerationThanLastStored(event)) {
84+
log.debug("Skipping event, has not larger generation than last stored, actual generation: {}, last stored: {} ",
85+
event.getResource().getMetadata().getGeneration(), eventStore.getLastStoredGeneration(event));
86+
return;
87+
}
7988
if (eventStore.containsEventUnderProcessing(event.resourceUid())) {
8089
log.debug("Scheduling event for later processing since there is an event under processing for same kind." +
8190
" New event: {}", event);

operator-framework/src/main/java/com/github/containersolutions/operator/processing/EventStore.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ public class EventStore {
77

88
private final Map<String, CustomResourceEvent> eventsNotScheduled = new HashMap<>();
99
private final Map<String, CustomResourceEvent> eventsUnderProcessing = new HashMap<>();
10+
private final Map<String, Long> lastGeneration = new HashMap<>();
1011

1112
public boolean containsNotScheduledEvent(String uuid) {
1213
return eventsNotScheduled.containsKey(uuid);
@@ -18,6 +19,7 @@ public CustomResourceEvent removeEventNotScheduled(String uid) {
1819

1920
public void addOrReplaceEventAsNotScheduled(CustomResourceEvent event) {
2021
eventsNotScheduled.put(event.resourceUid(), event);
22+
updateLastGeneration(event);
2123
}
2224

2325
public boolean containsEventUnderProcessing(String uuid) {
@@ -26,10 +28,27 @@ public boolean containsEventUnderProcessing(String uuid) {
2628

2729
public void addEventUnderProcessing(CustomResourceEvent event) {
2830
eventsUnderProcessing.put(event.resourceUid(), event);
31+
updateLastGeneration(event);
2932
}
3033

3134
public CustomResourceEvent removeEventUnderProcessing(String uid) {
3235
return eventsUnderProcessing.remove(uid);
3336
}
3437

38+
private void updateLastGeneration(CustomResourceEvent event) {
39+
Long generation = event.getResource().getMetadata().getGeneration();
40+
Long storedGeneration = lastGeneration.get(event.getResource().getMetadata().getUid());
41+
if (storedGeneration == null || generation > storedGeneration) {
42+
lastGeneration.put(event.getResource().getMetadata().getUid(), generation);
43+
}
44+
}
45+
46+
public boolean hasLargerGenerationThanLastStored(CustomResourceEvent event) {
47+
return getLastStoredGeneration(event) == null || getLastStoredGeneration(event) <
48+
event.getResource().getMetadata().getGeneration();
49+
}
50+
51+
public Long getLastStoredGeneration(CustomResourceEvent event) {
52+
return lastGeneration.get(event.getResource().getMetadata().getUid());
53+
}
3554
}

operator-framework/src/test/java/com/github/containersolutions/operator/ControllerUtilsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public void returnsValuesFromControllerAnnotationFinalizer() {
1717
assertEquals(DEFAULT_FINALIZER, ControllerUtils.getDefaultFinalizer(new TestCustomResourceController(null)));
1818
assertEquals(TestCustomResource.class, ControllerUtils.getCustomResourceClass(new TestCustomResourceController(null)));
1919
assertEquals(CRD_NAME, ControllerUtils.getCrdName(new TestCustomResourceController(null)));
20+
assertEquals(true, ControllerUtils.getGenerationEventProcessing(new TestCustomResourceController(null)));
2021
assertTrue(CustomResourceDoneable.class.isAssignableFrom(ControllerUtils.getCustomResourceDoneableClass(new TestCustomResourceController(null))));
2122
}
2223
}

operator-framework/src/test/java/com/github/containersolutions/operator/EventSchedulerTest.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class EventSchedulerTest {
3232
@SuppressWarnings("unchecked")
3333
private EventDispatcher eventDispatcher = mock(EventDispatcher.class);
3434

35-
private EventScheduler eventScheduler = new EventScheduler(eventDispatcher, new GenericRetry().setMaxAttempts(MAX_RETRY_ATTEMPTS).withLinearRetry());
35+
private EventScheduler eventScheduler = initScheduler(true);
3636

3737
private List<EventProcessingDetail> eventProcessingList = Collections.synchronizedList(new ArrayList<>());
3838

@@ -55,6 +55,7 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
5555
CustomResource resource1 = sampleResource();
5656
CustomResource resource2 = sampleResource();
5757
resource2.getMetadata().setResourceVersion("2");
58+
resource2.getMetadata().setGeneration(2l);
5859

5960
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
6061
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
@@ -70,14 +71,56 @@ public void eventsAreNotExecutedConcurrentlyForSameResource() throws Interrupted
7071
"Start time of event 2 is after end time of event 1");
7172
}
7273

74+
@Test
75+
public void generationAwareSchedulingSkipsEventsWithoutIncreasedGeneration() {
76+
normalDispatcherExecution();
77+
CustomResource resource1 = sampleResource();
78+
CustomResource resource2 = sampleResource();
79+
resource2.getMetadata().setResourceVersion("2");
80+
81+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
82+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
83+
84+
waitTimeForExecution(2);
85+
assertThat(eventProcessingList).hasSize(1)
86+
.matches(list ->
87+
eventProcessingList.get(0).getCustomResource().getMetadata().getResourceVersion().equals("1"));
88+
89+
}
90+
91+
@Test
92+
public void notGenerationAwareSchedulingProcessesAllEventsRegardlessOfGeneration() {
93+
generationUnAwareScheduler();
94+
normalDispatcherExecution();
95+
CustomResource resource1 = sampleResource();
96+
CustomResource resource2 = sampleResource();
97+
resource2.getMetadata().setResourceVersion("2");
98+
99+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
100+
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
101+
102+
waitTimeForExecution(2);
103+
log.info("Event processing details 1.: {}. 2: {}", eventProcessingList.get(0), eventProcessingList.get(1));
104+
assertThat(eventProcessingList).hasSize(2)
105+
.matches(list -> eventProcessingList.get(0).getCustomResource().getMetadata().getResourceVersion().equals("1") &&
106+
eventProcessingList.get(1).getCustomResource().getMetadata().getResourceVersion().equals("2"),
107+
"Events processed in correct order")
108+
.matches(list ->
109+
eventProcessingList.get(0).getEndTime().isBefore(eventProcessingList.get(1).startTime),
110+
"Start time of event 2 is after end time of event 1");
111+
}
112+
113+
// note that this is true for generation aware scheduling
73114
@Test
74115
public void onlyLastEventIsScheduledIfMoreReceivedDuringAndExecution() {
75116
normalDispatcherExecution();
76117
CustomResource resource1 = sampleResource();
77118
CustomResource resource2 = sampleResource();
78119
resource2.getMetadata().setResourceVersion("2");
120+
resource2.getMetadata().setGeneration(2l);
79121
CustomResource resource3 = sampleResource();
80122
resource3.getMetadata().setResourceVersion("3");
123+
resource3.getMetadata().setGeneration(3l);
81124

82125
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource1);
83126
eventScheduler.eventReceived(Watcher.Action.MODIFIED, resource2);
@@ -118,6 +161,7 @@ public void processesNewEventIfItIsReceivedAfterExecutionInError() {
118161
CustomResource resource1 = sampleResource();
119162
CustomResource resource2 = sampleResource();
120163
resource2.getMetadata().setResourceVersion("2");
164+
resource2.getMetadata().setGeneration(2l);
121165

122166
doAnswer(this::exceptionInExecution).when(eventDispatcher).handleEvent(any(Watcher.Action.class), eq(resource1));
123167
doAnswer(this::normalExecution).when(eventDispatcher).handleEvent(any(Watcher.Action.class), eq(resource2));
@@ -140,7 +184,7 @@ public void processesNewEventIfItIsReceivedAfterExecutionInError() {
140184
}
141185

142186
@Test
143-
public void numberOfRetriesIsLimited() {
187+
public void numberOfRetriesCanBeLimited() {
144188
doAnswer(this::exceptionInExecution).when(eventDispatcher).handleEvent(any(Watcher.Action.class), any(CustomResource.class));
145189

146190
eventScheduler.eventReceived(Watcher.Action.MODIFIED, sampleResource());
@@ -166,6 +210,14 @@ private Object normalExecution(InvocationOnMock invocation) {
166210
}
167211
}
168212

213+
private void generationUnAwareScheduler() {
214+
eventScheduler = initScheduler(false);
215+
}
216+
217+
private EventScheduler initScheduler(boolean generationAware) {
218+
return new EventScheduler(eventDispatcher,
219+
new GenericRetry().setMaxAttempts(MAX_RETRY_ATTEMPTS).withLinearRetry(), generationAware);
220+
}
169221

170222
private Object exceptionInExecution(InvocationOnMock invocation) {
171223
try {
@@ -203,7 +255,7 @@ CustomResource sampleResource() {
203255
resource.setMetadata(new ObjectMetaBuilder()
204256
.withCreationTimestamp("creationTimestamp")
205257
.withDeletionGracePeriodSeconds(10L)
206-
.withGeneration(10L)
258+
.withGeneration(1L)
207259
.withName("name")
208260
.withNamespace("namespace")
209261
.withResourceVersion("1")

0 commit comments

Comments
 (0)