1
1
package io .javaoperatorsdk .operator .processing .event .source .informer ;
2
2
3
+ import java .util .Map ;
3
4
import java .util .Optional ;
4
5
import java .util .Set ;
5
6
import java .util .UUID ;
@@ -58,10 +59,11 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
58
59
extends ManagedInformerEventSource <R , P , InformerEventSourceConfiguration <R >>
59
60
implements ResourceEventHandler <R > {
60
61
62
+ public static final String PRIMARY_TO_SECONDARY_INDEX_NAME = "primaryToSecondary" ;
63
+
61
64
public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous" ;
62
65
private static final Logger log = LoggerFactory .getLogger (InformerEventSource .class );
63
66
// we need direct control for the indexer to propagate the just update resource also to the index
64
- private final PrimaryToSecondaryIndex <R > primaryToSecondaryIndex ;
65
67
private final PrimaryToSecondaryMapper <P > primaryToSecondaryMapper ;
66
68
private final String id = UUID .randomUUID ().toString ();
67
69
@@ -96,11 +98,13 @@ private InformerEventSource(
96
98
// If there is a primary to secondary mapper there is no need for primary to secondary index.
97
99
primaryToSecondaryMapper = configuration .getPrimaryToSecondaryMapper ();
98
100
if (primaryToSecondaryMapper == null ) {
99
- primaryToSecondaryIndex =
100
- // The index uses the secondary to primary mapper (always present) to build the index
101
- new DefaultPrimaryToSecondaryIndex <>(configuration .getSecondaryToPrimaryMapper ());
102
- } else {
103
- primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex .getInstance ();
101
+ addIndexers (
102
+ Map .of (
103
+ PRIMARY_TO_SECONDARY_INDEX_NAME ,
104
+ (R r ) ->
105
+ configuration .getSecondaryToPrimaryMapper ().toPrimaryResourceIDs (r ).stream ()
106
+ .map (InformerEventSource ::resourceIdToString )
107
+ .toList ()));
104
108
}
105
109
106
110
final var informerConfig = configuration .getInformerConfig ();
@@ -119,7 +123,7 @@ public void onAdd(R newResource) {
119
123
resourceType ().getSimpleName (),
120
124
newResource .getMetadata ().getResourceVersion ());
121
125
}
122
- primaryToSecondaryIndex . onAddOrUpdate ( newResource );
126
+
123
127
onAddOrUpdate (
124
128
Operation .ADD , newResource , null , () -> InformerEventSource .super .onAdd (newResource ));
125
129
}
@@ -134,7 +138,7 @@ public void onUpdate(R oldObject, R newObject) {
134
138
newObject .getMetadata ().getResourceVersion (),
135
139
oldObject .getMetadata ().getResourceVersion ());
136
140
}
137
- primaryToSecondaryIndex . onAddOrUpdate ( newObject );
141
+
138
142
onAddOrUpdate (
139
143
Operation .UPDATE ,
140
144
newObject ,
@@ -150,7 +154,7 @@ public void onDelete(R resource, boolean b) {
150
154
ResourceID .fromResource (resource ),
151
155
resourceType ().getSimpleName ());
152
156
}
153
- primaryToSecondaryIndex . onDelete ( resource );
157
+
154
158
super .onDelete (resource , b );
155
159
if (acceptedByDeleteFilters (resource , b )) {
156
160
propagateEvent (resource );
@@ -244,27 +248,42 @@ private void propagateEvent(R object) {
244
248
245
249
@ Override
246
250
public Set <R > getSecondaryResources (P primary ) {
247
- Set < ResourceID > secondaryIDs ;
251
+
248
252
if (useSecondaryToPrimaryIndex ()) {
249
- var primaryResourceID = ResourceID .fromResource (primary );
250
- secondaryIDs = primaryToSecondaryIndex .getSecondaryResources (primaryResourceID );
253
+
254
+ var resources =
255
+ byIndex (
256
+ PRIMARY_TO_SECONDARY_INDEX_NAME ,
257
+ resourceIdToString (ResourceID .fromResource (primary )));
258
+
251
259
log .debug (
252
- "Using PrimaryToSecondaryIndex to find secondary resources for primary: {}. Found"
253
- + " secondary ids: {} " ,
254
- primaryResourceID ,
255
- secondaryIDs );
260
+ "Using informer primary to secondary index to find secondary resources for primary name:"
261
+ + " {} namespace: {}. Found {}" ,
262
+ primary .getMetadata ().getName (),
263
+ primary .getMetadata ().getNamespace (),
264
+ resources .size ());
265
+
266
+ return resources .stream ()
267
+ .map (
268
+ r -> {
269
+ Optional <R > resource =
270
+ temporaryResourceCache .getResourceFromCache (ResourceID .fromResource (r ));
271
+ return resource .orElse (r );
272
+ })
273
+ .collect (Collectors .toSet ());
274
+
256
275
} else {
257
- secondaryIDs = primaryToSecondaryMapper .toSecondaryResourceIDs (primary );
276
+ Set < ResourceID > secondaryIDs = primaryToSecondaryMapper .toSecondaryResourceIDs (primary );
258
277
log .debug (
259
278
"Using PrimaryToSecondaryMapper to find secondary resources for primary: {}. Found"
260
279
+ " secondary ids: {} " ,
261
280
primary ,
262
281
secondaryIDs );
282
+ return secondaryIDs .stream ()
283
+ .map (this ::get )
284
+ .flatMap (Optional ::stream )
285
+ .collect (Collectors .toSet ());
263
286
}
264
- return secondaryIDs .stream ()
265
- .map (this ::get )
266
- .flatMap (Optional ::stream )
267
- .collect (Collectors .toSet ());
268
287
}
269
288
270
289
@ Override
@@ -279,7 +298,8 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
279
298
}
280
299
281
300
private void handleRecentCreateOrUpdate (Operation operation , R newResource , R oldResource ) {
282
- primaryToSecondaryIndex .onAddOrUpdate (newResource );
301
+ // todo
302
+ // primaryToSecondaryIndex.onAddOrUpdate(newResource);
283
303
temporaryResourceCache .putResource (
284
304
newResource ,
285
305
Optional .ofNullable (oldResource )
@@ -332,4 +352,8 @@ private enum Operation {
332
352
ADD ,
333
353
UPDATE
334
354
}
355
+
356
+ private static String resourceIdToString (ResourceID resourceID ) {
357
+ return resourceID .getName () + "#" + resourceID .getNamespace ().orElse ("$na" );
358
+ }
335
359
}
0 commit comments