Skip to content

Commit f9aafb8

Browse files
committed
Revert "improve:"
This reverts commit faf9bdf.
1 parent faf9bdf commit f9aafb8

File tree

4 files changed

+58
-86
lines changed

4 files changed

+58
-86
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6565
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
6666
// we need direct control for the indexer to propagate the just update resource also to the index
6767
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
68+
private final TemporalPrimaryToSecondaryIndex<R> temporalPrimaryToSecondaryIndex;
6869
private final String id = UUID.randomUUID().toString();
6970

7071
public InformerEventSource(
@@ -98,13 +99,17 @@ private InformerEventSource(
9899
// If there is a primary to secondary mapper there is no need for primary to secondary index.
99100
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
100101
if (useSecondaryToPrimaryIndex()) {
102+
temporalPrimaryToSecondaryIndex =
103+
new DefaultTemporalPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
101104
addIndexers(
102105
Map.of(
103106
PRIMARY_TO_SECONDARY_INDEX_NAME,
104107
(R r) ->
105108
configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream()
106109
.map(InformerEventSource::resourceIdToString)
107110
.toList()));
111+
} else {
112+
temporalPrimaryToSecondaryIndex = NOOPTemporalPrimaryToSecondaryIndex.getInstance();
108113
}
109114

110115
final var informerConfig = configuration.getInformerConfig();
@@ -153,6 +158,7 @@ public void onDelete(R resource, boolean b) {
153158
ResourceID.fromResource(resource),
154159
resourceType().getSimpleName());
155160
}
161+
temporalPrimaryToSecondaryIndex.cleanupForResource(resource);
156162
super.onDelete(resource, b);
157163
if (acceptedByDeleteFilters(resource, b)) {
158164
propagateEvent(resource);
@@ -162,6 +168,7 @@ public void onDelete(R resource, boolean b) {
162168
private synchronized void onAddOrUpdate(
163169
Operation operation, R newObject, R oldObject, Runnable superOnOp) {
164170
var resourceID = ResourceID.fromResource(newObject);
171+
temporalPrimaryToSecondaryIndex.cleanupForResource(newObject);
165172
if (canSkipEvent(newObject, oldObject, resourceID)) {
166173
log.debug(
167174
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
@@ -253,10 +260,7 @@ public Set<R> getSecondaryResources(P primary) {
253260
// that we did not receive yet an event in the informer so the index would not
254261
// be updated. However, before reading it from temp IDs the event arrives and erases
255262
// the temp index. So in case of Add not id would be found.
256-
var temporalIds =
257-
temporaryResourceCache
258-
.getTemporalPrimaryToSecondaryIndex()
259-
.getSecondaryResources(primaryID);
263+
var temporalIds = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID);
260264
var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID));
261265

262266
log.debug(
@@ -309,6 +313,7 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
309313
}
310314

311315
private void handleRecentCreateOrUpdate(R newResource, R oldResource) {
316+
temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource);
312317
temporaryResourceCache.putResource(
313318
newResource,
314319
Optional.ofNullable(oldResource)
@@ -365,16 +370,4 @@ private enum Operation {
365370
private static String resourceIdToString(ResourceID resourceID) {
366371
return resourceID.getName() + "#" + resourceID.getNamespace().orElse("$na");
367372
}
368-
369-
@Override
370-
@SuppressWarnings({"unchecked", "rawtypes"})
371-
protected TemporaryResourceCache<R> temporaryResourceCache() {
372-
return new TemporaryResourceCache<>(
373-
this,
374-
useSecondaryToPrimaryIndex()
375-
? new DefaultTemporalPrimaryToSecondaryIndex(
376-
configuration().getSecondaryToPrimaryMapper())
377-
: NOOPTemporalPrimaryToSecondaryIndex.getInstance(),
378-
parseResourceVersions);
379-
}
380373
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public abstract class ManagedInformerEventSource<
4040

4141
private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);
4242
private InformerManager<R, C> cache;
43-
protected final boolean parseResourceVersions;
43+
private final boolean parseResourceVersions;
4444
private ControllerConfiguration<R> controllerConfiguration;
4545
private final C configuration;
4646
private final Map<String, Function<R, List<String>>> indexers = new HashMap<>();
@@ -87,7 +87,7 @@ public synchronized void start() {
8787
if (isRunning()) {
8888
return;
8989
}
90-
temporaryResourceCache = temporaryResourceCache();
90+
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
9191
this.cache = new InformerManager<>(client, configuration, this);
9292
cache.setControllerConfiguration(controllerConfiguration);
9393
cache.addIndexers(indexers);
@@ -133,11 +133,6 @@ public Optional<R> get(ResourceID resourceID) {
133133
}
134134
}
135135

136-
protected TemporaryResourceCache temporaryResourceCache() {
137-
return new TemporaryResourceCache<>(
138-
this, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), parseResourceVersions);
139-
}
140-
141136
@SuppressWarnings("unused")
142137
public Optional<R> getCachedValue(ResourceID resourceID) {
143138
return get(resourceID);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 45 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,47 @@
3131
*/
3232
public class TemporaryResourceCache<T extends HasMetadata> {
3333

34+
static class ExpirationCache<K> {
35+
private final LinkedHashMap<K, Long> cache;
36+
private final int ttlMs;
37+
38+
public ExpirationCache(int maxEntries, int ttlMs) {
39+
this.ttlMs = ttlMs;
40+
this.cache =
41+
new LinkedHashMap<>() {
42+
@Override
43+
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
44+
return size() > maxEntries;
45+
}
46+
};
47+
}
48+
49+
public void add(K key) {
50+
clean();
51+
cache.putIfAbsent(key, System.currentTimeMillis());
52+
}
53+
54+
public boolean contains(K key) {
55+
clean();
56+
return cache.get(key) != null;
57+
}
58+
59+
void clean() {
60+
if (!cache.isEmpty()) {
61+
long currentTimeMillis = System.currentTimeMillis();
62+
var iter = cache.entrySet().iterator();
63+
// the order will already be from oldest to newest, clean a fixed number of entries to
64+
// amortize the cost amongst multiple calls
65+
for (int i = 0; i < 10 && iter.hasNext(); i++) {
66+
var entry = iter.next();
67+
if (currentTimeMillis - entry.getValue() > ttlMs) {
68+
iter.remove();
69+
}
70+
}
71+
}
72+
}
73+
}
74+
3475
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
3576

3677
private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();
@@ -40,15 +81,12 @@ public class TemporaryResourceCache<T extends HasMetadata> {
4081
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
4182
private final boolean parseResourceVersions;
4283
private final ExpirationCache<String> knownResourceVersions;
43-
private final TemporalPrimaryToSecondaryIndex<T> temporalPrimaryToSecondaryIndex;
4484

4585
public TemporaryResourceCache(
4686
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
47-
TemporalPrimaryToSecondaryIndex<T> temporalPrimaryToSecondaryIndex,
4887
boolean parseResourceVersions) {
4988
this.managedInformerEventSource = managedInformerEventSource;
5089
this.parseResourceVersions = parseResourceVersions;
51-
this.temporalPrimaryToSecondaryIndex = temporalPrimaryToSecondaryIndex;
5290
if (parseResourceVersions) {
5391
// keep up to the 50000 add/updates for up to 5 minutes
5492
knownResourceVersions = new ExpirationCache<>(50000, 600000);
@@ -67,14 +105,10 @@ public synchronized void onAddOrUpdateEvent(T resource) {
67105
}
68106

69107
synchronized void onEvent(T resource, boolean unknownState) {
70-
var res =
71-
cache.computeIfPresent(
72-
ResourceID.fromResource(resource),
73-
(id, cached) ->
74-
(unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
75-
if (res == null) {
76-
temporalPrimaryToSecondaryIndex.cleanupForResource(resource);
77-
}
108+
cache.computeIfPresent(
109+
ResourceID.fromResource(resource),
110+
(id, cached) ->
111+
(unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
78112
}
79113

80114
public synchronized void putAddedResource(T newResource) {
@@ -120,7 +154,6 @@ public synchronized void putResource(T newResource, String previousResourceVersi
120154
newResource.getMetadata().getResourceVersion(),
121155
resourceId);
122156
cache.put(resourceId, newResource);
123-
temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource);
124157
} else if (cache.remove(resourceId) != null) {
125158
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
126159
}
@@ -156,49 +189,4 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c
156189
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
157190
return Optional.ofNullable(cache.get(resourceID));
158191
}
159-
160-
public TemporalPrimaryToSecondaryIndex<T> getTemporalPrimaryToSecondaryIndex() {
161-
return temporalPrimaryToSecondaryIndex;
162-
}
163-
164-
static class ExpirationCache<K> {
165-
private final LinkedHashMap<K, Long> cache;
166-
private final int ttlMs;
167-
168-
public ExpirationCache(int maxEntries, int ttlMs) {
169-
this.ttlMs = ttlMs;
170-
this.cache =
171-
new LinkedHashMap<>() {
172-
@Override
173-
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
174-
return size() > maxEntries;
175-
}
176-
};
177-
}
178-
179-
public void add(K key) {
180-
clean();
181-
cache.putIfAbsent(key, System.currentTimeMillis());
182-
}
183-
184-
public boolean contains(K key) {
185-
clean();
186-
return cache.get(key) != null;
187-
}
188-
189-
void clean() {
190-
if (!cache.isEmpty()) {
191-
long currentTimeMillis = System.currentTimeMillis();
192-
var iter = cache.entrySet().iterator();
193-
// the order will already be from oldest to newest, clean a fixed number of entries to
194-
// amortize the cost amongst multiple calls
195-
for (int i = 0; i < 10 && iter.hasNext(); i++) {
196-
var entry = iter.next();
197-
if (currentTimeMillis - entry.getValue() > ttlMs) {
198-
iter.remove();
199-
}
200-
}
201-
}
202-
}
203-
}
204192
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ class TemporaryPrimaryResourceCacheTest {
3131
@BeforeEach
3232
void setup() {
3333
informerEventSource = mock(InformerEventSource.class);
34-
temporaryResourceCache =
35-
new TemporaryResourceCache<>(
36-
informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), false);
34+
temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false);
3735
}
3836

3937
@Test
@@ -96,9 +94,7 @@ void removesResourceFromCache() {
9694

9795
@Test
9896
void resourceVersionParsing() {
99-
this.temporaryResourceCache =
100-
new TemporaryResourceCache<>(
101-
informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), true);
97+
this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true);
10298

10399
assertThat(temporaryResourceCache.isKnownResourceVersion(testResource())).isFalse();
104100

0 commit comments

Comments
 (0)