Skip to content

Commit 1548361

Browse files
committed
caching of resources from update
1 parent 323e2a8 commit 1548361

File tree

5 files changed

+68
-22
lines changed

5 files changed

+68
-22
lines changed

operator-framework/src/main/java/io/javaoperatorsdk/operator/api/ResourceController.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public interface ResourceController<R extends CustomResource> {
2020

2121
/**
2222
* The implementation of this operation is required to be idempotent.
23+
* Always use the UpdateControl object to make updates on custom resource if possible.
24+
* Also always use the custom resource parameter (not the custom resource that might be in the events)
2325
*
2426
* @return The resource is updated in api server if the return value is present
2527
* within Optional. This the common use cases. However in cases, for example the operator is restarted,

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.concurrent.locks.ReentrantLock;
1616

1717
import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent;
18+
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.*;
1819

1920
/**
2021
* Event handler that makes sure that events are processed in a "single threaded" way per resource UID, while buffering
@@ -89,13 +90,31 @@ void eventProcessingFinished(ExecutionScope executionScope, PostExecutionControl
8990
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
9091
cleanupAfterDeletedEvent(executionScope.getCustomResourceUid());
9192
} else {
93+
cacheUpdatedResourceIfChanged(executionScope, postExecutionControl);
9294
executeBufferedEvents(executionScope.getCustomResourceUid());
9395
}
9496
} finally {
9597
lock.unlock();
9698
}
9799
}
98100

101+
/**
102+
* Here we try to cache the latest resource after an update. The goal is to solve a concurrency issue we sometimes see:
103+
* If an execution is finished, where we updated a custom resource, but there are other events already buffered for next
104+
* execution we might not get the newest custom resource from CustomResource event source in time. Thus we execute
105+
* the next batch of events but with a non up to date CR. Here we cache the latest CustomResource from the update
106+
* execution so we make sure its already used in the up-comming execution.
107+
*/
108+
private void cacheUpdatedResourceIfChanged(ExecutionScope executionScope, PostExecutionControl postExecutionControl) {
109+
if (postExecutionControl.customResourceUpdatedDuringExecution()) {
110+
CustomResource originalCustomResource = executionScope.getCustomResource();
111+
CustomResource customResourceAfterExecution = postExecutionControl.getUpdatedCustomResource().get();
112+
if (!getVersion(originalCustomResource).equals(getVersion(customResourceAfterExecution))) {
113+
this.customResourceCache.cacheResource(customResourceAfterExecution);
114+
}
115+
}
116+
}
117+
99118
private void cleanupAfterDeletedEvent(String customResourceUid) {
100119
defaultEventSourceManager.cleanup(customResourceUid);
101120
eventBuffer.cleanup(customResourceUid);

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,23 @@ private PostExecutionControl handleCreateOrUpdate(ExecutionScope executionScope,
9191
log.debug("Executing createOrUpdate for resource {} with version: {} with execution scope: {}",
9292
getUID(resource), getVersion(resource), executionScope);
9393
UpdateControl<? extends CustomResource> updateControl = controller.createOrUpdateResource(resource, context);
94+
CustomResource updatedCustomResource = null;
9495
if (updateControl.isUpdateStatusSubResource()) {
95-
customResourceFacade.updateStatus(updateControl.getCustomResource());
96+
updatedCustomResource = customResourceFacade.updateStatus(updateControl.getCustomResource());
9697
} else if (updateControl.isUpdateCustomResource()) {
97-
updateCustomResource(updateControl.getCustomResource());
98+
updatedCustomResource = updateCustomResource(updateControl.getCustomResource());
9899
}
99100
markLastGenerationProcessed(resource);
101+
if (updatedCustomResource != null) {
102+
return PostExecutionControl.customResourceUpdated(updatedCustomResource);
103+
} else {
104+
return PostExecutionControl.defaultDispatch();
105+
}
100106
} else {
101107
log.debug("Skipping event processing because generations: {} with version: {}",
102108
getUID(resource), getVersion(resource));
109+
return PostExecutionControl.defaultDispatch();
103110
}
104-
return PostExecutionControl.defaultDispatch();
105111
}
106112
}
107113

@@ -128,13 +134,14 @@ private PostExecutionControl handleDelete(CustomResource resource, Context conte
128134
DeleteControl deleteControl = controller.deleteResource(resource, context);
129135
boolean hasFinalizer = ControllerUtils.hasGivenFinalizer(resource, resourceFinalizer);
130136
if (deleteControl == DeleteControl.DEFAULT_DELETE && hasFinalizer) {
131-
removeFinalizer(resource);
137+
CustomResource customResource = removeFinalizer(resource);
132138
cleanup(resource);
139+
return PostExecutionControl.customResourceUpdated(customResource);
133140
} else {
134141
log.debug("Skipping finalizer remove for resource: {} with version: {}. delete control: {}, hasFinalizer: {} ",
135142
getUID(resource), getVersion(resource), deleteControl, hasFinalizer);
143+
return PostExecutionControl.defaultDispatch();
136144
}
137-
return PostExecutionControl.defaultDispatch();
138145
}
139146

140147
public boolean largerGenerationThenProcessedBefore(CustomResource resource) {
@@ -165,23 +172,23 @@ private void updateCustomResourceWithFinalizer(CustomResource resource) {
165172
replace(resource);
166173
}
167174

168-
private void updateCustomResource(CustomResource resource) {
175+
private CustomResource updateCustomResource(CustomResource resource) {
169176
log.debug("Updating resource: {} with version: {}", getUID(resource),
170177
getVersion(resource));
171178
log.trace("Resource before update: {}", resource);
172-
replace(resource);
179+
return replace(resource);
173180
}
174181

175182

176-
private void removeFinalizer(CustomResource resource) {
183+
private CustomResource removeFinalizer(CustomResource resource) {
177184
log.debug("Removing finalizer on resource: {} with version: {}", getUID(resource), getVersion(resource));
178185
resource.getMetadata().getFinalizers().remove(resourceFinalizer);
179-
customResourceFacade.replaceWithLock(resource);
186+
return customResourceFacade.replaceWithLock(resource);
180187
}
181188

182-
private void replace(CustomResource resource) {
189+
private CustomResource replace(CustomResource resource) {
183190
log.debug("Trying to replace resource {}, version: {}", resource.getMetadata().getName(), resource.getMetadata().getResourceVersion());
184-
customResourceFacade.replaceWithLock(resource);
191+
return customResourceFacade.replaceWithLock(resource);
185192
}
186193

187194
private void addFinalizerIfNotPresent(CustomResource resource) {
@@ -207,9 +214,9 @@ public CustomResourceFacade(MixedOperation<?, ?, ?, Resource<CustomResource, ?>>
207214
this.resourceOperation = resourceOperation;
208215
}
209216

210-
public void updateStatus(CustomResource resource) {
217+
public CustomResource updateStatus(CustomResource resource) {
211218
log.trace("Updating status for resource: {}", resource);
212-
resourceOperation.inNamespace(resource.getMetadata().getNamespace())
219+
return resourceOperation.inNamespace(resource.getMetadata().getNamespace())
213220
.withName(resource.getMetadata().getName())
214221
.updateStatus(resource);
215222
}
Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.processing;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
34
import io.javaoperatorsdk.operator.api.UpdateControl;
45

56
import java.util.Optional;
@@ -8,29 +9,34 @@ public final class PostExecutionControl {
89

910
private final boolean onlyFinalizerHandled;
1011

11-
private final UpdateControl updateControl;
12+
private final CustomResource updatedCustomResource;
1213

13-
private PostExecutionControl(boolean onlyFinalizerHandled, UpdateControl updateControl) {
14+
private PostExecutionControl(boolean onlyFinalizerHandled, CustomResource updatedCustomResource) {
1415
this.onlyFinalizerHandled = onlyFinalizerHandled;
15-
this.updateControl = updateControl;
16+
this.updatedCustomResource = updatedCustomResource;
1617
}
1718

1819
public static PostExecutionControl onlyFinalizerAdded() {
19-
return new PostExecutionControl(true,null);
20+
return new PostExecutionControl(true, null);
2021
}
2122

2223
public static PostExecutionControl defaultDispatch() {
23-
return new PostExecutionControl(false,null);
24+
return new PostExecutionControl(false, null);
2425
}
25-
public static PostExecutionControl dispatchWithUpdateControl(UpdateControl updateControl) {
26-
return new PostExecutionControl(false,updateControl);
26+
27+
public static PostExecutionControl customResourceUpdated(CustomResource updatedCustomResource) {
28+
return new PostExecutionControl(false, updatedCustomResource);
2729
}
2830

2931
public boolean isOnlyFinalizerHandled() {
3032
return onlyFinalizerHandled;
3133
}
3234

33-
public Optional<UpdateControl> getUpdateControl() {
34-
return Optional.ofNullable(updateControl);
35+
public Optional<CustomResource> getUpdatedCustomResource() {
36+
return Optional.ofNullable(updatedCustomResource);
37+
}
38+
39+
public boolean customResourceUpdatedDuringExecution() {
40+
return updatedCustomResource != null;
3541
}
3642
}

operator-framework/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,18 @@ public void eventReceived(Watcher.Action action, CustomResource customResource)
7676
eventHandler.handleEvent(new CustomResourceEvent(action, customResource, this));
7777
}
7878

79+
public void refreshCachedCustomResource(String resourceId) {
80+
CustomResource customResource = resourceCache.getLatestResource(resourceId).get();
81+
CustomResourceOperationsImpl client = (CustomResourceOperationsImpl) this.client;
82+
83+
CustomResource upToDateCR = (CustomResource)
84+
((CustomResourceOperationsImpl) client.inNamespace(customResource.getMetadata()
85+
.getNamespace())
86+
.withName(customResource.getMetadata().getName()))
87+
.fromServer().get();
88+
resourceCache.cacheResource(upToDateCR);
89+
}
90+
7991
@Override
8092
public void onClose(KubernetesClientException e) {
8193
if (e == null) {

0 commit comments

Comments
 (0)