Skip to content

Commit cd619fe

Browse files
committed
feat: ReconcileUtils for strongly consistent updates - alternative algortihm
Signed-off-by: Attila Mészáros <[email protected]>
1 parent bcd62f9 commit cd619fe

File tree

7 files changed

+201
-103
lines changed

7 files changed

+201
-103
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public synchronized void start() {
8181
}
8282
}
8383

84-
public synchronized void eventReceived(
84+
public synchronized void handleEvent(
8585
ResourceAction action,
8686
T resource,
8787
T oldResource,
@@ -138,13 +138,13 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso
138138
@Override
139139
public void onAdd(T resource) {
140140
var knownResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(resource);
141-
eventReceived(ResourceAction.ADDED, resource, null, null, knownResourceVersion);
141+
handleEvent(ResourceAction.ADDED, resource, null, null, knownResourceVersion);
142142
}
143143

144144
@Override
145145
public void onUpdate(T oldCustomResource, T newCustomResource) {
146146
var knownResourceVersion = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource);
147-
eventReceived(
147+
handleEvent(
148148
ResourceAction.UPDATED, newCustomResource, oldCustomResource, null, knownResourceVersion);
149149
}
150150

@@ -153,7 +153,7 @@ public void onDelete(T resource, boolean deletedFinalStateUnknown) {
153153
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
154154
// delete event is quite special here, that requires special care, since we clean up caches on
155155
// delete event.
156-
eventReceived(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown, false);
156+
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown, false);
157157
}
158158

159159
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.processing.event.source.informer;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.Optional;
21+
22+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
23+
24+
class EventFilterDetails {
25+
26+
// initially should be created during event filtering update
27+
28+
private int activeUpdates = 0;
29+
// todo might be just one
30+
private final List<ResourceEvent> events = new ArrayList<>();
31+
private int lastUpdatedResourceVersion = -1;
32+
33+
public int getActiveUpdates() {
34+
return activeUpdates;
35+
}
36+
37+
public void increaseActiveUpdates() {
38+
activeUpdates = activeUpdates + 1;
39+
}
40+
41+
public void decreaseActiveUpdates() {
42+
activeUpdates = activeUpdates - 1;
43+
}
44+
45+
public void recordEvent(ResourceEvent event) {
46+
events.add(event);
47+
}
48+
49+
public void setLastUpdatedResourceVersion(String version) {
50+
var parsed = Integer.parseInt(version);
51+
if (parsed > lastUpdatedResourceVersion) {
52+
lastUpdatedResourceVersion = parsed;
53+
}
54+
}
55+
56+
public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
57+
if (events.isEmpty()) return Optional.empty();
58+
var latest = events.get(events.size() - 1);
59+
if (Integer.parseInt(latest.getResource().orElseThrow().getMetadata().getResourceVersion())
60+
> lastUpdatedResourceVersion) {
61+
return Optional.of(latest);
62+
} else {
63+
return Optional.empty();
64+
}
65+
}
66+
67+
public boolean isFilteringDone() {
68+
return activeUpdates == 0;
69+
}
70+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.javaoperatorsdk.operator.processing.event.EventHandler;
3333
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3434
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
35+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
3536

3637
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
3738

@@ -136,6 +137,16 @@ public synchronized void onDelete(R resource, boolean b) {
136137
}
137138
}
138139

140+
@Override
141+
public void handleEvent(
142+
ResourceAction action,
143+
R resource,
144+
R oldResource,
145+
Boolean deletedFinalStateUnknown,
146+
boolean filterEvent) {
147+
propagateEvent(resource);
148+
}
149+
139150
@Override
140151
public synchronized void start() {
141152
super.start();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import io.javaoperatorsdk.operator.health.Status;
4343
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4444
import io.javaoperatorsdk.operator.processing.event.source.*;
45+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
46+
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
4547

4648
@SuppressWarnings("rawtypes")
4749
public abstract class ManagedInformerEventSource<
@@ -104,16 +106,36 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
104106
if (log.isDebugEnabled()) {
105107
log.debug("Update and cache: {}", id);
106108
}
109+
R updatedResource = null;
107110
try {
108111
temporaryResourceCache.startEventFilteringModify(id);
109-
var updated = updateMethod.apply(resourceToUpdate);
110-
handleRecentResourceUpdate(id, updated, resourceToUpdate);
111-
return updated;
112+
updatedResource = updateMethod.apply(resourceToUpdate);
113+
handleRecentResourceUpdate(id, updatedResource, resourceToUpdate);
114+
return updatedResource;
112115
} finally {
113-
temporaryResourceCache.doneEventFilterModify(id);
116+
var res =
117+
temporaryResourceCache.doneEventFilterModify(
118+
id,
119+
updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
120+
res.ifPresent(
121+
r ->
122+
handleEvent(
123+
r.getAction(),
124+
(R) r.getResource().orElseThrow(),
125+
null,
126+
!(r instanceof ResourceDeleteEvent)
127+
|| ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown(),
128+
false));
114129
}
115130
}
116131

132+
public abstract void handleEvent(
133+
ResourceAction action,
134+
R resource,
135+
R oldResource,
136+
Boolean deletedFinalStateUnknown,
137+
boolean filterEvent);
138+
117139
@SuppressWarnings("unchecked")
118140
@Override
119141
public synchronized void start() {

0 commit comments

Comments
 (0)