Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
Expand Down Expand Up @@ -138,15 +139,19 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso

@Override
public void onAdd(T resource) {
var obsoleteResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(resource);
handleEvent(ResourceAction.ADDED, resource, null, null, obsoleteResourceVersion);
var handling = temporaryResourceCache.onAddOrUpdateEvent(resource);
handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW);
}

@Override
public void onUpdate(T oldCustomResource, T newCustomResource) {
var knownResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
handleEvent(
ResourceAction.UPDATED, newCustomResource, oldCustomResource, null, knownResourceVersion);
ResourceAction.UPDATED,
newCustomResource,
oldCustomResource,
null,
handling != EventHandling.NEW);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,36 @@

import java.util.Optional;

import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

class EventFilterDetails {

private int activeUpdates = 0;
private ResourceEvent lastEvent;
private int lastUpdatedResourceVersion = -1;

public int getActiveUpdates() {
return activeUpdates;
}

public void increaseActiveUpdates() {
activeUpdates = activeUpdates + 1;
}

public void decreaseActiveUpdates() {
public boolean decreaseActiveUpdates() {
activeUpdates = activeUpdates - 1;
return activeUpdates == 0;
}

public void setLastEvent(ResourceEvent event) {
lastEvent = event;
}

public void setLastUpdatedResourceVersion(String version) {
var parsed = Integer.parseInt(version);
if (parsed > lastUpdatedResourceVersion) {
lastUpdatedResourceVersion = parsed;
}
}

public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
if (lastEvent == null) return Optional.empty();
if (Integer.parseInt(lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion())
> lastUpdatedResourceVersion) {
public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent(String updatedResourceVersion) {
if (lastEvent != null
&& (updatedResourceVersion == null
|| ReconcileUtils.validateAndCompareResourceVersions(
lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(),
updatedResourceVersion)
> 0)) {
return Optional.of(lastEvent);
} else {
return Optional.empty();
}
}

public boolean isFilteringDone() {
return activeUpdates == 0;
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;

Expand Down Expand Up @@ -159,10 +160,12 @@ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldO
primaryToSecondaryIndex.onAddOrUpdate(newObject);
var resourceID = ResourceID.fromResource(newObject);

if (temporaryResourceCache.onAddOrUpdateEvent(newObject)) {
var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject);

if (eventHandling != EventHandling.NEW) {
log.debug(
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
+ " ID: {}",
"{} event propagation for {}. Resource ID: {}",
eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping",
operation,
ResourceID.fromResource(newObject));
} else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public class TemporaryResourceCache<T extends HasMetadata> {

private final Map<ResourceID, EventFilterDetails> activeUpdates = new HashMap<>();

public enum EventHandling {
DEFER,
OBSOLETE,
NEW
}

public TemporaryResourceCache(boolean comparableResourceVersions) {
this.comparableResourceVersions = comparableResourceVersions;
}
Expand All @@ -79,16 +85,9 @@ public synchronized Optional<ResourceEvent> doneEventFilterModify(
return Optional.empty();
}
var ed = activeUpdates.get(resourceID);
ed.decreaseActiveUpdates();
if (updatedResourceVersion != null) {
ed.setLastUpdatedResourceVersion(updatedResourceVersion);
}
if (ed.getActiveUpdates() == 0) {
var latestEventAfterUpdate = ed.getLatestEventAfterLastUpdateEvent();
if (latestEventAfterUpdate.isPresent()) {
activeUpdates.remove(resourceID);
}
return latestEventAfterUpdate;
if (ed.decreaseActiveUpdates()) {
activeUpdates.remove(resourceID);
return ed.getLatestEventAfterLastUpdateEvent(updatedResourceVersion);
} else {
return Optional.empty();
}
Expand All @@ -101,13 +100,13 @@ public void onDeleteEvent(T resource, boolean unknownState) {
/**
* @return true if the resourceVersion was obsolete
*/
public boolean onAddOrUpdateEvent(T resource) {
public EventHandling onAddOrUpdateEvent(T resource) {
return onEvent(resource, false, false);
}

private synchronized boolean onEvent(T resource, boolean unknownState, boolean delete) {
private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) {
if (!comparableResourceVersions) {
return false;
return EventHandling.NEW;
}

var resourceId = ResourceID.fromResource(resource);
Expand All @@ -121,7 +120,7 @@ private synchronized boolean onEvent(T resource, boolean unknownState, boolean d
latestResourceVersion = resource.getMetadata().getResourceVersion();
}
var cached = cache.get(resourceId);
boolean obsoleteEvent = false;
EventHandling result = EventHandling.NEW;
int comp = 0;
if (cached != null) {
comp = ReconcileUtils.validateAndCompareResourceVersions(resource, cached);
Expand All @@ -130,26 +129,21 @@ private synchronized boolean onEvent(T resource, boolean unknownState, boolean d
// we propagate event only for our update or newer other can be discarded since we know we
// will receive
// additional event
obsoleteEvent = false;
result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW;
} else {
obsoleteEvent = true;
result = EventHandling.OBSOLETE;
}
}
var ed = activeUpdates.get(resourceId);
if (ed != null) {
if (ed != null && result != EventHandling.OBSOLETE) {
ed.setLastEvent(
delete
? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
: new ResourceEvent(
ResourceAction.UPDATED, resourceId, resource)); // todo true action
if (ed.isFilteringDone() && ed.getLatestEventAfterLastUpdateEvent().isPresent()) {
activeUpdates.remove(resourceId);
return false;
} else {
return true;
}
return EventHandling.DEFER;
} else {
return obsoleteEvent;
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
Expand Down Expand Up @@ -98,7 +99,7 @@ void skipsEventPropagation() {
when(temporaryResourceCacheMock.getResourceFromCache(any()))
.thenReturn(Optional.of(testDeployment()));

when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(true);
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.OBSOLETE);

informerEventSource.onAdd(testDeployment());
informerEventSource.onUpdate(testDeployment(), testDeployment());
Expand All @@ -108,13 +109,15 @@ void skipsEventPropagation() {

@Test
void processEventPropagationWithoutAnnotation() {
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW);
informerEventSource.onUpdate(testDeployment(), testDeployment());

verify(eventHandlerMock, times(1)).handleEvent(any());
}

@Test
void processEventPropagationWithIncorrectAnnotation() {
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW);
informerEventSource.onAdd(
new DeploymentBuilder(testDeployment())
.editMetadata()
Expand All @@ -131,6 +134,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION);
when(temporaryResourceCacheMock.getResourceFromCache(any()))
.thenReturn(Optional.of(cachedDeployment));
when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW);

informerEventSource.onUpdate(cachedDeployment, testDeployment());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -133,7 +134,7 @@ void lockedEventBeforePut() throws Exception {
temporaryResourceCache.putResource(testResource);
assertThat(result.isDone()).isFalse();
temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "3");
assertThat(result.get(10, TimeUnit.SECONDS)).isTrue();
assertThat(result.get(10, TimeUnit.SECONDS)).isEqualTo(EventHandling.NEW);
} finally {
ex.shutdownNow();
}
Expand All @@ -145,15 +146,15 @@ void putBeforeEvent() {

// first ensure an event is not known
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
assertThat(result).isFalse();
assertThat(result).isEqualTo(EventHandling.NEW);

var nextResource = testResource();
nextResource.getMetadata().setResourceVersion("3");
temporaryResourceCache.putResource(nextResource);

// the result is false since the put was not part of event filtering update
// the result is obsolete
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
assertThat(result).isFalse();
assertThat(result).isEqualTo(EventHandling.OBSOLETE);
}

@Test
Expand All @@ -162,7 +163,7 @@ void putBeforeEventWithEventFiltering() {

// first ensure an event is not known
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
assertThat(result).isFalse();
assertThat(result).isEqualTo(EventHandling.NEW);

var nextResource = testResource();
nextResource.getMetadata().setResourceVersion("3");
Expand All @@ -172,9 +173,29 @@ void putBeforeEventWithEventFiltering() {
temporaryResourceCache.putResource(nextResource);
temporaryResourceCache.doneEventFilterModify(resourceId, "3");

// the result is false since the put was not part of event filtering update
// the result is obsolete
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
assertThat(result).isTrue();
assertThat(result).isEqualTo(EventHandling.OBSOLETE);
}

@Test
void putAfterEventWithEventFiltering() {
var testResource = testResource();

// first ensure an event is not known
var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
assertThat(result).isEqualTo(EventHandling.NEW);

var nextResource = testResource();
nextResource.getMetadata().setResourceVersion("3");
var resourceId = ResourceID.fromResource(testResource);

temporaryResourceCache.startEventFilteringModify(resourceId);
result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
// the result is deferred
assertThat(result).isEqualTo(EventHandling.DEFER);
temporaryResourceCache.putResource(nextResource);
temporaryResourceCache.doneEventFilterModify(resourceId, "3");
}

@Test
Expand Down