Skip to content

Commit 95a19ad

Browse files
committed
complementary index
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 18a6c39 commit 95a19ad

File tree

4 files changed

+93
-23
lines changed

4 files changed

+93
-23
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ComplementaryPrimaryToSecondaryIndex.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77

88
public interface ComplementaryPrimaryToSecondaryIndex<R extends HasMetadata> {
99

10-
void explicitAddOrUpdate(R resource);
10+
void explicitAdd(R resource);
1111

12-
void onCreateOrUpdateEvent(R resourceID);
12+
void cleanupForResource(R resourceID);
1313

1414
Set<ResourceID> getComplementarySecondaryResources(ResourceID primary);
1515
}
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,61 @@
11
package io.javaoperatorsdk.operator.processing.event.source.informer;
22

3+
import java.util.Collections;
4+
import java.util.HashSet;
35
import java.util.Set;
46
import java.util.concurrent.ConcurrentHashMap;
57

68
import io.fabric8.kubernetes.api.model.HasMetadata;
79
import io.javaoperatorsdk.operator.processing.event.ResourceID;
10+
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
811

912
public class DefaultComplementaryPrimaryToSecondaryIndex<R extends HasMetadata>
1013
implements ComplementaryPrimaryToSecondaryIndex<R> {
1114

1215
private final ConcurrentHashMap<ResourceID, Set<ResourceID>> index = new ConcurrentHashMap<>();
16+
private final SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
17+
18+
public DefaultComplementaryPrimaryToSecondaryIndex(
19+
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
20+
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
21+
}
1322

1423
@Override
15-
public void explicitAddOrUpdate(R resource) {}
24+
public void explicitAdd(R resource) {
25+
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
26+
primaryResources.forEach(
27+
primaryResource -> {
28+
var resourceSet =
29+
index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet());
30+
resourceSet.add(ResourceID.fromResource(resource));
31+
});
32+
}
1633

1734
@Override
18-
public void onCreateOrUpdateEvent(R resourceID) {}
35+
public void cleanupForResource(R resource) {
36+
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
37+
primaryResources.forEach(
38+
primaryResource -> {
39+
var secondaryResources = index.get(primaryResource);
40+
// this can be null in just very special cases, like when the secondaryToPrimaryMapper is
41+
// changing dynamically. Like if a list of ResourceIDs mapped dynamically extended in the
42+
// mapper between the onAddOrUpdate and onDelete is called.
43+
if (secondaryResources != null) {
44+
secondaryResources.remove(ResourceID.fromResource(resource));
45+
if (secondaryResources.isEmpty()) {
46+
index.remove(primaryResource);
47+
}
48+
}
49+
});
50+
}
1951

2052
@Override
2153
public Set<ResourceID> getComplementarySecondaryResources(ResourceID primary) {
22-
return Set.of();
54+
var resourceIDs = index.get(primary);
55+
if (resourceIDs == null) {
56+
return Collections.emptySet();
57+
} else {
58+
return new HashSet<>(resourceIDs);
59+
}
2360
}
2461
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
6565
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
6666
// we need direct control for the indexer to propagate the just update resource also to the index
6767
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
68+
private final ComplementaryPrimaryToSecondaryIndex<R> complementaryPrimaryToSecondaryIndex;
6869
private final String id = UUID.randomUUID().toString();
6970

7071
public InformerEventSource(
@@ -98,13 +99,18 @@ private InformerEventSource(
9899
// If there is a primary to secondary mapper there is no need for primary to secondary index.
99100
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
100101
if (primaryToSecondaryMapper == null) {
102+
complementaryPrimaryToSecondaryIndex =
103+
new DefaultComplementaryPrimaryToSecondaryIndex<>(
104+
configuration.getSecondaryToPrimaryMapper());
101105
addIndexers(
102106
Map.of(
103107
PRIMARY_TO_SECONDARY_INDEX_NAME,
104108
(R r) ->
105109
configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream()
106110
.map(InformerEventSource::resourceIdToString)
107111
.toList()));
112+
} else {
113+
complementaryPrimaryToSecondaryIndex = new NOOPComplementaryPrimaryToSecondaryIndex();
108114
}
109115

110116
final var informerConfig = configuration.getInformerConfig();
@@ -123,7 +129,7 @@ public void onAdd(R newResource) {
123129
resourceType().getSimpleName(),
124130
newResource.getMetadata().getResourceVersion());
125131
}
126-
132+
complementaryPrimaryToSecondaryIndex.cleanupForResource(newResource);
127133
onAddOrUpdate(
128134
Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource));
129135
}
@@ -250,28 +256,33 @@ private void propagateEvent(R object) {
250256
public Set<R> getSecondaryResources(P primary) {
251257

252258
if (useSecondaryToPrimaryIndex()) {
253-
254-
var resources =
255-
byIndex(
256-
PRIMARY_TO_SECONDARY_INDEX_NAME,
257-
resourceIdToString(ResourceID.fromResource(primary)));
259+
var primaryID = ResourceID.fromResource(primary);
260+
var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, resourceIdToString(primaryID));
258261

259262
log.debug(
260263
"Using informer primary to secondary index to find secondary resources for primary name:"
261264
+ " {} namespace: {}. Found {}",
262265
primary.getMetadata().getName(),
263266
primary.getMetadata().getNamespace(),
264267
resources.size());
265-
266-
return resources.stream()
267-
.map(
268-
r -> {
269-
Optional<R> resource =
270-
temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(r));
271-
return resource.orElse(r);
272-
})
273-
.collect(Collectors.toSet());
274-
268+
var complementaryIds =
269+
complementaryPrimaryToSecondaryIndex.getComplementarySecondaryResources(primaryID);
270+
var res =
271+
resources.stream()
272+
.map(
273+
r -> {
274+
var resourceId = ResourceID.fromResource(r);
275+
Optional<R> resource = temporaryResourceCache.getResourceFromCache(resourceId);
276+
complementaryIds.remove(resourceId);
277+
return resource.orElse(r);
278+
})
279+
.collect(Collectors.toSet());
280+
complementaryIds.forEach(
281+
id -> {
282+
Optional<R> resource = temporaryResourceCache.getResourceFromCache(id);
283+
resource.ifPresent(res::add);
284+
});
285+
return res;
275286
} else {
276287
Set<ResourceID> secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary);
277288
log.debug(
@@ -298,8 +309,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
298309
}
299310

300311
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
301-
// todo
302-
// primaryToSecondaryIndex.onAddOrUpdate(newResource);
312+
if (operation == Operation.ADD) {
313+
complementaryPrimaryToSecondaryIndex.explicitAdd(newResource);
314+
}
303315
temporaryResourceCache.putResource(
304316
newResource,
305317
Optional.ofNullable(oldResource)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.informer;
2+
3+
import java.util.Set;
4+
5+
import io.fabric8.kubernetes.api.model.HasMetadata;
6+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
7+
8+
public class NOOPComplementaryPrimaryToSecondaryIndex<R extends HasMetadata>
9+
implements ComplementaryPrimaryToSecondaryIndex<R> {
10+
11+
@Override
12+
public void explicitAdd(R resource) {}
13+
14+
@Override
15+
public void cleanupForResource(R resourceID) {}
16+
17+
@Override
18+
public Set<ResourceID> getComplementarySecondaryResources(ResourceID primary) {
19+
return Set.of();
20+
}
21+
}

0 commit comments

Comments
 (0)