diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java similarity index 82% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java index a1a5a96d36..48a45dd489 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultTemporalPrimaryToSecondaryIndex.java @@ -7,17 +7,19 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; -class DefaultPrimaryToSecondaryIndex implements PrimaryToSecondaryIndex { +class DefaultTemporalPrimaryToSecondaryIndex + implements TemporalPrimaryToSecondaryIndex { private final SecondaryToPrimaryMapper secondaryToPrimaryMapper; private final Map> index = new HashMap<>(); - public DefaultPrimaryToSecondaryIndex(SecondaryToPrimaryMapper secondaryToPrimaryMapper) { + public DefaultTemporalPrimaryToSecondaryIndex( + SecondaryToPrimaryMapper secondaryToPrimaryMapper) { this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; } @Override - public synchronized void onAddOrUpdate(R resource) { + public synchronized void explicitAddOrUpdate(R resource) { Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); primaryResources.forEach( primaryResource -> { @@ -28,7 +30,7 @@ public synchronized void onAddOrUpdate(R resource) { } @Override - public synchronized void onDelete(R resource) { + public synchronized void cleanupForResource(R resource) { Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); primaryResources.forEach( primaryResource -> { @@ -51,7 +53,7 @@ public synchronized Set getSecondaryResources(ResourceID primary) { if (resourceIDs == null) { return Collections.emptySet(); } else { - return Collections.unmodifiableSet(resourceIDs); + return new HashSet<>(resourceIDs); } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index b52dc278f2..f91594beb4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -58,10 +59,11 @@ public class InformerEventSource extends ManagedInformerEventSource> implements ResourceEventHandler { + public static final String PRIMARY_TO_SECONDARY_INDEX_NAME = "primaryToSecondary"; + public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous"; private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index - private final PrimaryToSecondaryIndex primaryToSecondaryIndex; private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; private final String id = UUID.randomUUID().toString(); @@ -95,12 +97,14 @@ private InformerEventSource( parseResourceVersions); // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); - if (primaryToSecondaryMapper == null) { - primaryToSecondaryIndex = - // The index uses the secondary to primary mapper (always present) to build the index - new DefaultPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); - } else { - primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex.getInstance(); + if (useSecondaryToPrimaryIndex()) { + addIndexers( + Map.of( + PRIMARY_TO_SECONDARY_INDEX_NAME, + (R r) -> + configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream() + .map(InformerEventSource::resourceIdToString) + .toList())); } final var informerConfig = configuration.getInformerConfig(); @@ -119,7 +123,6 @@ public void onAdd(R newResource) { resourceType().getSimpleName(), newResource.getMetadata().getResourceVersion()); } - primaryToSecondaryIndex.onAddOrUpdate(newResource); onAddOrUpdate( Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource)); } @@ -134,7 +137,7 @@ public void onUpdate(R oldObject, R newObject) { newObject.getMetadata().getResourceVersion(), oldObject.getMetadata().getResourceVersion()); } - primaryToSecondaryIndex.onAddOrUpdate(newObject); + onAddOrUpdate( Operation.UPDATE, newObject, @@ -150,7 +153,6 @@ public void onDelete(R resource, boolean b) { ResourceID.fromResource(resource), resourceType().getSimpleName()); } - primaryToSecondaryIndex.onDelete(resource); super.onDelete(resource, b); if (acceptedByDeleteFilters(resource, b)) { propagateEvent(resource); @@ -160,7 +162,6 @@ public void onDelete(R resource, boolean b) { private synchronized void onAddOrUpdate( Operation operation, R newObject, R oldObject, Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); - if (canSkipEvent(newObject, oldObject, resourceID)) { log.debug( "Skipping event propagation for {}, since was a result of a reconcile action. Resource" @@ -244,42 +245,73 @@ private void propagateEvent(R object) { @Override public Set getSecondaryResources(P primary) { - Set secondaryIDs; + if (useSecondaryToPrimaryIndex()) { - var primaryResourceID = ResourceID.fromResource(primary); - secondaryIDs = primaryToSecondaryIndex.getSecondaryResources(primaryResourceID); + var primaryID = ResourceID.fromResource(primary); + // Note that the order matter is these lines. This method is not synchronized + // because of performance reasons. If it was in reverse order, it could happen + // that we did not receive yet an event in the informer so the index would not + // be updated. However, before reading it from temp IDs the event arrives and erases + // the temp index. So in case of Add not id would be found. + var sid = resourceIdToString(primaryID); + var temporalIds = + temporaryResourceCache + .getTemporalPrimaryToSecondaryIndex() + .getSecondaryResources(primaryID); + var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, sid); + log.debug( - "Using PrimaryToSecondaryIndex to find secondary resources for primary: {}. Found" - + " secondary ids: {} ", - primaryResourceID, - secondaryIDs); + "Using informer primary to secondary index to find secondary resources for primary name:" + + " {} namespace: {}. Found number {}, String id: {}. All resources: {}", + primary.getMetadata().getName(), + primary.getMetadata().getNamespace(), + resources.size(), + sid, + manager().list().map(ResourceID::fromResource).toList()); + + log.debug("Complementary ids: {}", temporalIds); + var res = + resources.stream() + .map( + r -> { + var resourceId = ResourceID.fromResource(r); + temporalIds.remove(resourceId); + Optional resource = temporaryResourceCache.getResourceFromCache(resourceId); + return resource.orElse(r); + }) + .collect(Collectors.toSet()); + temporalIds.forEach( + id -> { + Optional resource = get(id); + resource.ifPresentOrElse(res::add, () -> log.warn("Resource not found: {}", id)); + }); + return res; } else { - secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary); + Set secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary); log.debug( "Using PrimaryToSecondaryMapper to find secondary resources for primary: {}. Found" + " secondary ids: {} ", primary, secondaryIDs); + return secondaryIDs.stream() + .map(this::get) + .flatMap(Optional::stream) + .collect(Collectors.toSet()); } - return secondaryIDs.stream() - .map(this::get) - .flatMap(Optional::stream) - .collect(Collectors.toSet()); } @Override public synchronized void handleRecentResourceUpdate( ResourceID resourceID, R resource, R previousVersionOfResource) { - handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource); + handleRecentCreateOrUpdate(resource, previousVersionOfResource); } @Override public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { - handleRecentCreateOrUpdate(Operation.ADD, resource, null); + handleRecentCreateOrUpdate(resource, null); } - private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) { - primaryToSecondaryIndex.onAddOrUpdate(newResource); + private void handleRecentCreateOrUpdate(R newResource, R oldResource) { temporaryResourceCache.putResource( newResource, Optional.ofNullable(oldResource) @@ -332,4 +364,20 @@ private enum Operation { ADD, UPDATE } + + private static String resourceIdToString(ResourceID resourceID) { + return resourceID.getName() + "#" + resourceID.getNamespace().orElse("$na"); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + protected TemporaryResourceCache temporaryResourceCache() { + return new TemporaryResourceCache<>( + this, + useSecondaryToPrimaryIndex() + ? new DefaultTemporalPrimaryToSecondaryIndex( + configuration().getSecondaryToPrimaryMapper()) + : NOOPTemporalPrimaryToSecondaryIndex.getInstance(), + parseResourceVersions); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 549d2236cd..63318cd04e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -40,7 +40,7 @@ public abstract class ManagedInformerEventSource< private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); private InformerManager cache; - private final boolean parseResourceVersions; + protected final boolean parseResourceVersions; private ControllerConfiguration controllerConfiguration; private final C configuration; private final Map>> indexers = new HashMap<>(); @@ -87,7 +87,7 @@ public synchronized void start() { if (isRunning()) { return; } - temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions); + temporaryResourceCache = temporaryResourceCache(); this.cache = new InformerManager<>(client, configuration, this); cache.setControllerConfiguration(controllerConfiguration); cache.addIndexers(indexers); @@ -133,6 +133,11 @@ public Optional get(ResourceID resourceID) { } } + protected TemporaryResourceCache temporaryResourceCache() { + return new TemporaryResourceCache<>( + this, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), parseResourceVersions); + } + @SuppressWarnings("unused") public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPTemporalPrimaryToSecondaryIndex.java similarity index 54% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPTemporalPrimaryToSecondaryIndex.java index abefbba638..bf837c7507 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPTemporalPrimaryToSecondaryIndex.java @@ -5,25 +5,27 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; -class NOOPPrimaryToSecondaryIndex implements PrimaryToSecondaryIndex { +class NOOPTemporalPrimaryToSecondaryIndex + implements TemporalPrimaryToSecondaryIndex { @SuppressWarnings("rawtypes") - private static final NOOPPrimaryToSecondaryIndex instance = new NOOPPrimaryToSecondaryIndex(); + private static final NOOPTemporalPrimaryToSecondaryIndex instance = + new NOOPTemporalPrimaryToSecondaryIndex(); @SuppressWarnings("unchecked") - public static NOOPPrimaryToSecondaryIndex getInstance() { + public static NOOPTemporalPrimaryToSecondaryIndex getInstance() { return instance; } - private NOOPPrimaryToSecondaryIndex() {} + private NOOPTemporalPrimaryToSecondaryIndex() {} @Override - public void onAddOrUpdate(R resource) { + public void explicitAddOrUpdate(R resource) { // empty method because of noop implementation } @Override - public void onDelete(R resource) { + public void cleanupForResource(R resource) { // empty method because of noop implementation } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndex.java similarity index 64% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndex.java index 7a87b23272..e6059d0983 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndex.java @@ -5,11 +5,11 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; -public interface PrimaryToSecondaryIndex { +public interface TemporalPrimaryToSecondaryIndex { - void onAddOrUpdate(R resource); + void explicitAddOrUpdate(R resource); - void onDelete(R resource); + void cleanupForResource(R resource); Set getSecondaryResources(ResourceID primary); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index af75a5abc4..f3af9a24b1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -31,47 +31,6 @@ */ public class TemporaryResourceCache { - static class ExpirationCache { - private final LinkedHashMap cache; - private final int ttlMs; - - public ExpirationCache(int maxEntries, int ttlMs) { - this.ttlMs = ttlMs; - this.cache = - new LinkedHashMap<>() { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > maxEntries; - } - }; - } - - public void add(K key) { - clean(); - cache.putIfAbsent(key, System.currentTimeMillis()); - } - - public boolean contains(K key) { - clean(); - return cache.get(key) != null; - } - - void clean() { - if (!cache.isEmpty()) { - long currentTimeMillis = System.currentTimeMillis(); - var iter = cache.entrySet().iterator(); - // the order will already be from oldest to newest, clean a fixed number of entries to - // amortize the cost amongst multiple calls - for (int i = 0; i < 10 && iter.hasNext(); i++) { - var entry = iter.next(); - if (currentTimeMillis - entry.getValue() > ttlMs) { - iter.remove(); - } - } - } - } - } - private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); @@ -81,12 +40,15 @@ void clean() { private final ManagedInformerEventSource managedInformerEventSource; private final boolean parseResourceVersions; private final ExpirationCache knownResourceVersions; + private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex; public TemporaryResourceCache( ManagedInformerEventSource managedInformerEventSource, + TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex, boolean parseResourceVersions) { this.managedInformerEventSource = managedInformerEventSource; this.parseResourceVersions = parseResourceVersions; + this.temporalPrimaryToSecondaryIndex = temporalPrimaryToSecondaryIndex; if (parseResourceVersions) { // keep up to the 50000 add/updates for up to 5 minutes knownResourceVersions = new ExpirationCache<>(50000, 600000); @@ -105,10 +67,18 @@ public synchronized void onAddOrUpdateEvent(T resource) { } synchronized void onEvent(T resource, boolean unknownState) { - cache.computeIfPresent( - ResourceID.fromResource(resource), - (id, cached) -> - (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); + var res = + cache.computeIfPresent( + ResourceID.fromResource(resource), + (id, cached) -> + (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); + if (res == null) { + log.debug( + "Cleaning up for resource name: {} ns: {} ", + resource.getMetadata().getName(), + resource.getMetadata().getNamespace()); + temporalPrimaryToSecondaryIndex.cleanupForResource(resource); + } } public synchronized void putAddedResource(T newResource) { @@ -141,7 +111,6 @@ public synchronized void putResource(T newResource, String previousResourceVersi // consider moveAhead = true; } - if (moveAhead || (cachedResource != null && (cachedResource @@ -153,6 +122,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), resourceId); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(newResource); cache.put(resourceId, newResource); } else if (cache.remove(resourceId) != null) { log.debug("Removed an obsolete resource from cache for id: {}", resourceId); @@ -189,4 +159,49 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + public TemporalPrimaryToSecondaryIndex getTemporalPrimaryToSecondaryIndex() { + return temporalPrimaryToSecondaryIndex; + } + + static class ExpirationCache { + private final LinkedHashMap cache; + private final int ttlMs; + + public ExpirationCache(int maxEntries, int ttlMs) { + this.ttlMs = ttlMs; + this.cache = + new LinkedHashMap<>() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxEntries; + } + }; + } + + public void add(K key) { + clean(); + cache.putIfAbsent(key, System.currentTimeMillis()); + } + + public boolean contains(K key) { + clean(); + return cache.get(key) != null; + } + + void clean() { + if (!cache.isEmpty()) { + long currentTimeMillis = System.currentTimeMillis(); + var iter = cache.entrySet().iterator(); + // the order will already be from oldest to newest, clean a fixed number of entries to + // amortize the cost amongst multiple calls + for (int i = 0; i < 10 && iter.hasNext(); i++) { + var entry = iter.next(); + if (currentTimeMillis - entry.getValue() > ttlMs) { + iter.remove(); + } + } + } + } + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndexTest.java similarity index 60% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndexTest.java index 7343b1e581..2cbce08c51 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporalPrimaryToSecondaryIndexTest.java @@ -15,14 +15,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class PrimaryToSecondaryIndexTest { +class TemporalPrimaryToSecondaryIndexTest { @SuppressWarnings("unchecked") private final SecondaryToPrimaryMapper secondaryToPrimaryMapperMock = mock(SecondaryToPrimaryMapper.class); - private final PrimaryToSecondaryIndex primaryToSecondaryIndex = - new DefaultPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); + private final TemporalPrimaryToSecondaryIndex temporalPrimaryToSecondaryIndex = + new DefaultTemporalPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); private final ResourceID primaryID1 = new ResourceID("id1", "default"); private final ResourceID primaryID2 = new ResourceID("id2", "default"); @@ -37,16 +37,17 @@ void setup() { @Test void returnsEmptySetOnEmptyIndex() { - var res = primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(secondary1)); + var res = + temporalPrimaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(secondary1)); assertThat(res).isEmpty(); } @Test void indexesNewResources() { - primaryToSecondaryIndex.onAddOrUpdate(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary1); - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + var secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary1)); assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary1)); @@ -54,11 +55,11 @@ void indexesNewResources() { @Test void indexesAdditionalResources() { - primaryToSecondaryIndex.onAddOrUpdate(secondary1); - primaryToSecondaryIndex.onAddOrUpdate(secondary2); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary2); - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + var secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1) .containsOnly(ResourceID.fromResource(secondary1), ResourceID.fromResource(secondary2)); @@ -68,20 +69,20 @@ void indexesAdditionalResources() { @Test void removingResourceFromIndex() { - primaryToSecondaryIndex.onAddOrUpdate(secondary1); - primaryToSecondaryIndex.onAddOrUpdate(secondary2); - primaryToSecondaryIndex.onDelete(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary1); + temporalPrimaryToSecondaryIndex.explicitAddOrUpdate(secondary2); + temporalPrimaryToSecondaryIndex.cleanupForResource(secondary1); - var secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - var secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + var secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + var secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1).containsOnly(ResourceID.fromResource(secondary2)); assertThat(secondaryResources2).containsOnly(ResourceID.fromResource(secondary2)); - primaryToSecondaryIndex.onDelete(secondary2); + temporalPrimaryToSecondaryIndex.cleanupForResource(secondary2); - secondaryResources1 = primaryToSecondaryIndex.getSecondaryResources(primaryID1); - secondaryResources2 = primaryToSecondaryIndex.getSecondaryResources(primaryID2); + secondaryResources1 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID1); + secondaryResources2 = temporalPrimaryToSecondaryIndex.getSecondaryResources(primaryID2); assertThat(secondaryResources1).isEmpty(); assertThat(secondaryResources2).isEmpty(); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index e62888832f..2e0f68308b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -31,7 +31,9 @@ class TemporaryPrimaryResourceCacheTest { @BeforeEach void setup() { informerEventSource = mock(InformerEventSource.class); - temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false); + temporaryResourceCache = + new TemporaryResourceCache<>( + informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), false); } @Test @@ -94,7 +96,9 @@ void removesResourceFromCache() { @Test void resourceVersionParsing() { - this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true); + this.temporaryResourceCache = + new TemporaryResourceCache<>( + informerEventSource, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), true); assertThat(temporaryResourceCache.isKnownResourceVersion(testResource())).isFalse(); diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java index 00bf7e8380..794bc11d9a 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java @@ -111,6 +111,10 @@ public T create(T resource) { return kubernetesClient.resource(resource).inNamespace(namespace).create(); } + public T serverSideApply(T resource) { + return kubernetesClient.resource(resource).inNamespace(namespace).serverSideApply(); + } + public T replace(T resource) { return kubernetesClient.resource(resource).inNamespace(namespace).replace(); } diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java index 3e6ad35e52..54cb57544d 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java @@ -54,6 +54,7 @@ public class LocallyRunOperatorExtension extends AbstractOperatorExtension { private final List> additionalCustomResourceDefinitions; private final Map registeredControllers; private final Map crdMappings; + private final Consumer beforeStartHook; private LocallyRunOperatorExtension( List reconcilers, @@ -68,7 +69,8 @@ private LocallyRunOperatorExtension( Consumer configurationServiceOverrider, Function namespaceNameSupplier, Function perClassNamespaceNameSupplier, - List additionalCrds) { + List additionalCrds, + Consumer beforeStartHook) { super( infrastructure, infrastructureTimeout, @@ -82,6 +84,7 @@ private LocallyRunOperatorExtension( this.portForwards = portForwards; this.localPortForwards = new ArrayList<>(portForwards.size()); this.additionalCustomResourceDefinitions = additionalCustomResourceDefinitions; + this.beforeStartHook = beforeStartHook; configurationServiceOverrider = configurationServiceOverrider != null ? configurationServiceOverrider.andThen( @@ -298,6 +301,10 @@ protected void before(ExtensionContext context) { }); crdMappings.clear(); + if (beforeStartHook != null) { + beforeStartHook.accept(this); + } + LOGGER.debug("Starting the operator locally"); this.operator.start(); } @@ -356,6 +363,7 @@ public static class Builder extends AbstractBuilder { private final List portForwards; private final List> additionalCustomResourceDefinitions; private final List additionalCRDs = new ArrayList<>(); + private Consumer beforeStartHook; private KubernetesClient kubernetesClient; protected Builder() { @@ -424,6 +432,15 @@ public Builder withAdditionalCRD(String... paths) { return this; } + /** + * Used to initialize resources when the namespace is generated but the operator is not started + * yet. + */ + public Builder withBeforeStartHook(Consumer beforeStartHook) { + this.beforeStartHook = beforeStartHook; + return this; + } + public LocallyRunOperatorExtension build() { return new LocallyRunOperatorExtension( reconcilers, @@ -438,7 +455,8 @@ public LocallyRunOperatorExtension build() { configurationServiceOverrider, namespaceNameSupplier, perClassNamespaceNameSupplier, - additionalCRDs); + additionalCRDs, + beforeStartHook); } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessCustomResource.java new file mode 100644 index 0000000000..b9701c94bd --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessCustomResource.java @@ -0,0 +1,13 @@ +package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("ssac") +public class StartupSecondaryAccessCustomResource extends CustomResource + implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java new file mode 100644 index 0000000000..61fc40803c --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessIT.java @@ -0,0 +1,53 @@ +package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess; + +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessReconciler.LABEL_KEY; +import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessReconciler.LABEL_VALUE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class StartupSecondaryAccessIT { + + public static final int SECONDARY_NUMBER = 200; + + @RegisterExtension + static LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(new StartupSecondaryAccessReconciler()) + .withBeforeStartHook( + ex -> { + var primary = new StartupSecondaryAccessCustomResource(); + primary.setMetadata(new ObjectMetaBuilder().withName("test1").build()); + primary = ex.serverSideApply(primary); + + for (int i = 0; i < SECONDARY_NUMBER; i++) { + ConfigMap cm = new ConfigMap(); + cm.setMetadata( + new ObjectMetaBuilder() + .withLabels(Map.of(LABEL_KEY, LABEL_VALUE)) + .withNamespace(ex.getNamespace()) + .withName("cm" + i) + .build()); + cm.addOwnerReference(primary); + ex.serverSideApply(cm); + } + }) + .build(); + + @Test + void reconcilerSeeAllSecondaryResources() { + var reconciler = extension.getReconcilerOfType(StartupSecondaryAccessReconciler.class); + + await().untilAsserted(() -> assertThat(reconciler.isReconciled()).isTrue()); + + assertThat(reconciler.isSecondaryAndCacheSameAmount()).isTrue(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessReconciler.java new file mode 100644 index 0000000000..a2c51fdafd --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/startsecondaryaccess/StartupSecondaryAccessReconciler.java @@ -0,0 +1,75 @@ +package io.javaoperatorsdk.operator.baseapi.startsecondaryaccess; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; + +import static io.javaoperatorsdk.operator.baseapi.startsecondaryaccess.StartupSecondaryAccessIT.SECONDARY_NUMBER; + +@ControllerConfiguration +public class StartupSecondaryAccessReconciler + implements Reconciler { + + private static final Logger log = LoggerFactory.getLogger(StartupSecondaryAccessReconciler.class); + + public static final String LABEL_KEY = "app"; + public static final String LABEL_VALUE = "secondary-test"; + + private InformerEventSource cmInformer; + + private boolean secondaryAndCacheSameAmount = true; + private boolean reconciled = false; + + @Override + public UpdateControl reconcile( + StartupSecondaryAccessCustomResource resource, + Context context) { + + var secondary = context.getSecondaryResources(ConfigMap.class); + var cached = cmInformer.list().toList(); + + log.info( + "Secondary number: {}, cached: {}, expected: {}", + secondary.size(), + cached.size(), + SECONDARY_NUMBER); + + if (secondary.size() != cached.size()) { + secondaryAndCacheSameAmount = false; + } + reconciled = true; + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + cmInformer = + new InformerEventSource<>( + InformerEventSourceConfiguration.from( + ConfigMap.class, StartupSecondaryAccessCustomResource.class) + .withLabelSelector(LABEL_KEY + "=" + LABEL_VALUE) + .build(), + context); + return List.of(cmInformer); + } + + public boolean isSecondaryAndCacheSameAmount() { + return secondaryAndCacheSameAmount; + } + + public boolean isReconciled() { + return reconciled; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java index ae5cd25895..ae487daca6 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefIT.java @@ -16,7 +16,7 @@ class DependentResourceCrossRefIT { public static final String TEST_RESOURCE_NAME = "test"; - public static final int EXECUTION_NUMBER = 50; + public static final int EXECUTION_NUMBER = 10; @RegisterExtension LocallyRunOperatorExtension operator = @@ -44,6 +44,22 @@ void dependentResourceCanReferenceEachOther() { assertThat(operator.get(Secret.class, TEST_RESOURCE_NAME + i)).isNotNull(); } }); + + for (int i = 0; i < EXECUTION_NUMBER; i++) { + operator.delete(testResource(i)); + } + await() + .timeout(Duration.ofSeconds(30)) + .untilAsserted( + () -> { + for (int i = 0; i < EXECUTION_NUMBER; i++) { + assertThat( + operator.get( + DependentResourceCrossRefResource.class, + testResource(i).getMetadata().getName())) + .isNull(); + } + }); } DependentResourceCrossRefResource testResource(int n) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java index 5d54ecdabe..d0dbb74245 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentresourcecrossref/DependentResourceCrossRefReconciler.java @@ -5,6 +5,9 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.api.model.Secret; @@ -26,6 +29,8 @@ @ControllerConfiguration public class DependentResourceCrossRefReconciler implements Reconciler { + private static final Logger log = + LoggerFactory.getLogger(DependentResourceCrossRefReconciler.class); public static final String SECRET_NAME = "secret"; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); @@ -48,6 +53,7 @@ public ErrorStatusUpdateControl updateErrorSt DependentResourceCrossRefResource resource, Context context, Exception e) { + log.error("Status update on error for resource: {}", resource.getMetadata().getName(), e); errorHappened = true; return ErrorStatusUpdateControl.noStatusUpdate(); }