Skip to content

Commit 1501a6e

Browse files
committed
simplifications and refinements to temporaryresource cache
Signed-off-by: Steve Hawkins <[email protected]>
1 parent 9c9d44a commit 1501a6e

File tree

6 files changed

+77
-62
lines changed

6 files changed

+77
-62
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
3232
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
3333
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
34+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3435

3536
import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
3637
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
@@ -138,15 +139,19 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
138139

139140
@Override
140141
public void onAdd(T resource) {
141-
var obsoleteResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(resource);
142-
handleEvent(ResourceAction.ADDED, resource, null, null, obsoleteResourceVersion);
142+
var handling = temporaryResourceCache.onAddOrUpdateEvent(resource);
143+
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
143144
}
144145

145146
@Override
146147
public void onUpdate(T oldCustomResource, T newCustomResource) {
147-
var knownResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
148+
var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
148149
handleEvent(
149-
ResourceAction.UPDATED, newCustomResource, oldCustomResource, null, knownResourceVersion);
150+
ResourceAction.UPDATED,
151+
newCustomResource,
152+
oldCustomResource,
153+
null,
154+
handling != EventHandling.NEW);
150155
}
151156

152157
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,48 +17,36 @@
1717

1818
import java.util.Optional;
1919

20+
import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils;
2021
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
2122

2223
class EventFilterDetails {
2324

2425
private int activeUpdates = 0;
2526
private ResourceEvent lastEvent;
26-
private int lastUpdatedResourceVersion = -1;
27-
28-
public int getActiveUpdates() {
29-
return activeUpdates;
30-
}
3127

3228
public void increaseActiveUpdates() {
3329
activeUpdates = activeUpdates + 1;
3430
}
3531

36-
public void decreaseActiveUpdates() {
32+
public boolean decreaseActiveUpdates() {
3733
activeUpdates = activeUpdates - 1;
34+
return activeUpdates == 0;
3835
}
3936

4037
public void setLastEvent(ResourceEvent event) {
4138
lastEvent = event;
4239
}
4340

44-
public void setLastUpdatedResourceVersion(String version) {
45-
var parsed = Integer.parseInt(version);
46-
if (parsed > lastUpdatedResourceVersion) {
47-
lastUpdatedResourceVersion = parsed;
48-
}
49-
}
50-
51-
public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
52-
if (lastEvent == null) return Optional.empty();
53-
if (Integer.parseInt(lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion())
54-
> lastUpdatedResourceVersion) {
41+
public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent(String updatedResourceVersion) {
42+
if (lastEvent != null
43+
&& (updatedResourceVersion == null
44+
|| ReconcileUtils.validateAndCompareResourceVersions(
45+
lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(),
46+
updatedResourceVersion)
47+
> 0)) {
5548
return Optional.of(lastEvent);
56-
} else {
57-
return Optional.empty();
5849
}
59-
}
60-
61-
public boolean isFilteringDone() {
62-
return activeUpdates == 0;
50+
return Optional.empty();
6351
}
6452
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3434
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
3535
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
36+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3637

3738
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
3839

@@ -159,10 +160,12 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
159160
primaryToSecondaryIndex.onAddOrUpdate(newObject);
160161
var resourceID = ResourceID.fromResource(newObject);
161162

162-
if (temporaryResourceCache.onAddOrUpdateEvent(newObject)) {
163+
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject);
164+
165+
if (eventHandling != EventHandling.NEW) {
163166
log.debug(
164-
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
165-
+ " ID: {}",
167+
"{} event propagation for {}. Resource ID: {}",
168+
eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping",
166169
operation,
167170
ResourceID.fromResource(newObject));
168171
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ public class TemporaryResourceCache<T extends HasMetadata> {
6161

6262
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
6363

64+
public enum EventHandling {
65+
DEFER,
66+
OBSOLETE,
67+
NEW
68+
}
69+
6470
public TemporaryResourceCache(boolean comparableResourceVersions) {
6571
this.comparableResourceVersions = comparableResourceVersions;
6672
}
@@ -79,16 +85,9 @@ public synchronized Optional<ResourceEvent> doneEventFilterModify(
7985
return Optional.empty();
8086
}
8187
var ed = activeUpdates.get(resourceID);
82-
ed.decreaseActiveUpdates();
83-
if (updatedResourceVersion != null) {
84-
ed.setLastUpdatedResourceVersion(updatedResourceVersion);
85-
}
86-
if (ed.getActiveUpdates() == 0) {
87-
var latestEventAfterUpdate = ed.getLatestEventAfterLastUpdateEvent();
88-
if (latestEventAfterUpdate.isPresent()) {
89-
activeUpdates.remove(resourceID);
90-
}
91-
return latestEventAfterUpdate;
88+
if (ed.decreaseActiveUpdates()) {
89+
activeUpdates.remove(resourceID);
90+
return ed.getLatestEventAfterLastUpdateEvent(updatedResourceVersion);
9291
} else {
9392
return Optional.empty();
9493
}
@@ -101,13 +100,13 @@ public void onDeleteEvent(T resource, boolean unknownState) {
101100
/**
102101
* @return true if the resourceVersion was obsolete
103102
*/
104-
public boolean onAddOrUpdateEvent(T resource) {
103+
public EventHandling onAddOrUpdateEvent(T resource) {
105104
return onEvent(resource, false, false);
106105
}
107106

108-
private synchronized boolean onEvent(T resource, boolean unknownState, boolean delete) {
107+
private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) {
109108
if (!comparableResourceVersions) {
110-
return false;
109+
return EventHandling.NEW;
111110
}
112111

113112
var resourceId = ResourceID.fromResource(resource);
@@ -121,7 +120,7 @@ private synchronized boolean onEvent(T resource, boolean unknownState, boolean d
121120
latestResourceVersion = resource.getMetadata().getResourceVersion();
122121
}
123122
var cached = cache.get(resourceId);
124-
boolean obsoleteEvent = false;
123+
EventHandling result = EventHandling.NEW;
125124
int comp = 0;
126125
if (cached != null) {
127126
comp = ReconcileUtils.validateAndCompareResourceVersions(resource, cached);
@@ -130,26 +129,21 @@ private synchronized boolean onEvent(T resource, boolean unknownState, boolean d
130129
// we propagate event only for our update or newer other can be discarded since we know we
131130
// will receive
132131
// additional event
133-
obsoleteEvent = false;
132+
result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW;
134133
} else {
135-
obsoleteEvent = true;
134+
result = EventHandling.OBSOLETE;
136135
}
137136
}
138137
var ed = activeUpdates.get(resourceId);
139-
if (ed != null) {
138+
if (ed != null && result != EventHandling.OBSOLETE) {
140139
ed.setLastEvent(
141140
delete
142141
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
143142
: new ResourceEvent(
144143
ResourceAction.UPDATED, resourceId, resource)); // todo true action
145-
if (ed.isFilteringDone() && ed.getLatestEventAfterLastUpdateEvent().isPresent()) {
146-
activeUpdates.remove(resourceId);
147-
return false;
148-
} else {
149-
return true;
150-
}
144+
return EventHandling.DEFER;
151145
} else {
152-
return obsoleteEvent;
146+
return result;
153147
}
154148
}
155149

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.javaoperatorsdk.operator.processing.event.EventHandler;
3737
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3838
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
39+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3940
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
4041

4142
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
@@ -98,7 +99,7 @@ void skipsEventPropagation() {
9899
when(temporaryResourceCacheMock.getResourceFromCache(any()))
99100
.thenReturn(Optional.of(testDeployment()));
100101

101-
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(true);
102+
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.OBSOLETE);
102103

103104
informerEventSource.onAdd(testDeployment());
104105
informerEventSource.onUpdate(testDeployment(), testDeployment());
@@ -108,13 +109,15 @@ void skipsEventPropagation() {
108109

109110
@Test
110111
void processEventPropagationWithoutAnnotation() {
112+
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW);
111113
informerEventSource.onUpdate(testDeployment(), testDeployment());
112114

113115
verify(eventHandlerMock, times(1)).handleEvent(any());
114116
}
115117

116118
@Test
117119
void processEventPropagationWithIncorrectAnnotation() {
120+
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW);
118121
informerEventSource.onAdd(
119122
new DeploymentBuilder(testDeployment())
120123
.editMetadata()
@@ -131,6 +134,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
131134
cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION);
132135
when(temporaryResourceCacheMock.getResourceFromCache(any()))
133136
.thenReturn(Optional.of(cachedDeployment));
137+
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW);
134138

135139
informerEventSource.onUpdate(cachedDeployment, testDeployment());
136140

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
2929
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
3030
import io.javaoperatorsdk.operator.processing.event.ResourceID;
31+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3132

3233
import static org.assertj.core.api.Assertions.assertThat;
3334

@@ -133,7 +134,7 @@ void lockedEventBeforePut() throws Exception {
133134
temporaryResourceCache.putResource(testResource);
134135
assertThat(result.isDone()).isFalse();
135136
temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "3");
136-
assertThat(result.get(10, TimeUnit.SECONDS)).isTrue();
137+
assertThat(result.get(10, TimeUnit.SECONDS)).isEqualTo(EventHandling.NEW);
137138
} finally {
138139
ex.shutdownNow();
139140
}
@@ -145,15 +146,15 @@ void putBeforeEvent() {
145146

146147
// first ensure an event is not known
147148
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
148-
assertThat(result).isFalse();
149+
assertThat(result).isEqualTo(EventHandling.NEW);
149150

150151
var nextResource = testResource();
151152
nextResource.getMetadata().setResourceVersion("3");
152153
temporaryResourceCache.putResource(nextResource);
153154

154-
// the result is false since the put was not part of event filtering update
155+
// the result is obsolete
155156
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
156-
assertThat(result).isFalse();
157+
assertThat(result).isEqualTo(EventHandling.OBSOLETE);
157158
}
158159

159160
@Test
@@ -162,7 +163,7 @@ void putBeforeEventWithEventFiltering() {
162163

163164
// first ensure an event is not known
164165
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
165-
assertThat(result).isFalse();
166+
assertThat(result).isEqualTo(EventHandling.NEW);
166167

167168
var nextResource = testResource();
168169
nextResource.getMetadata().setResourceVersion("3");
@@ -172,9 +173,29 @@ void putBeforeEventWithEventFiltering() {
172173
temporaryResourceCache.putResource(nextResource);
173174
temporaryResourceCache.doneEventFilterModify(resourceId, "3");
174175

175-
// the result is false since the put was not part of event filtering update
176+
// the result is obsolete
176177
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
177-
assertThat(result).isTrue();
178+
assertThat(result).isEqualTo(EventHandling.OBSOLETE);
179+
}
180+
181+
@Test
182+
void putAfterEventWithEventFiltering() {
183+
var testResource = testResource();
184+
185+
// first ensure an event is not known
186+
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
187+
assertThat(result).isEqualTo(EventHandling.NEW);
188+
189+
var nextResource = testResource();
190+
nextResource.getMetadata().setResourceVersion("3");
191+
var resourceId = ResourceID.fromResource(testResource);
192+
193+
temporaryResourceCache.startEventFilteringModify(resourceId);
194+
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
195+
// the result is deferred
196+
assertThat(result).isEqualTo(EventHandling.DEFER);
197+
temporaryResourceCache.putResource(nextResource);
198+
temporaryResourceCache.doneEventFilterModify(resourceId, "3");
178199
}
179200

180201
@Test

0 commit comments

Comments
 (0)