Skip to content

feat: reconcile-all-event mode #2894

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: next
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions notes.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- check that Cleaner interface is not present
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ default String fieldManager() {
}

<C> C getConfigurationFor(DependentResourceSpec<?, P, C> spec);

default ControllerMode getMode() {
return ControllerMode.DEFAULT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

public enum ControllerMode {
DEFAULT,
RECONCILE_ALL_EVENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.javaoperatorsdk.operator.api.config.ControllerMode;
import io.javaoperatorsdk.operator.api.config.informer.Informer;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
Expand Down Expand Up @@ -77,4 +78,6 @@ MaxReconciliationInterval maxReconciliationInterval() default
* @return the name used as field manager for SSA operations
*/
String fieldManager() default CONTROLLER_NAME_AS_FIELD_MANAGER;

ControllerMode allEventMode() default ControllerMode.DEFAULT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ControllerMode;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
Expand Down Expand Up @@ -122,7 +123,7 @@ public synchronized void handleEvent(Event event) {
}

private void handleMarkedEventForResource(ResourceState state) {
if (state.deleteEventPresent()) {
if (state.deleteEventPresent() && !isAllEventMode()) {
cleanupForDeletedEvent(state.getId());
} else if (!state.processedMarkForDeletionPresent()) {
submitReconciliationExecution(state);
Expand Down Expand Up @@ -179,7 +180,7 @@ private void handleEventMarking(Event event, ResourceState state) {
if (event instanceof ResourceEvent resourceEvent) {
if (resourceEvent.getAction() == ResourceAction.DELETED) {
log.debug("Marking delete event received for: {}", relatedCustomResourceID);
state.markDeleteEventReceived();
state.markDeleteEventReceived(resourceEvent.getResource().orElseThrow());
} else {
if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
log.debug(
Expand Down Expand Up @@ -464,6 +465,13 @@ public void run() {
try {
var actualResource = cache.get(resourceID);
if (actualResource.isEmpty()) {
if (isAllEventMode()) {
var state = resourceStateManager.get(resourceID);
actualResource =
(Optional<P>)
state.filter(s -> s.deleteEventPresent()).map(s -> s.getLastKnownResource());
}

log.debug("Skipping execution; primary resource missing from cache: {}", resourceID);
return;
}
Expand Down Expand Up @@ -501,4 +509,8 @@ public synchronized boolean isUnderProcessing(ResourceID resourceID) {
public synchronized boolean isRunning() {
return running;
}

private boolean isAllEventMode() {
return controllerConfiguration.getMode() == ControllerMode.RECONCILE_ALL_EVENT;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.processing.event;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

Expand Down Expand Up @@ -29,6 +30,7 @@ private enum EventingState {
private RetryExecution retry;
private EventingState eventing;
private RateLimitState rateLimit;
private HasMetadata lastKnownResource;

public ResourceState(ResourceID id) {
this.id = id;
Expand Down Expand Up @@ -63,8 +65,9 @@ public void setUnderProcessing(boolean underProcessing) {
this.underProcessing = underProcessing;
}

public void markDeleteEventReceived() {
public void markDeleteEventReceived(HasMetadata lastKnownResource) {
eventing = EventingState.DELETE_EVENT_PRESENT;
this.lastKnownResource = lastKnownResource;
}

public boolean deleteEventPresent() {
Expand Down Expand Up @@ -94,6 +97,10 @@ public boolean noEventPresent() {
return eventing == EventingState.NO_EVENT_PRESENT;
}

public HasMetadata getLastKnownResource() {
return lastKnownResource;
}

public void unMarkEventReceived() {
switch (eventing) {
case EVENT_PRESENT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand All @@ -15,6 +16,10 @@ public ResourceState getOrCreate(ResourceID resourceID) {
return states.computeIfAbsent(resourceID, ResourceState::new);
}

public Optional<ResourceState> get(ResourceID resourceID) {
return Optional.ofNullable(states.get(resourceID));
}

public ResourceState remove(ResourceID resourceID) {
return states.remove(resourceID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public synchronized void start() {
}
}

public void eventReceived(ResourceAction action, T resource, T oldResource) {
public void eventReceived(
ResourceAction action, T resource, T oldResource, Boolean deletedFinalStateUnknown) {
try {
if (log.isDebugEnabled()) {
log.debug(
Expand All @@ -76,8 +77,18 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) {
MDCUtils.addResourceInfo(resource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
if (isAcceptedByFilters(action, resource, oldResource)) {
getEventHandler()
.handleEvent(new ResourceEvent(action, ResourceID.fromResource(resource), resource));
if (deletedFinalStateUnknown != null) {
getEventHandler()
.handleEvent(
new ResourceDeleteEvent(
action,
ResourceID.fromResource(resource),
resource,
deletedFinalStateUnknown));
} else {
getEventHandler()
.handleEvent(new ResourceEvent(action, ResourceID.fromResource(resource), resource));
}
} else {
log.debug("Skipping event handling resource {}", ResourceID.fromResource(resource));
}
Expand All @@ -103,19 +114,19 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
@Override
public void onAdd(T resource) {
super.onAdd(resource);
eventReceived(ResourceAction.ADDED, resource, null);
eventReceived(ResourceAction.ADDED, resource, null, null);
}

@Override
public void onUpdate(T oldCustomResource, T newCustomResource) {
super.onUpdate(oldCustomResource, newCustomResource);
eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource);
eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource, null);
}

@Override
public void onDelete(T resource, boolean b) {
super.onDelete(resource, b);
eventReceived(ResourceAction.DELETED, resource, null);
eventReceived(ResourceAction.DELETED, resource, null, b);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.javaoperatorsdk.operator.processing.event.source.controller;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class ResourceDeleteEvent extends ResourceEvent {

private final boolean deletedFinalStateUnknown;

public ResourceDeleteEvent(
ResourceAction action,
ResourceID resourceID,
HasMetadata resource,
boolean deletedFinalStateUnknown) {
super(action, resourceID, resource);
this.deletedFinalStateUnknown = deletedFinalStateUnknown;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.javaoperatorsdk.operator.TestUtils;

import static org.assertj.core.api.Assertions.assertThat;

class ResourceStateManagerTest {
Expand Down Expand Up @@ -38,7 +40,7 @@ public void marksEvent() {

@Test
public void marksDeleteEvent() {
state.markDeleteEventReceived();
state.markDeleteEventReceived(TestUtils.testCustomResource());

assertThat(state.deleteEventPresent()).isTrue();
assertThat(state.eventPresent()).isFalse();
Expand All @@ -48,7 +50,7 @@ public void marksDeleteEvent() {
public void afterDeleteEventMarkEventIsNotRelevant() {
state.markEventReceived();

state.markDeleteEventReceived();
state.markDeleteEventReceived(TestUtils.testCustomResource());

assertThat(state.deleteEventPresent()).isTrue();
assertThat(state.eventPresent()).isFalse();
Expand All @@ -57,7 +59,7 @@ public void afterDeleteEventMarkEventIsNotRelevant() {
@Test
public void cleansUp() {
state.markEventReceived();
state.markDeleteEventReceived();
state.markDeleteEventReceived(TestUtils.testCustomResource());

manager.remove(sampleResourceID);

Expand All @@ -71,7 +73,7 @@ public void cannotMarkEventAfterDeleteEventReceived() {
Assertions.assertThrows(
IllegalStateException.class,
() -> {
state.markDeleteEventReceived();
state.markDeleteEventReceived(TestUtils.testCustomResource());
state.markEventReceived();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,35 @@ void skipsEventHandlingIfGenerationNotIncreased() {
TestCustomResource oldCustomResource = TestUtils.testCustomResource();
oldCustomResource.getMetadata().setFinalizers(List.of(FINALIZER));

source.eventReceived(ResourceAction.UPDATED, customResource, oldCustomResource);
source.eventReceived(ResourceAction.UPDATED, customResource, oldCustomResource, null);
verify(eventHandler, times(1)).handleEvent(any());

source.eventReceived(ResourceAction.UPDATED, customResource, customResource);
source.eventReceived(ResourceAction.UPDATED, customResource, customResource, null);
verify(eventHandler, times(1)).handleEvent(any());
}

@Test
void dontSkipEventHandlingIfMarkedForDeletion() {
TestCustomResource customResource1 = TestUtils.testCustomResource();

source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);
verify(eventHandler, times(1)).handleEvent(any());

// mark for deletion
customResource1.getMetadata().setDeletionTimestamp(LocalDateTime.now().toString());
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);
verify(eventHandler, times(2)).handleEvent(any());
}

@Test
void normalExecutionIfGenerationChanges() {
TestCustomResource customResource1 = TestUtils.testCustomResource();

source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);
verify(eventHandler, times(1)).handleEvent(any());

customResource1.getMetadata().setGeneration(2L);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);
verify(eventHandler, times(2)).handleEvent(any());
}

Expand All @@ -92,18 +92,18 @@ void handlesAllEventIfNotGenerationAware() {

TestCustomResource customResource1 = TestUtils.testCustomResource();

source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);
verify(eventHandler, times(1)).handleEvent(any());

source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);
verify(eventHandler, times(2)).handleEvent(any());
}

@Test
void eventWithNoGenerationProcessedIfNoFinalizer() {
TestCustomResource customResource1 = TestUtils.testCustomResource();

source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);

verify(eventHandler, times(1)).handleEvent(any());
}
Expand All @@ -112,7 +112,7 @@ void eventWithNoGenerationProcessedIfNoFinalizer() {
void callsBroadcastsOnResourceEvents() {
TestCustomResource customResource1 = TestUtils.testCustomResource();

source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1);
source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1, null);

verify(testController.getEventSourceManager(), times(1))
.broadcastOnResourceEvent(
Expand All @@ -128,8 +128,8 @@ void filtersOutEventsOnAddAndUpdate() {
source = new ControllerEventSource<>(new TestController(onAddFilter, onUpdatePredicate, null));
setUpSource(source, true, controllerConfig);

source.eventReceived(ResourceAction.ADDED, cr, null);
source.eventReceived(ResourceAction.UPDATED, cr, cr);
source.eventReceived(ResourceAction.ADDED, cr, null, null);
source.eventReceived(ResourceAction.UPDATED, cr, cr, null);

verify(eventHandler, never()).handleEvent(any());
}
Expand All @@ -141,9 +141,9 @@ void genericFilterFiltersOutAddUpdateAndDeleteEvents() {
source = new ControllerEventSource<>(new TestController(null, null, res -> false));
setUpSource(source, true, controllerConfig);

source.eventReceived(ResourceAction.ADDED, cr, null);
source.eventReceived(ResourceAction.UPDATED, cr, cr);
source.eventReceived(ResourceAction.DELETED, cr, cr);
source.eventReceived(ResourceAction.ADDED, cr, null, null);
source.eventReceived(ResourceAction.UPDATED, cr, cr, null);
source.eventReceived(ResourceAction.DELETED, cr, cr, true);

verify(eventHandler, never()).handleEvent(any());
}
Expand Down
Loading