1
1
package io .javaoperatorsdk .operator .processing .event .source .informer ;
2
2
3
- import java .util .Map ;
4
3
import java .util .Optional ;
5
4
import java .util .Set ;
6
5
import java .util .UUID ;
@@ -59,13 +58,11 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
59
58
extends ManagedInformerEventSource <R , P , InformerEventSourceConfiguration <R >>
60
59
implements ResourceEventHandler <R > {
61
60
62
- public static final String PRIMARY_TO_SECONDARY_INDEX_NAME = "primaryToSecondary" ;
63
-
64
61
public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous" ;
65
62
private static final Logger log = LoggerFactory .getLogger (InformerEventSource .class );
66
63
// we need direct control for the indexer to propagate the just update resource also to the index
64
+ private final PrimaryToSecondaryIndex <R > primaryToSecondaryIndex ;
67
65
private final PrimaryToSecondaryMapper <P > primaryToSecondaryMapper ;
68
- private final TemporalPrimaryToSecondaryIndex <R > temporalPrimaryToSecondaryIndex ;
69
66
private final String id = UUID .randomUUID ().toString ();
70
67
71
68
public InformerEventSource (
@@ -99,17 +96,11 @@ private InformerEventSource(
99
96
// If there is a primary to secondary mapper there is no need for primary to secondary index.
100
97
primaryToSecondaryMapper = configuration .getPrimaryToSecondaryMapper ();
101
98
if (useSecondaryToPrimaryIndex ()) {
102
- temporalPrimaryToSecondaryIndex =
103
- new DefaultTemporalPrimaryToSecondaryIndex <>(configuration .getSecondaryToPrimaryMapper ());
104
- addIndexers (
105
- Map .of (
106
- PRIMARY_TO_SECONDARY_INDEX_NAME ,
107
- (R r ) ->
108
- configuration .getSecondaryToPrimaryMapper ().toPrimaryResourceIDs (r ).stream ()
109
- .map (InformerEventSource ::resourceIdToString )
110
- .toList ()));
99
+ primaryToSecondaryIndex =
100
+ // The index uses the secondary to primary mapper (always present) to build the index
101
+ new DefaultPrimaryToSecondaryIndex <>(configuration .getSecondaryToPrimaryMapper ());
111
102
} else {
112
- temporalPrimaryToSecondaryIndex = NOOPTemporalPrimaryToSecondaryIndex .getInstance ();
103
+ primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex .getInstance ();
113
104
}
114
105
115
106
final var informerConfig = configuration .getInformerConfig ();
@@ -128,6 +119,7 @@ public void onAdd(R newResource) {
128
119
resourceType ().getSimpleName (),
129
120
newResource .getMetadata ().getResourceVersion ());
130
121
}
122
+ primaryToSecondaryIndex .onAddOrUpdate (newResource );
131
123
onAddOrUpdate (
132
124
Operation .ADD , newResource , null , () -> InformerEventSource .super .onAdd (newResource ));
133
125
}
@@ -142,7 +134,7 @@ public void onUpdate(R oldObject, R newObject) {
142
134
newObject .getMetadata ().getResourceVersion (),
143
135
oldObject .getMetadata ().getResourceVersion ());
144
136
}
145
-
137
+ primaryToSecondaryIndex . onAddOrUpdate ( newObject );
146
138
onAddOrUpdate (
147
139
Operation .UPDATE ,
148
140
newObject ,
@@ -158,17 +150,23 @@ public void onDelete(R resource, boolean b) {
158
150
ResourceID .fromResource (resource ),
159
151
resourceType ().getSimpleName ());
160
152
}
161
- temporalPrimaryToSecondaryIndex . cleanupForResource (resource );
153
+ primaryToSecondaryIndex . onDelete (resource );
162
154
super .onDelete (resource , b );
163
155
if (acceptedByDeleteFilters (resource , b )) {
164
156
propagateEvent (resource );
165
157
}
166
158
}
167
159
160
+ @ Override
161
+ public synchronized void start () {
162
+ super .start ();
163
+ manager ().list ().forEach (primaryToSecondaryIndex ::onAddOrUpdate );
164
+ }
165
+
168
166
private synchronized void onAddOrUpdate (
169
167
Operation operation , R newObject , R oldObject , Runnable superOnOp ) {
170
168
var resourceID = ResourceID .fromResource (newObject );
171
- temporalPrimaryToSecondaryIndex . cleanupForResource ( newObject );
169
+
172
170
if (canSkipEvent (newObject , oldObject , resourceID )) {
173
171
log .debug (
174
172
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
@@ -252,68 +250,42 @@ private void propagateEvent(R object) {
252
250
253
251
@ Override
254
252
public Set <R > getSecondaryResources (P primary ) {
255
-
253
+ Set < ResourceID > secondaryIDs ;
256
254
if (useSecondaryToPrimaryIndex ()) {
257
- var primaryID = ResourceID .fromResource (primary );
258
- // Note that the order matter is these lines. This method is not synchronized
259
- // because of performance reasons. If it was in reverse order, it could happen
260
- // that we did not receive yet an event in the informer so the index would not
261
- // be updated. However, before reading it from temp IDs the event arrives and erases
262
- // the temp index. So in case of Add not id would be found.
263
- var temporalIds = temporalPrimaryToSecondaryIndex .getSecondaryResources (primaryID );
264
- var resources = byIndex (PRIMARY_TO_SECONDARY_INDEX_NAME , resourceIdToString (primaryID ));
265
-
255
+ var primaryResourceID = ResourceID .fromResource (primary );
256
+ secondaryIDs = primaryToSecondaryIndex .getSecondaryResources (primaryResourceID );
266
257
log .debug (
267
- "Using informer primary to secondary index to find secondary resources for primary name:"
268
- + " {} namespace: {}. Found number {}" ,
269
- primary .getMetadata ().getName (),
270
- primary .getMetadata ().getNamespace (),
271
- resources .size ());
272
-
273
- log .debug ("Complementary ids: {}" , temporalIds );
274
- var res =
275
- resources .stream ()
276
- .map (
277
- r -> {
278
- var resourceId = ResourceID .fromResource (r );
279
- Optional <R > resource = temporaryResourceCache .getResourceFromCache (resourceId );
280
- temporalIds .remove (resourceId );
281
- return resource .orElse (r );
282
- })
283
- .collect (Collectors .toSet ());
284
- temporalIds .forEach (
285
- id -> {
286
- Optional <R > resource = get (id );
287
- resource .ifPresentOrElse (res ::add , () -> log .warn ("Resource not found: {}" , id ));
288
- });
289
- return res ;
258
+ "Using PrimaryToSecondaryIndex to find secondary resources for primary: {}. Found"
259
+ + " secondary ids: {} " ,
260
+ primaryResourceID ,
261
+ secondaryIDs );
290
262
} else {
291
- Set < ResourceID > secondaryIDs = primaryToSecondaryMapper .toSecondaryResourceIDs (primary );
263
+ secondaryIDs = primaryToSecondaryMapper .toSecondaryResourceIDs (primary );
292
264
log .debug (
293
265
"Using PrimaryToSecondaryMapper to find secondary resources for primary: {}. Found"
294
266
+ " secondary ids: {} " ,
295
267
primary ,
296
268
secondaryIDs );
297
- return secondaryIDs .stream ()
298
- .map (this ::get )
299
- .flatMap (Optional ::stream )
300
- .collect (Collectors .toSet ());
301
269
}
270
+ return secondaryIDs .stream ()
271
+ .map (this ::get )
272
+ .flatMap (Optional ::stream )
273
+ .collect (Collectors .toSet ());
302
274
}
303
275
304
276
@ Override
305
277
public synchronized void handleRecentResourceUpdate (
306
278
ResourceID resourceID , R resource , R previousVersionOfResource ) {
307
- handleRecentCreateOrUpdate (resource , previousVersionOfResource );
279
+ handleRecentCreateOrUpdate (Operation . UPDATE , resource , previousVersionOfResource );
308
280
}
309
281
310
282
@ Override
311
283
public synchronized void handleRecentResourceCreate (ResourceID resourceID , R resource ) {
312
- handleRecentCreateOrUpdate (resource , null );
284
+ handleRecentCreateOrUpdate (Operation . ADD , resource , null );
313
285
}
314
286
315
- private void handleRecentCreateOrUpdate (R newResource , R oldResource ) {
316
- temporalPrimaryToSecondaryIndex . explicitAddOrUpdate (newResource );
287
+ private void handleRecentCreateOrUpdate (Operation operation , R newResource , R oldResource ) {
288
+ primaryToSecondaryIndex . onAddOrUpdate (newResource );
317
289
temporaryResourceCache .putResource (
318
290
newResource ,
319
291
Optional .ofNullable (oldResource )
@@ -366,8 +338,4 @@ private enum Operation {
366
338
ADD ,
367
339
UPDATE
368
340
}
369
-
370
- private static String resourceIdToString (ResourceID resourceID ) {
371
- return resourceID .getName () + "#" + resourceID .getNamespace ().orElse ("$na" );
372
- }
373
341
}
0 commit comments