Skip to content

Commit 833d434

Browse files
committed
wip
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 1cc04d7 commit 833d434

File tree

4 files changed

+83
-34
lines changed

4 files changed

+83
-34
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,14 @@ private void submitReconciliationExecution(ResourceState state) {
158158
}
159159
state.setUnderProcessing(true);
160160
final var latest = maybeLatest.orElseGet(() -> getResourceFromState(state));
161+
// passing the latest resources for a corner case when delete event received
162+
// during processing an event
161163
ExecutionScope<P> executionScope =
162164
new ExecutionScope<>(
163-
state.getRetry(), state.deleteEventPresent(), state.isDeleteFinalStateUnknown());
165+
latest,
166+
state.getRetry(),
167+
state.deleteEventPresent(),
168+
state.isDeleteFinalStateUnknown());
164169
state.unMarkEventReceived(triggerOnAllEvent());
165170
metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
166171
log.debug("Executing events for custom resource. Scope: {}", executionScope);
@@ -281,6 +286,7 @@ synchronized void eventProcessingFinished(
281286
metrics.cleanupDoneFor(resourceID, metricsMetadata);
282287
} else {
283288
if (state.eventPresent() || (triggerOnAllEvent() && state.deleteEventPresent())) {
289+
log.debug("Submitting for reconciliation.");
284290
submitReconciliationExecution(state);
285291
} else {
286292
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
@@ -484,25 +490,36 @@ public void run() {
484490
log.debug("Event processor not running skipping resource processing: {}", resourceID);
485491
return;
486492
}
493+
log.debug("Running reconcile executor for: {}", executionScope);
487494
// change thread name for easier debugging
488495
final var thread = Thread.currentThread();
489496
final var name = thread.getName();
490497
try {
498+
// we try to get the most up-to-date resource from cache
491499
var actualResource = cache.get(resourceID);
492500
if (actualResource.isEmpty()) {
493-
if (triggerOnAllEvent() && executionScope.isDeleteEvent()) {
501+
if (triggerOnAllEvent()) {
494502
log.debug(
495503
"Resource not found in the cache, checking for delete event resource: {}",
496504
resourceID);
497505
var state = resourceStateManager.get(resourceID);
498-
actualResource =
499-
(Optional<P>)
500-
state
501-
.filter(ResourceState::deleteEventPresent)
502-
.map(ResourceState::getLastKnownResource);
503-
if (actualResource.isEmpty()) {
504-
throw new IllegalStateException("This should not happen");
506+
if (executionScope.isDeleteEvent()) {
507+
actualResource =
508+
(Optional<P>)
509+
state
510+
.filter(ResourceState::deleteEventPresent)
511+
.map(ResourceState::getLastKnownResource);
512+
if (actualResource.isEmpty()) {
513+
throw new IllegalStateException("this should not happen");
514+
}
515+
} else {
516+
log.debug("Skipping execution since delete event received meanwhile");
517+
eventProcessingFinished(executionScope, PostExecutionControl.defaultDispatch());
518+
return;
505519
}
520+
} else {
521+
log.debug("Skipping execution; primary resource missing from cache: {}", resourceID);
522+
return;
506523
}
507524
}
508525
actualResource.ifPresent(executionScope::setResource);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ class ExecutionScope<R extends HasMetadata> {
1111
private boolean deleteEvent;
1212
private boolean isDeleteFinalStateUnknown;
1313

14-
ExecutionScope(RetryInfo retryInfo, boolean deleteEvent, boolean isDeleteFinalStateUnknown) {
14+
ExecutionScope(
15+
R resource, RetryInfo retryInfo, boolean deleteEvent, boolean isDeleteFinalStateUnknown) {
1516
this.retryInfo = retryInfo;
1617
this.deleteEvent = deleteEvent;
1718
this.isDeleteFinalStateUnknown = isDeleteFinalStateUnknown;
19+
this.resource = resource;
1820
}
1921

2022
public ExecutionScope<R> setResource(R resource) {
@@ -48,15 +50,16 @@ public void setDeleteFinalStateUnknown(boolean deleteFinalStateUnknown) {
4850

4951
@Override
5052
public String toString() {
51-
if (resource == null) {
52-
return "ExecutionScope{resource: null}";
53-
} else
54-
return "ExecutionScope{"
55-
+ " resource id: "
56-
+ ResourceID.fromResource(resource)
57-
+ ", version: "
58-
+ resource.getMetadata().getResourceVersion()
59-
+ '}';
53+
return "ExecutionScope{"
54+
+ "resource="
55+
+ ResourceID.fromResource(resource)
56+
+ ", retryInfo="
57+
+ retryInfo
58+
+ ", deleteEvent="
59+
+ deleteEvent
60+
+ ", isDeleteFinalStateUnknown="
61+
+ isDeleteFinalStateUnknown
62+
+ '}';
6063
}
6164

6265
public RetryInfo getRetryInfo() {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ void ifExecutionInProgressWaitsUntilItsFinished() {
132132
void schedulesAnEventRetryOnException() {
133133
TestCustomResource customResource = testCustomResource();
134134

135-
ExecutionScope executionScope = new ExecutionScope(null, false, false);
135+
ExecutionScope executionScope = new ExecutionScope(null, null, false, false);
136136
executionScope.setResource(customResource);
137137
PostExecutionControl postExecutionControl =
138138
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));
@@ -274,7 +274,7 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() {
274274
var cr = testCustomResource(crID);
275275

276276
eventProcessor.eventProcessingFinished(
277-
new ExecutionScope(null, false, false).setResource(cr),
277+
new ExecutionScope(null, null, false, false).setResource(cr),
278278
PostExecutionControl.defaultDispatch());
279279

280280
verify(retryTimerEventSourceMock, times(1)).cancelOnceSchedule(eq(crID));
@@ -329,7 +329,7 @@ void startProcessedMarkedEventReceivedBefore() {
329329
void notUpdatesEventSourceHandlerIfResourceUpdated() {
330330
TestCustomResource customResource = testCustomResource();
331331
ExecutionScope executionScope =
332-
new ExecutionScope(null, false, false).setResource(customResource);
332+
new ExecutionScope(null, null, false, false).setResource(customResource);
333333
PostExecutionControl postExecutionControl =
334334
PostExecutionControl.customResourceStatusPatched(customResource);
335335

@@ -343,7 +343,7 @@ void notReschedulesAfterTheFinalizerRemoveProcessed() {
343343
TestCustomResource customResource = testCustomResource();
344344
markForDeletion(customResource);
345345
ExecutionScope executionScope =
346-
new ExecutionScope(null, false, false).setResource(customResource);
346+
new ExecutionScope(null, null, false, false).setResource(customResource);
347347
PostExecutionControl postExecutionControl =
348348
PostExecutionControl.customResourceFinalizerRemoved(customResource);
349349

@@ -357,7 +357,7 @@ void skipEventProcessingIfFinalizerRemoveProcessed() {
357357
TestCustomResource customResource = testCustomResource();
358358
markForDeletion(customResource);
359359
ExecutionScope executionScope =
360-
new ExecutionScope(null, false, false).setResource(customResource);
360+
new ExecutionScope(null, null, false, false).setResource(customResource);
361361
PostExecutionControl postExecutionControl =
362362
PostExecutionControl.customResourceFinalizerRemoved(customResource);
363363

@@ -375,7 +375,7 @@ void newResourceAfterMissedDeleteEvent() {
375375
TestCustomResource customResource = testCustomResource();
376376
markForDeletion(customResource);
377377
ExecutionScope executionScope =
378-
new ExecutionScope(null, false, false).setResource(customResource);
378+
new ExecutionScope(null, null, false, false).setResource(customResource);
379379
PostExecutionControl postExecutionControl =
380380
PostExecutionControl.customResourceFinalizerRemoved(customResource);
381381
var newResource = testCustomResource();
@@ -412,7 +412,7 @@ void rateLimitsReconciliationSubmission() {
412412
void schedulesRetryForMarReconciliationInterval() {
413413
TestCustomResource customResource = testCustomResource();
414414
ExecutionScope executionScope =
415-
new ExecutionScope(null, false, false).setResource(customResource);
415+
new ExecutionScope(null, null, false, false).setResource(customResource);
416416
PostExecutionControl postExecutionControl = PostExecutionControl.defaultDispatch();
417417

418418
eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
@@ -435,7 +435,7 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
435435
metricsMock));
436436
eventProcessorWithRetry.start();
437437
ExecutionScope executionScope =
438-
new ExecutionScope(null, false, false).setResource(testCustomResource());
438+
new ExecutionScope(null, null, false, false).setResource(testCustomResource());
439439
PostExecutionControl postExecutionControl =
440440
PostExecutionControl.exceptionDuringExecution(new RuntimeException());
441441
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
@@ -522,6 +522,7 @@ void triggerOnAllEventProcessesDeleteEvent() {
522522
verify(reconciliationDispatcherMock, times(2)).handleExecution(any());
523523
}
524524

525+
// this is a special corner case that needs special care
525526
@Test
526527
void triggerOnAllEventDeleteEventInstantlyAfterEvent() {
527528
var reconciliationDispatcherMock = mock(ReconciliationDispatcher.class);
@@ -540,6 +541,29 @@ void triggerOnAllEventDeleteEventInstantlyAfterEvent() {
540541
eventProcessor.handleEvent(prepareCREvent1());
541542
eventProcessor.handleEvent(prepareCRDeleteEvent1());
542543

544+
waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id());
545+
verify(reconciliationDispatcherMock, times(1)).handleExecution(any());
546+
}
547+
548+
@Test
549+
void triggerOnAllEventDeleteEventAfterEventProcessed() {
550+
var reconciliationDispatcherMock = mock(ReconciliationDispatcher.class);
551+
when(reconciliationDispatcherMock.handleExecution(any()))
552+
.thenReturn(PostExecutionControl.defaultDispatch());
553+
when(eventSourceManagerMock.retryEventSource()).thenReturn(mock(TimerEventSource.class));
554+
eventProcessor =
555+
spy(
556+
new EventProcessor(
557+
controllerConfigTriggerAllEvent(null, rateLimiterMock),
558+
reconciliationDispatcherMock,
559+
eventSourceManagerMock,
560+
null));
561+
eventProcessor.start();
562+
563+
eventProcessor.handleEvent(prepareCREvent1());
564+
waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id());
565+
566+
eventProcessor.handleEvent(prepareCRDeleteEvent1());
543567
waitUntilProcessingFinished(eventProcessor, TestUtils.testCustomResource1Id());
544568
verify(reconciliationDispatcherMock, times(2)).handleExecution(any());
545569
}
@@ -584,6 +608,9 @@ void passesResourceFromStateToDispatcher() {
584608
// check also last state unknown
585609
}
586610

611+
@Test
612+
void onAllEventRateLimiting() {}
613+
587614
private ResourceID eventAlreadyUnderProcessing() {
588615
when(reconciliationDispatcherMock.handleExecution(any()))
589616
.then(

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,7 @@ void propagatesRetryInfoToContextIfFinalizerSet() {
394394

395395
reconciliationDispatcher.handleExecution(
396396
new ExecutionScope(
397+
null,
397398
new RetryInfo() {
398399
@Override
399400
public int getAttemptCount() {
@@ -497,6 +498,7 @@ void callErrorStatusHandlerIfImplemented() {
497498

498499
reconciliationDispatcher.handleExecution(
499500
new ExecutionScope(
501+
null,
500502
new RetryInfo() {
501503
@Override
502504
public int getAttemptCount() {
@@ -532,7 +534,7 @@ void callErrorStatusHandlerEvenOnFirstError() {
532534

533535
var postExecControl =
534536
reconciliationDispatcher.handleExecution(
535-
new ExecutionScope(null, false, false).setResource(testCustomResource));
537+
new ExecutionScope(null, null, false, false).setResource(testCustomResource));
536538
verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
537539
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
538540
assertThat(postExecControl.exceptionDuringExecution()).isTrue();
@@ -553,7 +555,7 @@ void errorHandlerCanInstructNoRetryWithUpdate() {
553555

554556
var postExecControl =
555557
reconciliationDispatcher.handleExecution(
556-
new ExecutionScope(null, false, false).setResource(testCustomResource));
558+
new ExecutionScope(null, null, false, false).setResource(testCustomResource));
557559

558560
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
559561
verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
@@ -575,7 +577,7 @@ void errorHandlerCanInstructNoRetryNoUpdate() {
575577

576578
var postExecControl =
577579
reconciliationDispatcher.handleExecution(
578-
new ExecutionScope(null, false, false).setResource(testCustomResource));
580+
new ExecutionScope(null, null, false, false).setResource(testCustomResource));
579581

580582
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
581583
verify(customResourceFacade, times(0)).patchStatus(eq(testCustomResource), any());
@@ -592,7 +594,7 @@ void errorStatusHandlerCanPatchResource() {
592594
reconciler.errorHandler = () -> ErrorStatusUpdateControl.patchStatus(testCustomResource);
593595

594596
reconciliationDispatcher.handleExecution(
595-
new ExecutionScope(null, false, false).setResource(testCustomResource));
597+
new ExecutionScope(null, null, false, false).setResource(testCustomResource));
596598

597599
verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
598600
verify(reconciler, times(1)).updateErrorStatus(eq(testCustomResource), any(), any());
@@ -615,7 +617,7 @@ void ifRetryLimitedToZeroMaxAttemptsErrorHandlerGetsCorrectLastAttempt() {
615617
reconciler.errorHandler = () -> ErrorStatusUpdateControl.noStatusUpdate();
616618

617619
reconciliationDispatcher.handleExecution(
618-
new ExecutionScope(null, false, false).setResource(testCustomResource));
620+
new ExecutionScope(null, null, false, false).setResource(testCustomResource));
619621

620622
verify(reconciler, times(1))
621623
.updateErrorStatus(
@@ -679,7 +681,7 @@ void reSchedulesFromErrorHandler() {
679681

680682
var res =
681683
reconciliationDispatcher.handleExecution(
682-
new ExecutionScope(null, false, false).setResource(testCustomResource));
684+
new ExecutionScope(null, null, false, false).setResource(testCustomResource));
683685

684686
assertThat(res.getReScheduleDelay()).contains(delay);
685687
assertThat(res.getRuntimeException()).isEmpty();
@@ -730,7 +732,7 @@ private void removeFinalizers(CustomResource customResource) {
730732
}
731733

732734
public <T extends HasMetadata> ExecutionScope<T> executionScopeWithCREvent(T resource) {
733-
return (ExecutionScope<T>) new ExecutionScope<>(null, false, false).setResource(resource);
735+
return (ExecutionScope<T>) new ExecutionScope<>(null, null, false, false).setResource(resource);
734736
}
735737

736738
private class TestReconciler

0 commit comments

Comments
 (0)