Skip to content

Commit 80ff244

Browse files
csvirimetacosm
andauthored
Improve cache interface for CustomResourceEventSource (#684)
Co-authored-by: Chris Laprun <[email protected]>
1 parent 8626ab4 commit 80ff244

File tree

10 files changed

+166
-104
lines changed

10 files changed

+166
-104
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/MDCUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@ public class MDCUtils {
1515
private static final String GENERATION = "resource.generation";
1616
private static final String UID = "resource.uid";
1717

18-
public static void addCustomResourceIDInfo(ResourceID resourceID) {
18+
public static void addResourceIDInfo(ResourceID resourceID) {
1919
MDC.put(NAME, resourceID.getName());
2020
MDC.put(NAMESPACE, resourceID.getNamespace().orElse("no namespace"));
2121
}
2222

23-
public static void removeCustomResourceIDInfo() {
23+
public static void removeResourceIDInfo() {
2424
MDC.remove(NAME);
2525
MDC.remove(NAMESPACE);
2626
}
2727

28-
public static void addCustomResourceInfo(HasMetadata resource) {
28+
public static void addResourceInfo(HasMetadata resource) {
2929
MDC.put(API_VERSION, resource.getApiVersion());
3030
MDC.put(KIND, resource.getKind());
3131
MDC.put(NAME, resource.getMetadata().getName());
@@ -37,7 +37,7 @@ public static void addCustomResourceInfo(HasMetadata resource) {
3737
MDC.put(UID, resource.getMetadata().getUid());
3838
}
3939

40-
public static void removeCustomResourceInfo() {
40+
public static void removeResourceInfo() {
4141
MDC.remove(API_VERSION);
4242
MDC.remove(KIND);
4343
MDC.remove(NAME);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ResourceCache.java

Lines changed: 0 additions & 12 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
2121
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2222
import io.javaoperatorsdk.operator.processing.MDCUtils;
23-
import io.javaoperatorsdk.operator.processing.ResourceCache;
2423
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
24+
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
2525
import io.javaoperatorsdk.operator.processing.event.source.ResourceEvent;
2626
import io.javaoperatorsdk.operator.processing.event.source.TimerEventSource;
2727
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
@@ -54,7 +54,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
5454

5555
EventProcessor(EventSourceManager<R> eventSourceManager) {
5656
this(
57-
eventSourceManager.getControllerResourceEventSource(),
57+
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
5858
ExecutorServiceManager.instance().executorService(),
5959
eventSourceManager.getController().getConfiguration().getName(),
6060
new ReconciliationDispatcher<>(eventSourceManager.getController()),
@@ -69,7 +69,8 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
6969
EventSourceManager<R> eventSourceManager,
7070
String relatedControllerName,
7171
Retry retry) {
72-
this(eventSourceManager.getControllerResourceEventSource(), null, relatedControllerName,
72+
this(eventSourceManager.getControllerResourceEventSource().getResourceCache(), null,
73+
relatedControllerName,
7374
reconciliationDispatcher, retry, null, eventSourceManager);
7475
}
7576

@@ -105,7 +106,7 @@ public void handleEvent(Event event) {
105106
return;
106107
}
107108
final var resourceID = event.getRelatedCustomResourceID();
108-
MDCUtils.addCustomResourceIDInfo(resourceID);
109+
MDCUtils.addResourceIDInfo(resourceID);
109110
metrics.receivedEvent(event);
110111

111112
handleEventMarking(event);
@@ -116,42 +117,35 @@ public void handleEvent(Event event) {
116117
}
117118
} finally {
118119
lock.unlock();
119-
MDCUtils.removeCustomResourceIDInfo();
120+
MDCUtils.removeResourceIDInfo();
120121
}
121122
}
122123

123-
private void submitReconciliationExecution(ResourceID customResourceUid) {
124+
private void submitReconciliationExecution(ResourceID resourceID) {
124125
try {
125-
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);
126-
Optional<R> latestCustomResource =
127-
resourceCache.getCustomResource(customResourceUid);
128-
latestCustomResource.ifPresent(MDCUtils::addCustomResourceInfo);
129-
if (!controllerUnderExecution
130-
&& latestCustomResource.isPresent()) {
131-
setUnderExecutionProcessing(customResourceUid);
132-
final var retryInfo = retryInfo(customResourceUid);
133-
ExecutionScope<R> executionScope =
134-
new ExecutionScope<>(
135-
latestCustomResource.get(),
136-
retryInfo);
137-
eventMarker.unMarkEventReceived(customResourceUid);
138-
metrics.reconcileCustomResource(customResourceUid, retryInfo);
126+
boolean controllerUnderExecution = isControllerUnderExecution(resourceID);
127+
Optional<R> latest = resourceCache.get(resourceID);
128+
latest.ifPresent(MDCUtils::addResourceInfo);
129+
if (!controllerUnderExecution && latest.isPresent()) {
130+
setUnderExecutionProcessing(resourceID);
131+
final var retryInfo = retryInfo(resourceID);
132+
ExecutionScope<R> executionScope = new ExecutionScope<>(latest.get(), retryInfo);
133+
eventMarker.unMarkEventReceived(resourceID);
134+
metrics.reconcileCustomResource(resourceID, retryInfo);
139135
log.debug("Executing events for custom resource. Scope: {}", executionScope);
140136
executor.execute(new ControllerExecution(executionScope));
141137
} else {
142138
log.debug(
143-
"Skipping executing controller for resource id: {}."
144-
+ " Controller in execution: {}. Latest CustomResource present: {}",
145-
customResourceUid,
139+
"Skipping executing controller for resource id: {}. Controller in execution: {}. Latest Resource present: {}",
140+
resourceID,
146141
controllerUnderExecution,
147-
latestCustomResource.isPresent());
148-
if (latestCustomResource.isEmpty()) {
149-
log.warn("no custom resource found in cache for CustomResourceID: {}",
150-
customResourceUid);
142+
latest.isPresent());
143+
if (latest.isEmpty()) {
144+
log.warn("no custom resource found in cache for ResourceID: {}", resourceID);
151145
}
152146
}
153147
} finally {
154-
MDCUtils.removeCustomResourceInfo();
148+
MDCUtils.removeResourceInfo();
155149
}
156150
}
157151

@@ -227,7 +221,7 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> execution
227221
.orElseThrow(() -> new IllegalStateException(
228222
"Updated custom resource must be present at this point of time")));
229223
String cachedCustomResourceVersion = getVersion(resourceCache
230-
.getCustomResource(executionScope.getCustomResourceID())
224+
.get(executionScope.getCustomResourceID())
231225
.orElseThrow(() -> new IllegalStateException(
232226
"Cached custom resource must be present at this point")));
233227

@@ -357,15 +351,15 @@ public void run() {
357351
final var thread = Thread.currentThread();
358352
final var name = thread.getName();
359353
try {
360-
MDCUtils.addCustomResourceInfo(executionScope.getResource());
354+
MDCUtils.addResourceInfo(executionScope.getResource());
361355
thread.setName("EventHandler-" + controllerName);
362356
PostExecutionControl<R> postExecutionControl =
363357
reconciliationDispatcher.handleExecution(executionScope);
364358
eventProcessingFinished(executionScope, postExecutionControl);
365359
} finally {
366360
// restore original name
367361
thread.setName(name);
368-
MDCUtils.removeCustomResourceInfo();
362+
MDCUtils.removeResourceInfo();
369363
}
370364
}
371365

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
import java.util.Map;
4+
import java.util.Optional;
5+
import java.util.function.Predicate;
6+
import java.util.stream.Stream;
7+
8+
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
10+
import io.fabric8.kubernetes.client.informers.cache.Cache;
11+
import io.javaoperatorsdk.operator.api.config.Cloner;
12+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
13+
14+
import static io.javaoperatorsdk.operator.processing.event.source.ControllerResourceEventSource.ANY_NAMESPACE_MAP_KEY;
15+
16+
public class ControllerResourceCache<T extends HasMetadata> implements ResourceCache<T> {
17+
18+
private final Map<String, SharedIndexInformer<T>> sharedIndexInformers;
19+
private final Cloner cloner;
20+
21+
public ControllerResourceCache(Map<String, SharedIndexInformer<T>> sharedIndexInformers,
22+
Cloner cloner) {
23+
this.sharedIndexInformers = sharedIndexInformers;
24+
this.cloner = cloner;
25+
}
26+
27+
@Override
28+
public Stream<T> list(Predicate<T> predicate) {
29+
return sharedIndexInformers.values().stream()
30+
.flatMap(i -> i.getStore().list().stream().filter(predicate));
31+
}
32+
33+
@Override
34+
public Stream<T> list(String namespace, Predicate<T> predicate) {
35+
if (isWatchingAllNamespaces()) {
36+
final var stream = sharedIndexInformers.get(ANY_NAMESPACE_MAP_KEY).getStore().list().stream()
37+
.filter(r -> r.getMetadata().getNamespace().equals(namespace));
38+
return predicate != null ? stream.filter(predicate) : stream;
39+
} else {
40+
final var informer = sharedIndexInformers.get(namespace);
41+
return informer != null ? informer.getStore().list().stream().filter(predicate)
42+
: Stream.empty();
43+
}
44+
}
45+
46+
@Override
47+
public Optional<T> get(ResourceID resourceID) {
48+
var sharedIndexInformer = sharedIndexInformers.get(ANY_NAMESPACE_MAP_KEY);
49+
if (sharedIndexInformer == null) {
50+
sharedIndexInformer =
51+
sharedIndexInformers.get(resourceID.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY));
52+
}
53+
var resource = sharedIndexInformer.getStore()
54+
.getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null),
55+
resourceID.getName()));
56+
if (resource == null) {
57+
return Optional.empty();
58+
} else {
59+
return Optional.of(cloner.clone(resource));
60+
}
61+
}
62+
63+
private boolean isWatchingAllNamespaces() {
64+
return sharedIndexInformers.containsKey(ANY_NAMESPACE_MAP_KEY);
65+
}
66+
67+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceEventSource.java

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.javaoperatorsdk.operator.processing.event.source;
22

3-
import java.util.*;
3+
import java.util.Collections;
4+
import java.util.Map;
5+
import java.util.Objects;
6+
import java.util.Optional;
47
import java.util.concurrent.ConcurrentHashMap;
58

69
import org.slf4j.Logger;
@@ -12,13 +15,10 @@
1215
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
1316
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
1417
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
15-
import io.fabric8.kubernetes.client.informers.cache.Cache;
1618
import io.javaoperatorsdk.operator.MissingCRDException;
17-
import io.javaoperatorsdk.operator.api.config.Cloner;
1819
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1920
import io.javaoperatorsdk.operator.processing.Controller;
2021
import io.javaoperatorsdk.operator.processing.MDCUtils;
21-
import io.javaoperatorsdk.operator.processing.ResourceCache;
2222
import io.javaoperatorsdk.operator.processing.event.ResourceID;
2323

2424
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
@@ -29,7 +29,7 @@
2929
* This is a special case since is not bound to a single custom resource
3030
*/
3131
public class ControllerResourceEventSource<T extends HasMetadata> extends AbstractEventSource
32-
implements ResourceEventHandler<T>, ResourceCache<T> {
32+
implements ResourceEventHandler<T> {
3333

3434
public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
3535

@@ -41,11 +41,12 @@ public class ControllerResourceEventSource<T extends HasMetadata> extends Abstra
4141

4242
private final ResourceEventFilter<T> filter;
4343
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
44-
private final Cloner cloner;
44+
private final ControllerResourceCache<T> cache;
4545

4646
public ControllerResourceEventSource(Controller<T> controller) {
4747
this.controller = controller;
48-
this.cloner = controller.getConfiguration().getConfigurationService().getResourceCloner();
48+
var cloner = controller.getConfiguration().getConfigurationService().getResourceCloner();
49+
this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner);
4950

5051
var filters = new ResourceEventFilter[] {
5152
ResourceEventFilters.finalizerNeededAndApplied(),
@@ -128,7 +129,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
128129
try {
129130
log.debug(
130131
"Event received for resource: {}", getName(customResource));
131-
MDCUtils.addCustomResourceInfo(customResource);
132+
MDCUtils.addResourceInfo(customResource);
132133
if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) {
133134
eventHandler.handleEvent(
134135
new ResourceEvent(action, ResourceID.fromResource(customResource)));
@@ -139,7 +140,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
139140
getVersion(customResource));
140141
}
141142
} finally {
142-
MDCUtils.removeCustomResourceInfo();
143+
MDCUtils.removeResourceInfo();
143144
}
144145
}
145146

@@ -158,24 +159,13 @@ public void onDelete(T resource, boolean b) {
158159
eventReceived(ResourceAction.DELETED, resource, null);
159160
}
160161

161-
@Override
162-
public Optional<T> getCustomResource(ResourceID resourceID) {
163-
var sharedIndexInformer = sharedIndexInformers.get(ANY_NAMESPACE_MAP_KEY);
164-
if (sharedIndexInformer == null) {
165-
sharedIndexInformer =
166-
sharedIndexInformers.get(resourceID.getNamespace().orElse(ANY_NAMESPACE_MAP_KEY));
167-
}
168-
var resource = sharedIndexInformer.getStore()
169-
.getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null),
170-
resourceID.getName()));
171-
if (resource == null) {
172-
return Optional.empty();
173-
} else {
174-
return Optional.of(cloner.clone(resource));
175-
}
162+
public Optional<T> get(ResourceID resourceID) {
163+
return cache.get(resourceID);
176164
}
177165

178-
166+
public ControllerResourceCache<T> getResourceCache() {
167+
return cache;
168+
}
179169

180170
/**
181171
* @return shared informers by namespace. If custom resource is not namespace scoped use
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
import java.util.Optional;
4+
import java.util.function.Predicate;
5+
import java.util.stream.Stream;
6+
7+
import io.fabric8.kubernetes.api.model.HasMetadata;
8+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
9+
10+
@SuppressWarnings({"rawtypes", "unchecked"})
11+
public interface ResourceCache<T extends HasMetadata> {
12+
Predicate TRUE = (a) -> true;
13+
14+
Optional<T> get(ResourceID resourceID);
15+
16+
default Stream<T> list() {
17+
return list(TRUE);
18+
}
19+
20+
Stream<T> list(Predicate<T> predicate);
21+
22+
default Stream<T> list(String namespace) {
23+
return list(namespace, TRUE);
24+
}
25+
26+
Stream<T> list(String namespace, Predicate<T> predicate);
27+
}

0 commit comments

Comments
 (0)