Skip to content

Commit 5e5423b

Browse files
committed
moving generation handling to custom resource event source. Better handling of cached resource updates.
1 parent 7bb065a commit 5e5423b

File tree

8 files changed

+201
-151
lines changed

8 files changed

+201
-151
lines changed

operator-framework/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private <R extends CustomResource> void registerController(ResourceController<R>
5454
String finalizer = ControllerUtils.getFinalizer(controller);
5555
MixedOperation client = k8sClient.customResources(crd, resClass, CustomResourceList.class, ControllerUtils.getCustomResourceDoneableClass(controller));
5656
EventDispatcher eventDispatcher = new EventDispatcher(controller,
57-
finalizer, new EventDispatcher.CustomResourceFacade(client), ControllerUtils.getGenerationEventProcessing(controller));
57+
finalizer, new EventDispatcher.CustomResourceFacade(client));
5858

5959

6060
CustomResourceCache customResourceCache = new CustomResourceCache();
@@ -68,7 +68,7 @@ private <R extends CustomResource> void registerController(ResourceController<R>
6868
controller.init(eventSourceManager);
6969
CustomResourceEventSource customResourceEventSource
7070
= createCustomResourceEventSource(client, customResourceCache, watchAllNamespaces, targetNamespaces,
71-
defaultEventHandler);
71+
defaultEventHandler, ControllerUtils.getGenerationEventProcessing(controller));
7272
eventSourceManager.registerCustomResourceEventSource(customResourceEventSource);
7373

7474

@@ -80,10 +80,11 @@ private CustomResourceEventSource createCustomResourceEventSource(MixedOperation
8080
CustomResourceCache customResourceCache,
8181
boolean watchAllNamespaces,
8282
String[] targetNamespaces,
83-
DefaultEventHandler defaultEventHandler) {
83+
DefaultEventHandler defaultEventHandler,
84+
boolean generationAware) {
8485
CustomResourceEventSource customResourceEventSource = watchAllNamespaces ?
85-
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(customResourceCache, client) :
86-
CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(customResourceCache, client, targetNamespaces);
86+
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(customResourceCache, client, generationAware) :
87+
CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(customResourceCache, client, targetNamespaces, generationAware);
8788

8889
customResourceEventSource.setEventHandler(defaultEventHandler);
8990

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,42 @@
11
package io.javaoperatorsdk.operator.processing;
22

33
import io.fabric8.kubernetes.client.CustomResource;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
46

7+
import java.util.HashMap;
58
import java.util.Map;
69
import java.util.Optional;
710
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.locks.Lock;
12+
import java.util.concurrent.locks.ReentrantLock;
13+
import java.util.function.Predicate;
814

915
public class CustomResourceCache {
16+
private static final Logger log = LoggerFactory.getLogger(CustomResourceCache.class);
1017

1118
private final Map<String, CustomResource> resources = new ConcurrentHashMap<>();
19+
private final Lock lock = new ReentrantLock();
1220

1321
public void cacheResource(CustomResource resource) {
14-
resources.put(resource.getMetadata().getUid(), resource);
22+
try {
23+
lock.lock();
24+
resources.put(KubernetesResourceUtils.getUID(resource), resource);
25+
} finally {
26+
lock.unlock();
27+
}
28+
}
29+
30+
public void cacheResource(CustomResource resource, Predicate<CustomResource> predicate) {
31+
try {
32+
lock.lock();
33+
if (predicate.test(resources.get(KubernetesResourceUtils.getUID(resource)))) {
34+
log.trace("Update cache after condition is true: {}", resource);
35+
resources.put(resource.getMetadata().getUid(), resource);
36+
}
37+
} finally {
38+
lock.unlock();
39+
}
1540
}
1641

1742
public Optional<CustomResource> getLatestResource(String uuid) {

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.concurrent.ScheduledThreadPoolExecutor;
1515
import java.util.concurrent.ThreadFactory;
1616
import java.util.concurrent.locks.ReentrantLock;
17+
import java.util.function.Predicate;
1718

1819
import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent;
1920
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
@@ -101,23 +102,30 @@ void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl
101102
}
102103

103104
/**
104-
* Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we sometimes see:
105+
* Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we've seen:
105106
* If an execution is finished, where we updated a custom resource, but there are other events already buffered for next
106-
* execution we might not get the newest custom resource from CustomResource event source in time. Thus we execute
107+
* execution, we might not get the newest custom resource from CustomResource event source in time. Thus we execute
107108
* the next batch of events but with a non up to date CR. Here we cache the latest CustomResource from the update
108109
* execution so we make sure its already used in the up-coming execution.
110+
*
111+
* Note that this is an improvement, not a bug fix. This situation can happen naturally, we just make the execution more
112+
* efficient, and avoid questions about conflicts.
113+
*
114+
* Note that without the conditional locking in the cache, there is a very minor chance that we would override an
115+
* additional change coming from a different client.
109116
*/
110117
private void cacheUpdatedResourceIfChanged(ExecutionScope executionScope, PostExecutionControl postExecutionControl) {
111118
if (postExecutionControl.customResourceUpdatedDuringExecution()) {
112119
CustomResource originalCustomResource = executionScope.getCustomResource();
113120
CustomResource customResourceAfterExecution = postExecutionControl.getUpdatedCustomResource().get();
114-
CustomResource cachedVersion = this.customResourceCache.getLatestResource(getUID(customResourceAfterExecution)).get();
115121
String originalResourceVersion = getVersion(originalCustomResource);
116-
if (getVersion(cachedVersion).equals(originalResourceVersion) && !originalResourceVersion.equals(getVersion(customResourceAfterExecution))) {
117-
log.debug("Updating custom resource cache from update response for resource uid: {} new version: {} old version: {}",
118-
getUID(originalCustomResource), getVersion(customResourceAfterExecution), getVersion(originalCustomResource));
119-
this.customResourceCache.cacheResource(customResourceAfterExecution);
120-
}
122+
123+
log.debug("Trying to update resource cache from update response for resource uid: {} new version: {} old version: {}",
124+
getUID(originalCustomResource), getVersion(customResourceAfterExecution), getVersion(originalCustomResource));
125+
this.customResourceCache.cacheResource(customResourceAfterExecution, customResource ->
126+
getVersion(customResource).equals(originalResourceVersion)
127+
&& !originalResourceVersion.equals(getVersion(customResourceAfterExecution))
128+
);
121129
}
122130
}
123131

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java

Lines changed: 12 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,14 @@ public class EventDispatcher {
2929
private final ResourceController controller;
3030
private final String resourceFinalizer;
3131
private final CustomResourceFacade customResourceFacade;
32-
private final boolean generationAware;
33-
private final Map<String, Long> lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>();
3432
private EventSourceManager eventSourceManager;
3533

3634
public EventDispatcher(ResourceController controller,
3735
String finalizer,
38-
CustomResourceFacade customResourceFacade, boolean generationAware) {
36+
CustomResourceFacade customResourceFacade) {
3937
this.controller = controller;
4038
this.customResourceFacade = customResourceFacade;
4139
this.resourceFinalizer = finalizer;
42-
this.generationAware = generationAware;
4340
}
4441

4542
public void setEventSourceManager(EventSourceManager eventSourceManager) {
@@ -60,7 +57,6 @@ private PostExecutionControl handDispatch(ExecutionScope executionScope) {
6057
log.debug("Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata());
6158

6259
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
63-
cleanup(executionScope.getCustomResource());
6460
log.debug("Skipping dispatch processing because of a Delete event: {} with version: {}",
6561
getUID(resource), getVersion(resource));
6662
return PostExecutionControl.defaultDispatch();
@@ -87,55 +83,30 @@ private PostExecutionControl handleCreateOrUpdate(ExecutionScope executionScope,
8783
updateCustomResourceWithFinalizer(resource);
8884
return PostExecutionControl.onlyFinalizerAdded();
8985
} else {
90-
if (!skipBecauseOfGenerations(executionScope)) {
91-
log.debug("Executing createOrUpdate for resource {} with version: {} with execution scope: {}",
92-
getUID(resource), getVersion(resource), executionScope);
93-
UpdateControl<? extends CustomResource> updateControl = controller.createOrUpdateResource(resource, context);
94-
CustomResource updatedCustomResource = null;
95-
if (updateControl.isUpdateStatusSubResource()) {
96-
updatedCustomResource = customResourceFacade.updateStatus(updateControl.getCustomResource());
97-
} else if (updateControl.isUpdateCustomResource()) {
98-
updatedCustomResource = updateCustomResource(updateControl.getCustomResource());
99-
}
100-
markLastGenerationProcessed(resource);
101-
if (updatedCustomResource != null) {
102-
return PostExecutionControl.customResourceUpdated(updatedCustomResource);
103-
} else {
104-
return PostExecutionControl.defaultDispatch();
105-
}
86+
log.debug("Executing createOrUpdate for resource {} with version: {} with execution scope: {}",
87+
getUID(resource), getVersion(resource), executionScope);
88+
UpdateControl<? extends CustomResource> updateControl = controller.createOrUpdateResource(resource, context);
89+
CustomResource updatedCustomResource = null;
90+
if (updateControl.isUpdateStatusSubResource()) {
91+
updatedCustomResource = customResourceFacade.updateStatus(updateControl.getCustomResource());
92+
} else if (updateControl.isUpdateCustomResource()) {
93+
updatedCustomResource = updateCustomResource(updateControl.getCustomResource());
94+
}
95+
if (updatedCustomResource != null) {
96+
return PostExecutionControl.customResourceUpdated(updatedCustomResource);
10697
} else {
107-
log.debug("Skipping event processing because generations: {} with version: {}",
108-
getUID(resource), getVersion(resource));
10998
return PostExecutionControl.defaultDispatch();
11099
}
111100
}
112101
}
113102

114-
private boolean skipBecauseOfGenerations(ExecutionScope executionScope) {
115-
if (!generationAware) {
116-
return false;
117-
}
118-
if (executionScope.getEvents().size() == 1) {
119-
Event event = executionScope.getEvents().get(0);
120-
if (event instanceof CustomResourceEvent) {
121-
Long actualGeneration = executionScope.getCustomResource().getMetadata().getGeneration();
122-
Long lastGeneration = lastGenerationProcessedSuccessfully.get(executionScope.getCustomResourceUid());
123-
if (lastGeneration == null) {
124-
return false;
125-
}
126-
return actualGeneration <= lastGeneration;
127-
}
128-
}
129-
return false;
130-
}
131103

132104
private PostExecutionControl handleDelete(CustomResource resource, Context context) {
133105
log.debug("Executing delete for resource: {} with version: {}", getUID(resource), getVersion(resource));
134106
DeleteControl deleteControl = controller.deleteResource(resource, context);
135107
boolean hasFinalizer = ControllerUtils.hasGivenFinalizer(resource, resourceFinalizer);
136108
if (deleteControl == DeleteControl.DEFAULT_DELETE && hasFinalizer) {
137109
CustomResource customResource = removeFinalizer(resource);
138-
cleanup(resource);
139110
return PostExecutionControl.customResourceUpdated(customResource);
140111
} else {
141112
log.debug("Skipping finalizer remove for resource: {} with version: {}. delete control: {}, hasFinalizer: {} ",
@@ -144,27 +115,6 @@ private PostExecutionControl handleDelete(CustomResource resource, Context conte
144115
}
145116
}
146117

147-
public boolean largerGenerationThenProcessedBefore(CustomResource resource) {
148-
Long lastGeneration = lastGenerationProcessedSuccessfully.get(resource.getMetadata().getUid());
149-
if (lastGeneration == null) {
150-
return true;
151-
} else {
152-
return resource.getMetadata().getGeneration() > lastGeneration;
153-
}
154-
}
155-
156-
private void cleanup(CustomResource resource) {
157-
if (generationAware) {
158-
lastGenerationProcessedSuccessfully.remove(resource.getMetadata().getUid());
159-
}
160-
}
161-
162-
private void markLastGenerationProcessed(CustomResource resource) {
163-
if (generationAware) {
164-
lastGenerationProcessedSuccessfully.put(resource.getMetadata().getUid(), resource.getMetadata().getGeneration());
165-
}
166-
}
167-
168118
private void updateCustomResourceWithFinalizer(CustomResource resource) {
169119
log.debug("Adding finalizer for resource: {} version: {}", getUID(resource),
170120
getVersion(resource));
@@ -201,10 +151,6 @@ private void addFinalizerIfNotPresent(CustomResource resource) {
201151
}
202152
}
203153

204-
private boolean markedForDeletion(CustomResource resource) {
205-
return resource.getMetadata().getDeletionTimestamp() != null && !resource.getMetadata().getDeletionTimestamp().isEmpty();
206-
}
207-
208154
// created to support unit testing
209155
public static class CustomResourceFacade {
210156

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/KubernetesResourceUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator.processing;
22

33
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.fabric8.kubernetes.client.CustomResource;
45

56
public class KubernetesResourceUtils {
67

@@ -12,4 +13,8 @@ public static String getVersion(HasMetadata customResource) {
1213
return customResource.getMetadata().getResourceVersion();
1314
}
1415

16+
17+
public static boolean markedForDeletion(CustomResource resource) {
18+
return resource.getMetadata().getDeletionTimestamp() != null && !resource.getMetadata().getDeletionTimestamp().isEmpty();
19+
}
1520
}

0 commit comments

Comments
 (0)