Skip to content

Commit eaa50d1

Browse files
committed
updating obsolete event handling
1 parent 571b94e commit eaa50d1

File tree

4 files changed

+55
-24
lines changed

4 files changed

+55
-24
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
3232
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
3333
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
34+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3435

3536
import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
3637
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
@@ -138,15 +139,15 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
138139

139140
@Override
140141
public void onAdd(T resource) {
141-
var obsoleteResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(resource);
142-
handleEvent(ResourceAction.ADDED, resource, null, null, obsoleteResourceVersion);
142+
var handling = temporaryResourceCache.onAddOrUpdateEvent(resource);
143+
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
143144
}
144145

145146
@Override
146147
public void onUpdate(T oldCustomResource, T newCustomResource) {
147-
var knownResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
148+
var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
148149
handleEvent(
149-
ResourceAction.UPDATED, newCustomResource, oldCustomResource, null, knownResourceVersion);
150+
ResourceAction.UPDATED, newCustomResource, oldCustomResource, null, handling != EventHandling.NEW);
150151
}
151152

152153
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3434
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
3535
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
36+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3637

3738
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
3839

@@ -159,10 +160,12 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
159160
primaryToSecondaryIndex.onAddOrUpdate(newObject);
160161
var resourceID = ResourceID.fromResource(newObject);
161162

162-
if (temporaryResourceCache.onAddOrUpdateEvent(newObject)) {
163+
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject);
164+
165+
if (eventHandling != EventHandling.NEW) {
163166
log.debug(
164-
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
165-
+ " ID: {}",
167+
"{} event propagation for {}. Resource ID: {}",
168+
eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping",
166169
operation,
167170
ResourceID.fromResource(newObject));
168171
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public class TemporaryResourceCache<T extends HasMetadata> {
6060
private String latestResourceVersion;
6161

6262
private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();
63+
64+
public enum EventHandling {
65+
DEFER,
66+
OBSOLETE,
67+
NEW
68+
}
6369

6470
public TemporaryResourceCache(boolean comparableResourceVersions) {
6571
this.comparableResourceVersions = comparableResourceVersions;
@@ -95,13 +101,13 @@ public void onDeleteEvent(T resource, boolean unknownState) {
95101
/**
96102
* @return true if the resourceVersion was obsolete
97103
*/
98-
public boolean onAddOrUpdateEvent(T resource) {
104+
public EventHandling onAddOrUpdateEvent(T resource) {
99105
return onEvent(resource, false, false);
100106
}
101107

102-
private synchronized boolean onEvent(T resource, boolean unknownState, boolean delete) {
108+
private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) {
103109
if (!comparableResourceVersions) {
104-
return false;
110+
return EventHandling.NEW;
105111
}
106112

107113
var resourceId = ResourceID.fromResource(resource);
@@ -115,30 +121,30 @@ private synchronized boolean onEvent(T resource, boolean unknownState, boolean d
115121
latestResourceVersion = resource.getMetadata().getResourceVersion();
116122
}
117123
var cached = cache.get(resourceId);
118-
boolean obsoleteEvent = false;
124+
EventHandling result = EventHandling.NEW;
119125
int comp = 0;
120126
if (cached != null) {
121127
comp = ReconcileUtils.validateAndCompareResourceVersions(resource, cached);
122-
if (comp > 0 || unknownState) {
128+
if (comp >= 0 || unknownState) {
123129
cache.remove(resourceId);
124130
// we propagate event only for our update or newer other can be discarded since we know we
125131
// will receive
126132
// additional event
127-
obsoleteEvent = false;
133+
result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW;
128134
} else {
129-
obsoleteEvent = true;
135+
result = EventHandling.OBSOLETE;
130136
}
131137
}
132138
var ed = activeUpdates.get(resourceId);
133-
if (ed != null) {
139+
if (ed != null && result != EventHandling.OBSOLETE) {
134140
ed.setLastEvent(
135141
delete
136142
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
137143
: new ResourceEvent(
138144
ResourceAction.UPDATED, resourceId, resource)); // todo true action
139-
return true;
145+
return EventHandling.DEFER;
140146
} else {
141-
return obsoleteEvent;
147+
return result;
142148
}
143149
}
144150

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
2929
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
3030
import io.javaoperatorsdk.operator.processing.event.ResourceID;
31+
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
3132

3233
import static org.assertj.core.api.Assertions.assertThat;
3334

@@ -133,7 +134,7 @@ void lockedEventBeforePut() throws Exception {
133134
temporaryResourceCache.putResource(testResource);
134135
assertThat(result.isDone()).isFalse();
135136
temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "3");
136-
assertThat(result.get(10, TimeUnit.SECONDS)).isTrue();
137+
assertThat(result.get(10, TimeUnit.SECONDS)).isEqualTo(EventHandling.NEW);
137138
} finally {
138139
ex.shutdownNow();
139140
}
@@ -145,15 +146,15 @@ void putBeforeEvent() {
145146

146147
// first ensure an event is not known
147148
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
148-
assertThat(result).isFalse();
149+
assertThat(result).isEqualTo(EventHandling.NEW);
149150

150151
var nextResource = testResource();
151152
nextResource.getMetadata().setResourceVersion("3");
152153
temporaryResourceCache.putResource(nextResource);
153154

154-
// now expect an event with the matching resourceVersion to be known after the put
155+
// the result is obsolete
155156
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
156-
assertThat(result).isTrue();
157+
assertThat(result).isEqualTo(EventHandling.OBSOLETE);
157158
}
158159

159160
@Test
@@ -162,7 +163,7 @@ void putBeforeEventWithEventFiltering() {
162163

163164
// first ensure an event is not known
164165
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
165-
assertThat(result).isFalse();
166+
assertThat(result).isEqualTo(EventHandling.NEW);
166167

167168
var nextResource = testResource();
168169
nextResource.getMetadata().setResourceVersion("3");
@@ -172,9 +173,29 @@ void putBeforeEventWithEventFiltering() {
172173
temporaryResourceCache.putResource(nextResource);
173174
temporaryResourceCache.doneEventFilterModify(resourceId, "3");
174175

175-
// the result is false since the put was not part of event filtering update
176+
// the result is obsolete
176177
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
177-
assertThat(result).isTrue();
178+
assertThat(result).isEqualTo(EventHandling.OBSOLETE);
179+
}
180+
181+
@Test
182+
void putAfterEventWithEventFiltering() {
183+
var testResource = testResource();
184+
185+
// first ensure an event is not known
186+
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
187+
assertThat(result).isEqualTo(EventHandling.NEW);
188+
189+
var nextResource = testResource();
190+
nextResource.getMetadata().setResourceVersion("3");
191+
var resourceId = ResourceID.fromResource(testResource);
192+
193+
temporaryResourceCache.startEventFilteringModify(resourceId);
194+
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
195+
// the result is deferred
196+
assertThat(result).isEqualTo(EventHandling.DEFER);
197+
temporaryResourceCache.putResource(nextResource);
198+
temporaryResourceCache.doneEventFilterModify(resourceId, "3");
178199
}
179200

180201
@Test

0 commit comments

Comments
 (0)