Skip to content

Commit faf9bdf

Browse files
committed
improve:
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 9e41f50 commit faf9bdf

File tree

4 files changed

+86
-58
lines changed

4 files changed

+86
-58
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6565
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
6666
// we need direct control for the indexer to propagate the just update resource also to the index
6767
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
68-
private final TemporalPrimaryToSecondaryIndex<R> temporalPrimaryToSecondaryIndex;
6968
private final String id = UUID.randomUUID().toString();
7069

7170
public InformerEventSource(
@@ -99,17 +98,13 @@ private InformerEventSource(
9998
// If there is a primary to secondary mapper there is no need for primary to secondary index.
10099
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
101100
if (useSecondaryToPrimaryIndex()) {
102-
temporalPrimaryToSecondaryIndex =
103-
new DefaultTemporalPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
104101
addIndexers(
105102
Map.of(
106103
PRIMARY_TO_SECONDARY_INDEX_NAME,
107104
(R r) ->
108105
configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream()
109106
.map(InformerEventSource::resourceIdToString)
110107
.toList()));
111-
} else {
112-
temporalPrimaryToSecondaryIndex = NOOPTemporalPrimaryToSecondaryIndex.getInstance();
113108
}
114109

115110
final var informerConfig = configuration.getInformerConfig();
@@ -158,7 +153,6 @@ public void onDelete(R resource, boolean b) {
158153
ResourceID.fromResource(resource),
159154
resourceType().getSimpleName());
160155
}
161-
temporalPrimaryToSecondaryIndex.cleanupForResource(resource);
162156
super.onDelete(resource, b);
163157
if (acceptedByDeleteFilters(resource, b)) {
164158
propagateEvent(resource);
@@ -168,7 +162,6 @@ public void onDelete(R resource, boolean b) {
168162
private synchronized void onAddOrUpdate(
169163
Operation operation, R newObject, R oldObject, Runnable superOnOp) {
170164
var resourceID = ResourceID.fromResource(newObject);
171-
temporalPrimaryToSecondaryIndex.cleanupForResource(newObject);
172165
if (canSkipEvent(newObject, oldObject, resourceID)) {
173166
log.debug(
174167
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
@@ -260,7 +253,10 @@ public Set<R> getSecondaryResources(P primary) {
260253
// that we did not receive yet an event in the informer so the index would not
261254
// be updated. However, before reading it from temp IDs the event arrives and erases
262255
// the temp index. So in case of Add not id would be found.
263-
var temporalIds = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID);
256+
var temporalIds =
257+
temporaryResourceCache
258+
.getTemporalPrimaryToSecondaryIndex()
259+
.getSecondaryResources(primaryID);
264260
var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID));
265261

266262
log.debug(
@@ -313,7 +309,6 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
313309
}
314310

315311
private void handleRecentCreateOrUpdate(R newResource, R oldResource) {
316-
temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource);
317312
temporaryResourceCache.putResource(
318313
newResource,
319314
Optional.ofNullable(oldResource)
@@ -370,4 +365,16 @@ private enum Operation {
370365
private static String resourceIdToString(ResourceID resourceID) {
371366
return resourceID.getName() + "#" + resourceID.getNamespace().orElse("$na");
372367
}
368+
369+
@Override
370+
@SuppressWarnings({"unchecked", "rawtypes"})
371+
protected TemporaryResourceCache<R> temporaryResourceCache() {
372+
return new TemporaryResourceCache<>(
373+
this,
374+
useSecondaryToPrimaryIndex()
375+
? new DefaultTemporalPrimaryToSecondaryIndex(
376+
configuration().getSecondaryToPrimaryMapper())
377+
: NOOPTemporalPrimaryToSecondaryIndex.getInstance(),
378+
parseResourceVersions);
379+
}
373380
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public abstract class ManagedInformerEventSource<
4040

4141
private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);
4242
private InformerManager<R, C> cache;
43-
private final boolean parseResourceVersions;
43+
protected final boolean parseResourceVersions;
4444
private ControllerConfiguration<R> controllerConfiguration;
4545
private final C configuration;
4646
private final Map<String, Function<R, List<String>>> indexers = new HashMap<>();
@@ -87,7 +87,7 @@ public synchronized void start() {
8787
if (isRunning()) {
8888
return;
8989
}
90-
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
90+
temporaryResourceCache = temporaryResourceCache();
9191
this.cache = new InformerManager<>(client, configuration, this);
9292
cache.setControllerConfiguration(controllerConfiguration);
9393
cache.addIndexers(indexers);
@@ -133,6 +133,11 @@ public Optional<R> get(ResourceID resourceID) {
133133
}
134134
}
135135

136+
protected TemporaryResourceCache temporaryResourceCache() {
137+
return new TemporaryResourceCache<>(
138+
this, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), parseResourceVersions);
139+
}
140+
136141
@SuppressWarnings("unused")
137142
public Optional<R> getCachedValue(ResourceID resourceID) {
138143
return get(resourceID);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,47 +31,6 @@
3131
*/
3232
public class TemporaryResourceCache<T extends HasMetadata> {
3333

34-
static class ExpirationCache<K> {
35-
private final LinkedHashMap<K, Long> cache;
36-
private final int ttlMs;
37-
38-
public ExpirationCache(int maxEntries, int ttlMs) {
39-
this.ttlMs = ttlMs;
40-
this.cache =
41-
new LinkedHashMap<>() {
42-
@Override
43-
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
44-
return size() > maxEntries;
45-
}
46-
};
47-
}
48-
49-
public void add(K key) {
50-
clean();
51-
cache.putIfAbsent(key, System.currentTimeMillis());
52-
}
53-
54-
public boolean contains(K key) {
55-
clean();
56-
return cache.get(key) != null;
57-
}
58-
59-
void clean() {
60-
if (!cache.isEmpty()) {
61-
long currentTimeMillis = System.currentTimeMillis();
62-
var iter = cache.entrySet().iterator();
63-
// the order will already be from oldest to newest, clean a fixed number of entries to
64-
// amortize the cost amongst multiple calls
65-
for (int i = 0; i < 10 && iter.hasNext(); i++) {
66-
var entry = iter.next();
67-
if (currentTimeMillis - entry.getValue() > ttlMs) {
68-
iter.remove();
69-
}
70-
}
71-
}
72-
}
73-
}
74-
7534
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
7635

7736
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
@@ -81,12 +40,15 @@ void clean() {
8140
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
8241
private final boolean parseResourceVersions;
8342
private final ExpirationCache<String> knownResourceVersions;
43+
private final TemporalPrimaryToSecondaryIndex<T> temporalPrimaryToSecondaryIndex;
8444

8545
public TemporaryResourceCache(
8646
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
47+
TemporalPrimaryToSecondaryIndex<T> temporalPrimaryToSecondaryIndex,
8748
boolean parseResourceVersions) {
8849
this.managedInformerEventSource = managedInformerEventSource;
8950
this.parseResourceVersions = parseResourceVersions;
51+
this.temporalPrimaryToSecondaryIndex = temporalPrimaryToSecondaryIndex;
9052
if (parseResourceVersions) {
9153
// keep up to the 50000 add/updates for up to 5 minutes
9254
knownResourceVersions = new ExpirationCache<>(50000, 600000);
@@ -105,10 +67,14 @@ public synchronized void onAddOrUpdateEvent(T resource) {
10567
}
10668

10769
synchronized void onEvent(T resource, boolean unknownState) {
108-
cache.computeIfPresent(
109-
ResourceID.fromResource(resource),
110-
(id, cached) ->
111-
(unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
70+
var res =
71+
cache.computeIfPresent(
72+
ResourceID.fromResource(resource),
73+
(id, cached) ->
74+
(unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
75+
if (res == null) {
76+
temporalPrimaryToSecondaryIndex.cleanupForResource(resource);
77+
}
11278
}
11379

11480
public synchronized void putAddedResource(T newResource) {
@@ -154,6 +120,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi
154120
newResource.getMetadata().getResourceVersion(),
155121
resourceId);
156122
cache.put(resourceId, newResource);
123+
temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource);
157124
} else if (cache.remove(resourceId) != null) {
158125
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
159126
}
@@ -189,4 +156,49 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c
189156
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
190157
return Optional.ofNullable(cache.get(resourceID));
191158
}
159+
160+
public TemporalPrimaryToSecondaryIndex<T> getTemporalPrimaryToSecondaryIndex() {
161+
return temporalPrimaryToSecondaryIndex;
162+
}
163+
164+
static class ExpirationCache<K> {
165+
private final LinkedHashMap<K, Long> cache;
166+
private final int ttlMs;
167+
168+
public ExpirationCache(int maxEntries, int ttlMs) {
169+
this.ttlMs = ttlMs;
170+
this.cache =
171+
new LinkedHashMap<>() {
172+
@Override
173+
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
174+
return size() > maxEntries;
175+
}
176+
};
177+
}
178+
179+
public void add(K key) {
180+
clean();
181+
cache.putIfAbsent(key, System.currentTimeMillis());
182+
}
183+
184+
public boolean contains(K key) {
185+
clean();
186+
return cache.get(key) != null;
187+
}
188+
189+
void clean() {
190+
if (!cache.isEmpty()) {
191+
long currentTimeMillis = System.currentTimeMillis();
192+
var iter = cache.entrySet().iterator();
193+
// the order will already be from oldest to newest, clean a fixed number of entries to
194+
// amortize the cost amongst multiple calls
195+
for (int i = 0; i < 10 && iter.hasNext(); i++) {
196+
var entry = iter.next();
197+
if (currentTimeMillis - entry.getValue() > ttlMs) {
198+
iter.remove();
199+
}
200+
}
201+
}
202+
}
203+
}
192204
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ class TemporaryPrimaryResourceCacheTest {
3131
@BeforeEach
3232
void setup() {
3333
informerEventSource = mock(InformerEventSource.class);
34-
temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false);
34+
temporaryResourceCache =
35+
new TemporaryResourceCache<>(
36+
informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), false);
3537
}
3638

3739
@Test
@@ -94,7 +96,9 @@ void removesResourceFromCache() {
9496

9597
@Test
9698
void resourceVersionParsing() {
97-
this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true);
99+
this.temporaryResourceCache =
100+
new TemporaryResourceCache<>(
101+
informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), true);
98102

99103
assertThat(temporaryResourceCache.isKnownResourceVersion(testResource())).isFalse();
100104

0 commit comments

Comments
 (0)