> defaultNonSSAResource() {
return defaultNonSSAResources();
}
- /**
- * If a javaoperatorsdk.io/previous annotation should be used so that the operator sdk can detect
- * events from its own updates of dependent resources and then filter them.
- *
- * Disable this if you want to react to your own dependent resource updates
- *
- * @return if special annotation should be used for dependent resource to filter events
- * @since 4.5.0
- */
- default boolean previousAnnotationForDependentResourcesEventFiltering() {
- return true;
- }
-
- /**
- * For dependent resources, the framework can add an annotation to filter out events resulting
- * directly from the framework's operation. There are, however, some resources that do not follow
- * the Kubernetes API conventions that changes in metadata should not increase the generation of
- * the resource (as recorded in the {@code generation} field of the resource's {@code metadata}).
- * For these resources, this convention is not respected and results in a new event for the
- * framework to process. If that particular case is not handled correctly in the resource matcher,
- * the framework will consider that the resource doesn't match the desired state and therefore
- * triggers an update, which in turn, will re-add the annotation, thus starting the loop again,
- * infinitely.
- *
- *
As a workaround, we automatically skip adding previous annotation for those well-known
- * resources. Note that if you are sure that the matcher works for your use case, and it should in
- * most instances, you can remove the resource type from the blocklist.
- *
- *
The consequence of adding a resource type to the set is that the framework will not use
- * event filtering to prevent events, initiated by changes made by the framework itself as a
- * result of its processing of dependent resources, to trigger the associated reconciler again.
- *
- *
Note that this method only takes effect if annotating dependent resources to prevent
- * dependent resources events from triggering the associated reconciler again is activated as
- * controlled by {@link #previousAnnotationForDependentResourcesEventFiltering()}
- *
- * @return a Set of resource classes where the previous version annotation won't be used.
- */
- default Set> withPreviousAnnotationForDependentResourcesBlocklist() {
- return Set.of(Deployment.class, StatefulSet.class);
- }
-
- /**
- * If the event logic should parse the resourceVersion to determine the ordering of dependent
- * resource events. This is typically not needed.
- *
- * Disabled by default as Kubernetes does not support, and discourages, this interpretation of
- * resourceVersions. Enable only if your api server event processing seems to lag the operator
- * logic, and you want to further minimize the amount of work done / updates issued by the
- * operator.
- *
- * @return if resource version should be parsed (as integer)
- * @since 4.5.0
- */
- default boolean parseResourceVersionsForEventFilteringAndCaching() {
- return false;
- }
-
/**
* {@link io.javaoperatorsdk.operator.api.reconciler.UpdateControl} patch resource or status can
* either use simple patches or SSA. Setting this to {@code true}, controllers will use SSA for
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
index 3d29bb6589..cd9cdafb39 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
@@ -51,11 +51,8 @@ public class ConfigurationServiceOverrider {
private Duration reconciliationTerminationTimeout;
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
private Set> defaultNonSSAResource;
- private Boolean previousAnnotationForDependentResources;
- private Boolean parseResourceVersions;
private Boolean useSSAToPatchPrimaryResource;
private Boolean cloneSecondaryResourcesWhenGettingFromCache;
- private Set> previousAnnotationUsageBlocklist;
@SuppressWarnings("rawtypes")
private DependentResourceFactory dependentResourceFactory;
@@ -168,31 +165,6 @@ public ConfigurationServiceOverrider withDefaultNonSSAResource(
return this;
}
- public ConfigurationServiceOverrider withPreviousAnnotationForDependentResources(boolean value) {
- this.previousAnnotationForDependentResources = value;
- return this;
- }
-
- /**
- * @param value true if internal algorithms can use metadata.resourceVersion as a numeric value.
- * @return this
- */
- public ConfigurationServiceOverrider withParseResourceVersions(boolean value) {
- this.parseResourceVersions = value;
- return this;
- }
-
- /**
- * @deprecated use withParseResourceVersions
- * @param value true if internal algorithms can use metadata.resourceVersion as a numeric value.
- * @return this
- */
- @Deprecated(forRemoval = true)
- public ConfigurationServiceOverrider wihtParseResourceVersions(boolean value) {
- this.parseResourceVersions = value;
- return this;
- }
-
public ConfigurationServiceOverrider withUseSSAToPatchPrimaryResource(boolean value) {
this.useSSAToPatchPrimaryResource = value;
return this;
@@ -204,12 +176,6 @@ public ConfigurationServiceOverrider withCloneSecondaryResourcesWhenGettingFromC
return this;
}
- public ConfigurationServiceOverrider withPreviousAnnotationForDependentResourcesBlocklist(
- Set> blocklist) {
- this.previousAnnotationUsageBlocklist = blocklist;
- return this;
- }
-
public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, client) {
@Override
@@ -331,20 +297,6 @@ public Set> defaultNonSSAResources() {
defaultNonSSAResource, ConfigurationService::defaultNonSSAResources);
}
- @Override
- public boolean previousAnnotationForDependentResourcesEventFiltering() {
- return overriddenValueOrDefault(
- previousAnnotationForDependentResources,
- ConfigurationService::previousAnnotationForDependentResourcesEventFiltering);
- }
-
- @Override
- public boolean parseResourceVersionsForEventFilteringAndCaching() {
- return overriddenValueOrDefault(
- parseResourceVersions,
- ConfigurationService::parseResourceVersionsForEventFilteringAndCaching);
- }
-
@Override
public boolean useSSAToPatchPrimaryResource() {
return overriddenValueOrDefault(
@@ -357,14 +309,6 @@ public boolean cloneSecondaryResourcesWhenGettingFromCache() {
cloneSecondaryResourcesWhenGettingFromCache,
ConfigurationService::cloneSecondaryResourcesWhenGettingFromCache);
}
-
- @Override
- public Set>
- withPreviousAnnotationForDependentResourcesBlocklist() {
- return overriddenValueOrDefault(
- previousAnnotationUsageBlocklist,
- ConfigurationService::withPreviousAnnotationForDependentResourcesBlocklist);
- }
};
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java
index 9264db66bc..e6655641a2 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java
@@ -28,6 +28,7 @@
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
+import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_LONG_VALUE_SET;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET;
@@ -131,4 +132,11 @@
/** Kubernetes field selector for additional resource filtering */
Field[] fieldSelector() default {};
+
+ /**
+ * true if we can consider resource versions as integers, therefore it is valid to compare them
+ *
+ * @since 5.3.0
+ */
+ boolean comparableResourceVersions() default DEFAULT_COMPARABLE_RESOURCE_VERSION;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java
index 24f78eb7be..30a1a32e8a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java
@@ -53,6 +53,7 @@ public class InformerConfiguration {
private ItemStore itemStore;
private Long informerListLimit;
private FieldSelector fieldSelector;
+ private boolean comparableResourceVersions;
protected InformerConfiguration(
Class resourceClass,
@@ -66,7 +67,8 @@ protected InformerConfiguration(
GenericFilter super R> genericFilter,
ItemStore itemStore,
Long informerListLimit,
- FieldSelector fieldSelector) {
+ FieldSelector fieldSelector,
+ boolean comparableResourceVersions) {
this(resourceClass);
this.name = name;
this.namespaces = namespaces;
@@ -79,6 +81,7 @@ protected InformerConfiguration(
this.itemStore = itemStore;
this.informerListLimit = informerListLimit;
this.fieldSelector = fieldSelector;
+ this.comparableResourceVersions = comparableResourceVersions;
}
private InformerConfiguration(Class resourceClass) {
@@ -113,7 +116,8 @@ public static InformerConfiguration.Builder builder(
original.genericFilter,
original.itemStore,
original.informerListLimit,
- original.fieldSelector)
+ original.fieldSelector,
+ original.comparableResourceVersions)
.builder;
}
@@ -288,6 +292,10 @@ public FieldSelector getFieldSelector() {
return fieldSelector;
}
+ public boolean isComparableResourceVersions() {
+ return comparableResourceVersions;
+ }
+
@SuppressWarnings("UnusedReturnValue")
public class Builder {
@@ -359,6 +367,7 @@ public InformerConfiguration.Builder initFromAnnotation(
Arrays.stream(informerConfig.fieldSelector())
.map(f -> new FieldSelector.Field(f.path(), f.value(), f.negated()))
.toList()));
+ withComparableResourceVersions(informerConfig.comparableResourceVersions());
}
return this;
}
@@ -459,5 +468,10 @@ public Builder withFieldSelector(FieldSelector fieldSelector) {
InformerConfiguration.this.fieldSelector = fieldSelector;
return this;
}
+
+ public Builder withComparableResourceVersions(boolean comparableResourceVersions) {
+ InformerConfiguration.this.comparableResourceVersions = comparableResourceVersions;
+ return this;
+ }
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java
index bca605a41c..69903e805f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java
@@ -33,6 +33,7 @@
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
+import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.SAME_AS_CONTROLLER_NAMESPACES_SET;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACE_SET;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE_SET;
@@ -96,18 +97,21 @@ class DefaultInformerEventSourceConfiguration
private final GroupVersionKind groupVersionKind;
private final InformerConfiguration informerConfig;
private final KubernetesClient kubernetesClient;
+ private final boolean comparableResourceVersion;
protected DefaultInformerEventSourceConfiguration(
GroupVersionKind groupVersionKind,
PrimaryToSecondaryMapper> primaryToSecondaryMapper,
SecondaryToPrimaryMapper secondaryToPrimaryMapper,
InformerConfiguration informerConfig,
- KubernetesClient kubernetesClient) {
+ KubernetesClient kubernetesClient,
+ boolean comparableResourceVersion) {
this.informerConfig = Objects.requireNonNull(informerConfig);
this.groupVersionKind = groupVersionKind;
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
this.kubernetesClient = kubernetesClient;
+ this.comparableResourceVersion = comparableResourceVersion;
}
@Override
@@ -135,6 +139,11 @@ public Optional getGroupVersionKind() {
public Optional getKubernetesClient() {
return Optional.ofNullable(kubernetesClient);
}
+
+ @Override
+ public boolean comparableResourceVersion() {
+ return this.comparableResourceVersion;
+ }
}
@SuppressWarnings({"unused", "UnusedReturnValue"})
@@ -148,6 +157,7 @@ class Builder {
private PrimaryToSecondaryMapper> primaryToSecondaryMapper;
private SecondaryToPrimaryMapper secondaryToPrimaryMapper;
private KubernetesClient kubernetesClient;
+ private boolean comparableResourceVersion = DEFAULT_COMPARABLE_RESOURCE_VERSION;
private Builder(Class resourceClass, Class extends HasMetadata> primaryResourceClass) {
this(resourceClass, primaryResourceClass, null);
@@ -285,6 +295,11 @@ public Builder withFieldSelector(FieldSelector fieldSelector) {
return this;
}
+ public Builder withComparableResourceVersion(boolean comparableResourceVersion) {
+ this.comparableResourceVersion = comparableResourceVersion;
+ return this;
+ }
+
public void updateFrom(InformerConfiguration informerConfig) {
if (informerConfig != null) {
final var informerConfigName = informerConfig.getName();
@@ -324,7 +339,10 @@ public InformerEventSourceConfiguration build() {
HasMetadata.getKind(primaryResourceClass),
false)),
config.build(),
- kubernetesClient);
+ kubernetesClient,
+ comparableResourceVersion);
}
}
+
+ boolean comparableResourceVersion();
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java
index 052b4d8c44..7330a407c1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java
@@ -41,6 +41,7 @@ public final class Constants {
public static final String RESOURCE_GVK_KEY = "josdk.resource.gvk";
public static final String CONTROLLER_NAME = "controller.name";
public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true;
+ public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSION = true;
private Constants() {}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java
index 38cb95e8dc..11dfd21648 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java
@@ -451,6 +451,11 @@ public static P addFinalizerWithSSA(
}
}
+ public static int compareResourceVersions(HasMetadata h1, HasMetadata h2) {
+ return compareResourceVersions(
+ h1.getMetadata().getResourceVersion(), h2.getMetadata().getResourceVersion());
+ }
+
public static int compareResourceVersions(String v1, String v2) {
int v1Length = validateResourceVersion(v1);
int v2Length = validateResourceVersion(v2);
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
index 05cddcade1..562a6257b5 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
@@ -55,7 +55,6 @@ public abstract class KubernetesDependentResource kubernetesDependentResourceConfig;
private volatile Boolean useSSA;
- private volatile Boolean usePreviousAnnotationForEventFiltering;
public KubernetesDependentResource() {}
@@ -72,6 +71,27 @@ public void configureWith(KubernetesDependentResourceConfig config) {
this.kubernetesDependentResourceConfig = config;
}
+ @Override
+ protected R handleCreate(R desired, P primary, Context context) {
+ return eventSource()
+ .orElseThrow()
+ .updateAndCacheResource(
+ desired,
+ context,
+ toCreate -> KubernetesDependentResource.super.handleCreate(toCreate, primary, context));
+ }
+
+ @Override
+ protected R handleUpdate(R actual, R desired, P primary, Context
context) {
+ return eventSource()
+ .orElseThrow()
+ .updateAndCacheResource(
+ desired,
+ context,
+ toUpdate ->
+ KubernetesDependentResource.super.handleUpdate(actual, toUpdate, primary, context));
+ }
+
@SuppressWarnings("unused")
public R create(R desired, P primary, Context
context) {
if (useSSA(context)) {
@@ -158,14 +178,6 @@ protected void addMetadata(
} else {
annotations.remove(InformerEventSource.PREVIOUS_ANNOTATION_KEY);
}
- } else if (usePreviousAnnotation(context)) { // set a new one
- eventSource()
- .orElseThrow()
- .addPreviousAnnotation(
- Optional.ofNullable(actualResource)
- .map(r -> r.getMetadata().getResourceVersion())
- .orElse(null),
- target);
}
addReferenceHandlingMetadata(target, primary);
}
@@ -181,22 +193,6 @@ protected boolean useSSA(Context
context) {
return useSSA;
}
- private boolean usePreviousAnnotation(Context
context) {
- if (usePreviousAnnotationForEventFiltering == null) {
- usePreviousAnnotationForEventFiltering =
- context
- .getControllerConfiguration()
- .getConfigurationService()
- .previousAnnotationForDependentResourcesEventFiltering()
- && !context
- .getControllerConfiguration()
- .getConfigurationService()
- .withPreviousAnnotationForDependentResourcesBlocklist()
- .contains(this.resourceType());
- }
- return usePreviousAnnotationForEventFiltering;
- }
-
@Override
protected void handleDelete(P primary, R secondary, Context
context) {
if (secondary != null) {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CacheKeyMapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CacheKeyMapper.java
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java
index b7a6406e20..f7ed9fdc8e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java
@@ -47,7 +47,11 @@ public class ControllerEventSource
@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerEventSource(Controller controller) {
- super(NAME, controller.getCRClient(), controller.getConfiguration(), false);
+ super(
+ NAME,
+ controller.getCRClient(),
+ controller.getConfiguration(),
+ controller.getConfiguration().getInformerConfig().isComparableResourceVersions());
this.controller = controller;
final var config = controller.getConfiguration();
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
index ec11db25f4..c6a0c782e3 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
@@ -17,7 +17,7 @@
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
+import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -35,6 +35,8 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
+import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
+
/**
* Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
* this is built on top of Fabric8 client Informers, it also supports caching resources using
@@ -78,28 +80,24 @@ public class InformerEventSource
// we need direct control for the indexer to propagate the just update resource also to the index
private final PrimaryToSecondaryIndex primaryToSecondaryIndex;
private final PrimaryToSecondaryMapper primaryToSecondaryMapper;
- private final String id = UUID.randomUUID().toString();
public InformerEventSource(
InformerEventSourceConfiguration configuration, EventSourceContext context) {
this(
configuration,
configuration.getKubernetesClient().orElse(context.getClient()),
- context
- .getControllerConfiguration()
- .getConfigurationService()
- .parseResourceVersionsForEventFilteringAndCaching());
+ configuration.comparableResourceVersion());
}
InformerEventSource(InformerEventSourceConfiguration configuration, KubernetesClient client) {
- this(configuration, client, false);
+ this(configuration, client, DEFAULT_COMPARABLE_RESOURCE_VERSION);
}
@SuppressWarnings({"unchecked", "rawtypes"})
private InformerEventSource(
InformerEventSourceConfiguration configuration,
KubernetesClient client,
- boolean parseResourceVersions) {
+ boolean comparableResourceVersions) {
super(
configuration.name(),
configuration
@@ -107,7 +105,7 @@ private InformerEventSource(
.map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind()))
.orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())),
configuration,
- parseResourceVersions);
+ comparableResourceVersions);
// If there is a primary to secondary mapper there is no need for primary to secondary index.
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (useSecondaryToPrimaryIndex()) {
@@ -125,6 +123,22 @@ private InformerEventSource(
genericFilter = informerConfig.getGenericFilter();
}
+ public R updateAndCacheResource(
+ R resourceToUpdate, Context> context, UnaryOperator updateMethod) {
+ ResourceID id = ResourceID.fromResource(resourceToUpdate);
+ if (log.isDebugEnabled()) {
+ log.debug("Update and cache: {}", id);
+ }
+ try {
+ temporaryResourceCache.startModifying(id);
+ var updated = updateMethod.apply(resourceToUpdate);
+ handleRecentResourceUpdate(id, updated, resourceToUpdate);
+ return updated;
+ } finally {
+ temporaryResourceCache.doneModifying(id);
+ }
+ }
+
@Override
public void onAdd(R newResource) {
if (log.isDebugEnabled()) {
@@ -134,9 +148,7 @@ public void onAdd(R newResource) {
resourceType().getSimpleName(),
newResource.getMetadata().getResourceVersion());
}
- primaryToSecondaryIndex.onAddOrUpdate(newResource);
- onAddOrUpdate(
- Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource));
+ onAddOrUpdate(Operation.ADD, newResource, null);
}
@Override
@@ -149,16 +161,11 @@ public void onUpdate(R oldObject, R newObject) {
newObject.getMetadata().getResourceVersion(),
oldObject.getMetadata().getResourceVersion());
}
- primaryToSecondaryIndex.onAddOrUpdate(newObject);
- onAddOrUpdate(
- Operation.UPDATE,
- newObject,
- oldObject,
- () -> InformerEventSource.super.onUpdate(oldObject, newObject));
+ onAddOrUpdate(Operation.UPDATE, newObject, oldObject);
}
@Override
- public void onDelete(R resource, boolean b) {
+ public synchronized void onDelete(R resource, boolean b) {
if (log.isDebugEnabled()) {
log.debug(
"On delete event received for resource id: {} type: {}",
@@ -180,68 +187,28 @@ public synchronized void start() {
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
}
- private synchronized void onAddOrUpdate(
- Operation operation, R newObject, R oldObject, Runnable superOnOp) {
+ private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) {
+ primaryToSecondaryIndex.onAddOrUpdate(newObject);
var resourceID = ResourceID.fromResource(newObject);
- if (canSkipEvent(newObject, oldObject, resourceID)) {
+ if (temporaryResourceCache.onAddOrUpdateEvent(newObject)) {
log.debug(
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
+ " ID: {}",
operation,
ResourceID.fromResource(newObject));
- superOnOp.run();
+ } else if (eventAcceptedByFilter(operation, newObject, oldObject)) {
+ log.debug(
+ "Propagating event for {}, resource with same version not result of a reconciliation."
+ + " Resource ID: {}",
+ operation,
+ resourceID);
+ propagateEvent(newObject);
} else {
- superOnOp.run();
- if (eventAcceptedByFilter(operation, newObject, oldObject)) {
- log.debug(
- "Propagating event for {}, resource with same version not result of a reconciliation."
- + " Resource ID: {}",
- operation,
- resourceID);
- propagateEvent(newObject);
- } else {
- log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
- }
+ log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID);
}
}
- private boolean canSkipEvent(R newObject, R oldObject, ResourceID resourceID) {
- var res = temporaryResourceCache.getResourceFromCache(resourceID);
- if (res.isEmpty()) {
- return isEventKnownFromAnnotation(newObject, oldObject);
- }
- boolean resVersionsEqual =
- newObject
- .getMetadata()
- .getResourceVersion()
- .equals(res.get().getMetadata().getResourceVersion());
- log.debug(
- "Resource found in temporal cache for id: {} resource versions equal: {}",
- resourceID,
- resVersionsEqual);
- return resVersionsEqual
- || temporaryResourceCache.isLaterResourceVersion(resourceID, res.get(), newObject);
- }
-
- private boolean isEventKnownFromAnnotation(R newObject, R oldObject) {
- String previous = newObject.getMetadata().getAnnotations().get(PREVIOUS_ANNOTATION_KEY);
- boolean known = false;
- if (previous != null) {
- String[] parts = previous.split(",");
- if (id.equals(parts[0])) {
- if (oldObject == null && parts.length == 1) {
- known = true;
- } else if (oldObject != null
- && parts.length == 2
- && oldObject.getMetadata().getResourceVersion().equals(parts[1])) {
- known = true;
- }
- }
- }
- return known;
- }
-
private void propagateEvent(R object) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
@@ -289,23 +256,19 @@ public Set getSecondaryResources(P primary) {
}
@Override
- public synchronized void handleRecentResourceUpdate(
+ public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource);
}
@Override
- public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) {
+ public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
handleRecentCreateOrUpdate(Operation.ADD, resource, null);
}
private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
- temporaryResourceCache.putResource(
- newResource,
- Optional.ofNullable(oldResource)
- .map(r -> r.getMetadata().getResourceVersion())
- .orElse(null));
+ temporaryResourceCache.putResource(newResource);
}
private boolean useSecondaryToPrimaryIndex() {
@@ -333,22 +296,6 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
&& (genericFilter == null || genericFilter.accept(resource));
}
- /**
- * Add an annotation to the resource so that the subsequent will be omitted
- *
- * @param resourceVersion null if there is no prior version
- * @param target mutable resource that will be returned
- */
- public R addPreviousAnnotation(String resourceVersion, R target) {
- target
- .getMetadata()
- .getAnnotations()
- .put(
- PREVIOUS_ANNOTATION_KEY,
- id + Optional.ofNullable(resourceVersion).map(rv -> "," + rv).orElse(""));
- return target;
- }
-
private enum Operation {
ADD,
UPDATE
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
index 2a6c7ef206..c3a4a9f2c1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
@@ -156,6 +156,10 @@ public Optional get(ResourceID resourceID) {
return Optional.ofNullable(cache.getByKey(getKey(resourceID)));
}
+ public String getLastSyncResourceVersion() {
+ return this.informer.lastSyncResourceVersion();
+ }
+
private String getKey(ResourceID resourceID) {
return Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null), resourceID.getName());
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
index 2679918b60..af30617d92 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
@@ -34,6 +34,7 @@
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.Informable;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
+import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
@@ -55,7 +56,7 @@ public abstract class ManagedInformerEventSource<
private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);
private InformerManager cache;
- private final boolean parseResourceVersions;
+ private final boolean comparableResourceVersions;
private ControllerConfiguration controllerConfiguration;
private final C configuration;
private final Map>> indexers = new HashMap<>();
@@ -63,9 +64,9 @@ public abstract class ManagedInformerEventSource<
protected MixedOperation client;
protected ManagedInformerEventSource(
- String name, MixedOperation client, C configuration, boolean parseResourceVersions) {
+ String name, MixedOperation client, C configuration, boolean comparableResourceVersions) {
super(configuration.getResourceClass(), name);
- this.parseResourceVersions = parseResourceVersions;
+ this.comparableResourceVersions = comparableResourceVersions;
this.client = client;
this.configuration = configuration;
}
@@ -102,7 +103,7 @@ public synchronized void start() {
if (isRunning()) {
return;
}
- temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
+ temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions);
this.cache = new InformerManager<>(client, configuration, this);
cache.setControllerConfiguration(controllerConfiguration);
cache.addIndexers(indexers);
@@ -122,30 +123,34 @@ public synchronized void stop() {
@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
- temporaryResourceCache.putResource(
- resource, previousVersionOfResource.getMetadata().getResourceVersion());
+ temporaryResourceCache.putResource(resource);
}
@Override
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
- temporaryResourceCache.putAddedResource(resource);
+ temporaryResourceCache.putResource(resource);
}
@Override
public Optional get(ResourceID resourceID) {
+ var res = cache.get(resourceID);
Optional resource = temporaryResourceCache.getResourceFromCache(resourceID);
- if (resource.isPresent()) {
- log.debug("Resource found in temporary cache for Resource ID: {}", resourceID);
+ if (comparableResourceVersions
+ && resource.isPresent()
+ && res.filter(
+ r ->
+ PrimaryUpdateAndCacheUtils.compareResourceVersions(r, resource.orElseThrow())
+ > 0)
+ .isEmpty()) {
+ log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID);
return resource;
- } else {
- log.debug(
- "Resource not found in temporary cache reading it from informer cache,"
- + " for Resource ID: {}",
- resourceID);
- var res = cache.get(resourceID);
- log.debug("Resource found in cache: {} for id: {}", res.isPresent(), resourceID);
- return res;
}
+ log.debug(
+ "Resource not found, or older, in temporary cache. Found in informer cache {}, for"
+ + " Resource ID: {}",
+ res.isPresent(),
+ resourceID);
+ return res;
}
@SuppressWarnings("unused")
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
index 06226ae4ba..d918be447d 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
@@ -15,16 +15,16 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.javaoperatorsdk.operator.api.config.ConfigurationService;
+import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -33,157 +33,152 @@
* a create or update is executed the subsequent getResource operation might not return the
* up-to-date resource from informer cache, since it is not received yet.
*
- * The idea of the solution is, that since an update (for create is simpler) was done
- * successfully, and optimistic locking is in place, there were no other operations between reading
- * the resource from the cache and the actual update. So when the new resource is stored in the
- * temporal cache only if the informer still has the previous resource version, from before the
- * update. If not, that means there were already updates on the cache (either by the actual update
- * from DependentResource or other) so the resource does not needs to be cached. Subsequently if
- * event received from the informer, it means that the cache of the informer was updated, so it
- * already contains a more fresh version of the resource.
+ *
Since an update (for create is simpler) was done successfully we can temporarily track that
+ * resource if its version is later than the events we've processed. We then know that we can skip
+ * all events that have the same resource version or earlier than the tracked resource. Once we
+ * process an event that has the same resource version or later, then we know the tracked resource
+ * can be removed.
+ *
+ *
In some cases it is possible for the informer to deliver events prior to the attempt to put
+ * the resource in the temporal cache. The startModifying/doneModifying methods are used to pause
+ * event delivery to ensure that temporal cache recognizes the put entry as an event that can be
+ * skipped.
+ *
+ *
If comparable resource versions are disabled, then this cache is effectively disabled.
*
* @param resource to cache.
*/
public class TemporaryResourceCache {
- static class ExpirationCache {
- private final LinkedHashMap cache;
- private final int ttlMs;
-
- public ExpirationCache(int maxEntries, int ttlMs) {
- this.ttlMs = ttlMs;
- this.cache =
- new LinkedHashMap<>() {
- @Override
- protected boolean removeEldestEntry(Map.Entry eldest) {
- return size() > maxEntries;
- }
- };
- }
+ private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
- public void add(K key) {
- clean();
- cache.putIfAbsent(key, System.currentTimeMillis());
- }
+ private final Map cache = new ConcurrentHashMap<>();
+ private final boolean comparableResourceVersions;
+ private final Map activelyModifying = new ConcurrentHashMap<>();
+ private String latestResourceVersion;
- public boolean contains(K key) {
- clean();
- return cache.get(key) != null;
- }
+ public TemporaryResourceCache(boolean comparableResourceVersions) {
+ this.comparableResourceVersions = comparableResourceVersions;
+ }
- void clean() {
- if (!cache.isEmpty()) {
- long currentTimeMillis = System.currentTimeMillis();
- var iter = cache.entrySet().iterator();
- // the order will already be from oldest to newest, clean a fixed number of entries to
- // amortize the cost amongst multiple calls
- for (int i = 0; i < 10 && iter.hasNext(); i++) {
- var entry = iter.next();
- if (currentTimeMillis - entry.getValue() > ttlMs) {
- iter.remove();
- }
- }
- }
+ public void startModifying(ResourceID id) {
+ if (!comparableResourceVersions) {
+ return;
}
+ activelyModifying
+ .compute(
+ id,
+ (ignored, lock) -> {
+ if (lock != null) {
+ throw new IllegalStateException(); // concurrent modifications to the same resource
+ // not allowed - this could be relaxed if needed
+ }
+ return new ReentrantLock();
+ })
+ .lock();
}
- private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
-
- private final Map cache = new ConcurrentHashMap<>();
-
- // keep up to the last million deletions for up to 10 minutes
- private final ExpirationCache tombstones = new ExpirationCache<>(1000000, 1200000);
- private final ManagedInformerEventSource managedInformerEventSource;
- private final boolean parseResourceVersions;
-
- public TemporaryResourceCache(
- ManagedInformerEventSource managedInformerEventSource,
- boolean parseResourceVersions) {
- this.managedInformerEventSource = managedInformerEventSource;
- this.parseResourceVersions = parseResourceVersions;
+ public void doneModifying(ResourceID id) {
+ if (!comparableResourceVersions) {
+ return;
+ }
+ activelyModifying.computeIfPresent(
+ id,
+ (ignored, lock) -> {
+ lock.unlock();
+ return null;
+ });
}
- public synchronized void onDeleteEvent(T resource, boolean unknownState) {
- tombstones.add(resource.getMetadata().getUid());
+ public void onDeleteEvent(T resource, boolean unknownState) {
onEvent(resource, unknownState);
}
- public synchronized void onAddOrUpdateEvent(T resource) {
- onEvent(resource, false);
+ /**
+ * @return true if the resourceVersion was already known
+ */
+ public boolean onAddOrUpdateEvent(T resource) {
+ return onEvent(resource, false);
}
- synchronized void onEvent(T resource, boolean unknownState) {
- cache.computeIfPresent(
- ResourceID.fromResource(resource),
- (id, cached) ->
- (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached);
+ private boolean onEvent(T resource, boolean unknownState) {
+ ReentrantLock lock = activelyModifying.get(ResourceID.fromResource(resource));
+ if (lock != null) {
+ lock.lock(); // wait for the modification to finish
+ lock.unlock(); // simply unlock as the event is guaranteed after the modification
+ }
+ boolean[] known = new boolean[1];
+ synchronized (this) {
+ if (!unknownState) {
+ latestResourceVersion = resource.getMetadata().getResourceVersion();
+ }
+ cache.computeIfPresent(
+ ResourceID.fromResource(resource),
+ (id, cached) -> {
+ boolean remove = unknownState;
+ if (!unknownState) {
+ int comp = PrimaryUpdateAndCacheUtils.compareResourceVersions(resource, cached);
+ if (comp >= 0) {
+ remove = true;
+ }
+ if (comp <= 0) {
+ known[0] = true;
+ }
+ }
+ if (remove) {
+ return null;
+ }
+ return cached;
+ });
+ return known[0];
+ }
}
- public synchronized void putAddedResource(T newResource) {
- putResource(newResource, null);
- }
+ /** put the item into the cache if it's for a later state than what has already been observed. */
+ public synchronized void putResource(T newResource) {
+ if (!comparableResourceVersions) {
+ return;
+ }
- /**
- * put the item into the cache if the previousResourceVersion matches the current state. If not
- * the currently cached item is removed.
- *
- * @param previousResourceVersion null indicates an add
- */
- public synchronized void putResource(T newResource, String previousResourceVersion) {
var resourceId = ResourceID.fromResource(newResource);
- var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
-
- boolean moveAhead = false;
- if (previousResourceVersion == null && cachedResource == null) {
- if (tombstones.contains(newResource.getMetadata().getUid())) {
- log.debug(
- "Won't resurrect uid {} for resource id: {}",
- newResource.getMetadata().getUid(),
- resourceId);
- return;
- }
- // we can skip further checks as this is a simple add and there's no previous entry to
- // consider
- moveAhead = true;
+
+ if (newResource.getMetadata().getResourceVersion() == null) {
+ log.warn(
+ "Resource {}: with no resourceVersion put in temporary cache. This is not the expected"
+ + " usage pattern, only resources returned from the api server should be put in the"
+ + " cache.",
+ resourceId);
+ return;
}
- if (moveAhead
- || (cachedResource != null
- && (cachedResource
- .getMetadata()
- .getResourceVersion()
- .equals(previousResourceVersion))
- || isLaterResourceVersion(resourceId, newResource, cachedResource))) {
+ // check against the latestResourceVersion processed by the TemporaryResourceCache
+ // If the resource is older, then we can safely ignore.
+ //
+ // this also prevents resurrecting recently deleted entities for which the delete event
+ // has already been processed
+ if (latestResourceVersion != null
+ && PrimaryUpdateAndCacheUtils.compareResourceVersions(
+ latestResourceVersion, newResource.getMetadata().getResourceVersion())
+ > 0) {
log.debug(
- "Temporarily moving ahead to target version {} for resource id: {}",
+ "Resource {}: resourceVersion {} is not later than latest {}",
+ resourceId,
newResource.getMetadata().getResourceVersion(),
- resourceId);
- cache.put(resourceId, newResource);
- } else if (cache.remove(resourceId) != null) {
- log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
+ latestResourceVersion);
+ return;
}
- }
- /**
- * @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()}
- * is enabled and the resourceVersion of newResource is numerically greater than
- * cachedResource, otherwise false
- */
- public boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
- try {
- if (parseResourceVersions
- && Long.parseLong(newResource.getMetadata().getResourceVersion())
- > Long.parseLong(cachedResource.getMetadata().getResourceVersion())) {
- return true;
- }
- } catch (NumberFormatException e) {
+ // also make sure that we're later than the existing temporary entry
+ var cachedResource = getResourceFromCache(resourceId).orElse(null);
+
+ if (cachedResource == null
+ || PrimaryUpdateAndCacheUtils.compareResourceVersions(newResource, cachedResource) > 0) {
log.debug(
- "Could not compare resourceVersions {} and {} for {}",
+ "Temporarily moving ahead to target version {} for resource id: {}",
newResource.getMetadata().getResourceVersion(),
- cachedResource.getMetadata().getResourceVersion(),
resourceId);
+ cache.put(resourceId, newResource);
}
- return false;
}
public synchronized Optional getResourceFromCache(ResourceID resourceID) {
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java
index 047929273b..c878a4fc06 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java
@@ -213,7 +213,7 @@ public void compareResourceVersionsTest() {
// naive performance test that compares the work case scenario for the parsing and non-parsing
// variants
@Test
- @Disabled("test sometimes fails, we plan to iterate over it and related features for 5.3")
+ @Disabled
public void compareResourcePerformanceTest() {
var execNum = 30000000;
var startTime = System.currentTimeMillis();
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
index 208d6aeaaa..f54e47304b 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
@@ -94,31 +94,18 @@ public synchronized void start() {}
}
@Test
- void skipsEventPropagationIfResourceWithSameVersionInResourceCache() {
+ void skipsEventPropagation() {
when(temporaryResourceCacheMock.getResourceFromCache(any()))
.thenReturn(Optional.of(testDeployment()));
+ when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(true);
+
informerEventSource.onAdd(testDeployment());
informerEventSource.onUpdate(testDeployment(), testDeployment());
verify(eventHandlerMock, never()).handleEvent(any());
}
- @Test
- void skipsAddEventPropagationViaAnnotation() {
- informerEventSource.onAdd(informerEventSource.addPreviousAnnotation(null, testDeployment()));
-
- verify(eventHandlerMock, never()).handleEvent(any());
- }
-
- @Test
- void skipsUpdateEventPropagationViaAnnotation() {
- informerEventSource.onUpdate(
- testDeployment(), informerEventSource.addPreviousAnnotation("1", testDeployment()));
-
- verify(eventHandlerMock, never()).handleEvent(any());
- }
-
@Test
void processEventPropagationWithoutAnnotation() {
informerEventSource.onUpdate(testDeployment(), testDeployment());
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java
index e3dc2c82e4..4b12148015 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java
@@ -16,10 +16,10 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;
import java.util.Map;
-import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -27,49 +27,40 @@
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
-import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
class TemporaryPrimaryResourceCacheTest {
public static final String RESOURCE_VERSION = "2";
- @SuppressWarnings("unchecked")
- private InformerEventSource informerEventSource;
-
private TemporaryResourceCache temporaryResourceCache;
@BeforeEach
void setup() {
- informerEventSource = mock(InformerEventSource.class);
- temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, false);
+ temporaryResourceCache = new TemporaryResourceCache<>(true);
}
@Test
void updateAddsTheResourceIntoCacheIfTheInformerHasThePreviousResourceVersion() {
var testResource = testResource();
var prevTestResource = testResource();
- prevTestResource.getMetadata().setResourceVersion("0");
- when(informerEventSource.get(any())).thenReturn(Optional.of(prevTestResource));
+ prevTestResource.getMetadata().setResourceVersion("1");
- temporaryResourceCache.putResource(testResource, "0");
+ temporaryResourceCache.putResource(testResource);
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
assertThat(cached).isPresent();
}
@Test
- void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() {
+ void updateNotAddsTheResourceIntoCacheIfLaterVersionKnown() {
var testResource = testResource();
- var informerCachedResource = testResource();
- informerCachedResource.getMetadata().setResourceVersion("x");
- when(informerEventSource.get(any())).thenReturn(Optional.of(informerCachedResource));
- temporaryResourceCache.putResource(testResource, "0");
+ temporaryResourceCache.onAddOrUpdateEvent(
+ testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build());
+
+ temporaryResourceCache.putResource(testResource);
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
assertThat(cached).isNotPresent();
@@ -78,9 +69,8 @@ void updateNotAddsTheResourceIntoCacheIfTheInformerHasOtherVersion() {
@Test
void addOperationAddsTheResourceIfInformerCacheStillEmpty() {
var testResource = testResource();
- when(informerEventSource.get(any())).thenReturn(Optional.empty());
- temporaryResourceCache.putAddedResource(testResource);
+ temporaryResourceCache.putResource(testResource);
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
assertThat(cached).isPresent();
@@ -89,46 +79,79 @@ void addOperationAddsTheResourceIfInformerCacheStillEmpty() {
@Test
void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
var testResource = testResource();
- when(informerEventSource.get(any())).thenReturn(Optional.of(testResource()));
- temporaryResourceCache.putAddedResource(testResource);
+ temporaryResourceCache.putResource(testResource);
+
+ temporaryResourceCache.putResource(
+ new ConfigMapBuilder(testResource)
+ .editMetadata()
+ .withResourceVersion("1")
+ .endMetadata()
+ .build());
var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource));
- assertThat(cached).isNotPresent();
+ assertThat(cached.orElseThrow().getMetadata().getResourceVersion()).isEqualTo(RESOURCE_VERSION);
}
@Test
void removesResourceFromCache() {
ConfigMap testResource = propagateTestResourceToCache();
- temporaryResourceCache.onAddOrUpdateEvent(testResource());
+ temporaryResourceCache.onAddOrUpdateEvent(
+ new ConfigMapBuilder(testResource)
+ .editMetadata()
+ .withResourceVersion("3")
+ .endMetadata()
+ .build());
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isNotPresent();
}
@Test
- void resourceVersionParsing() {
- this.temporaryResourceCache = new TemporaryResourceCache<>(informerEventSource, true);
+ void nonComparableResourceVersionsDisables() {
+ this.temporaryResourceCache = new TemporaryResourceCache<>(false);
- ConfigMap testResource = propagateTestResourceToCache();
+ this.temporaryResourceCache.putResource(testResource());
- // an event with a newer version will not remove
- temporaryResourceCache.onAddOrUpdateEvent(
- new ConfigMapBuilder(testResource)
- .editMetadata()
- .withResourceVersion("1")
- .endMetadata()
- .build());
+ assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource())))
+ .isEmpty();
+ }
- assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
- .isPresent();
+ @Test
+ void lockedEventBeforePut() throws Exception {
+ var testResource = testResource();
- // anything else will remove
- temporaryResourceCache.onAddOrUpdateEvent(testResource());
+ temporaryResourceCache.startModifying(ResourceID.fromResource(testResource));
- assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
- .isNotPresent();
+ ExecutorService ex = Executors.newSingleThreadExecutor();
+ try {
+ var result = ex.submit(() -> temporaryResourceCache.onAddOrUpdateEvent(testResource));
+
+ temporaryResourceCache.putResource(testResource);
+ assertThat(result.isDone()).isFalse();
+ temporaryResourceCache.doneModifying(ResourceID.fromResource(testResource));
+ assertThat(result.get(10, TimeUnit.SECONDS)).isTrue();
+ } finally {
+ ex.shutdownNow();
+ }
+ }
+
+ @Test
+ void putBeforeEvent() {
+ var testResource = testResource();
+
+ // first ensure an event is not known
+ var result = temporaryResourceCache.onAddOrUpdateEvent(testResource);
+ assertThat(result).isFalse();
+
+ var nextResource = testResource();
+ nextResource.getMetadata().setResourceVersion("3");
+ temporaryResourceCache.putResource(nextResource);
+
+ // now expect an event with the matching resourceVersion to be known after the put
+ result = temporaryResourceCache.onAddOrUpdateEvent(nextResource);
+ assertThat(result).isTrue();
}
@Test
@@ -143,45 +166,15 @@ void rapidDeletion() {
.endMetadata()
.build(),
false);
- temporaryResourceCache.putAddedResource(testResource);
+ temporaryResourceCache.putResource(testResource);
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isEmpty();
}
- @Test
- void expirationCacheMax() {
- ExpirationCache cache = new ExpirationCache<>(2, Integer.MAX_VALUE);
-
- cache.add(1);
- cache.add(2);
- cache.add(3);
-
- assertThat(cache.contains(1)).isFalse();
- assertThat(cache.contains(2)).isTrue();
- assertThat(cache.contains(3)).isTrue();
- }
-
- @Test
- void expirationCacheTtl() {
- ExpirationCache cache = new ExpirationCache<>(2, 1);
-
- cache.add(1);
- cache.add(2);
-
- Awaitility.await()
- .atMost(1, TimeUnit.SECONDS)
- .untilAsserted(
- () -> {
- assertThat(cache.contains(1)).isFalse();
- assertThat(cache.contains(2)).isFalse();
- });
- }
-
private ConfigMap propagateTestResourceToCache() {
var testResource = testResource();
- when(informerEventSource.get(any())).thenReturn(Optional.empty());
- temporaryResourceCache.putAddedResource(testResource);
+ temporaryResourceCache.putResource(testResource);
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isPresent();
return testResource;
diff --git a/operator-framework-junit5/pom.xml b/operator-framework-junit5/pom.xml
index 4992f9d6bc..60c235a9ec 100644
--- a/operator-framework-junit5/pom.xml
+++ b/operator-framework-junit5/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
operator-framework-junit-5
diff --git a/operator-framework/pom.xml b/operator-framework/pom.xml
index f1541a729d..6114fff9c9 100644
--- a/operator-framework/pom.xml
+++ b/operator-framework/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
operator-framework
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java
similarity index 91%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java
index f80c75b84b..77aff47a12 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java
@@ -20,14 +20,12 @@
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
-class PreviousAnnotationDisabledIT {
+class ComparableResourceVersionsDisabledIT {
@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder()
- .withReconciler(new CreateUpdateEventFilterTestReconciler())
- .withConfigurationService(
- overrider -> overrider.withPreviousAnnotationForDependentResources(false))
+ .withReconciler(new CreateUpdateEventFilterTestReconciler(false))
.build();
@Test
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java
index 40bf2cc350..4344356ff9 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java
@@ -41,6 +41,16 @@ public class CreateUpdateEventFilterTestReconciler
private final DirectConfigMapDependentResource configMapDR =
new DirectConfigMapDependentResource(ConfigMap.class);
+ private final boolean comparableResourceVersion;
+
+ public CreateUpdateEventFilterTestReconciler(boolean comparableResourceVersion) {
+ this.comparableResourceVersion = comparableResourceVersion;
+ }
+
+ public CreateUpdateEventFilterTestReconciler() {
+ this(true);
+ }
+
@Override
public UpdateControl reconcile(
CreateUpdateEventFilterTestCustomResource resource,
@@ -89,6 +99,7 @@ public List> prepareEv
InformerEventSourceConfiguration.from(
ConfigMap.class, CreateUpdateEventFilterTestCustomResource.class)
.withLabelSelector("integrationtest = " + this.getClass().getSimpleName())
+ .withComparableResourceVersion(comparableResourceVersion)
.build();
final var informerEventSource = new InformerEventSource<>(informerConfiguration, context);
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java
index de485cfc4e..89d1dee94b 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/externalstate/ExternalStateReconciler.java
@@ -104,13 +104,15 @@ private void createExternalResource(
.withData(Map.of(ID_KEY, createdResource.getId()))
.build();
configMap.addOwnerReference(resource);
- context.getClient().configMaps().resource(configMap).create();
var primaryID = ResourceID.fromResource(resource);
// Making sure that the created resources are in the cache for the next reconciliation.
// This is critical in this case, since on next reconciliation if it would not be in the cache
// it would be created again.
- configMapEventSource.handleRecentResourceCreate(primaryID, configMap);
+ configMapEventSource.updateAndCacheResource(
+ configMap,
+ context,
+ toCreate -> context.getClient().configMaps().resource(toCreate).create());
externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource);
}
@@ -128,6 +130,7 @@ public DeleteControl cleanup(
return DeleteControl.defaultDelete();
}
+ @Override
public int getNumberOfExecutions() {
return numberOfExecutions.get();
}
diff --git a/pom.xml b/pom.xml
index 80f637cab8..ee12648ff6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
pom
Operator SDK for Java
Java SDK for implementing Kubernetes operators
diff --git a/sample-operators/controller-namespace-deletion/pom.xml b/sample-operators/controller-namespace-deletion/pom.xml
index d3712e99b8..d68cffb6ec 100644
--- a/sample-operators/controller-namespace-deletion/pom.xml
+++ b/sample-operators/controller-namespace-deletion/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
sample-controller-namespace-deletion
diff --git a/sample-operators/leader-election/pom.xml b/sample-operators/leader-election/pom.xml
index 39c8e1fce3..72f40f835d 100644
--- a/sample-operators/leader-election/pom.xml
+++ b/sample-operators/leader-election/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
sample-leader-election
diff --git a/sample-operators/mysql-schema/pom.xml b/sample-operators/mysql-schema/pom.xml
index 93ee73e59c..0b96b2f1ca 100644
--- a/sample-operators/mysql-schema/pom.xml
+++ b/sample-operators/mysql-schema/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
sample-mysql-schema-operator
diff --git a/sample-operators/pom.xml b/sample-operators/pom.xml
index 4ce07ce912..6079d3bb71 100644
--- a/sample-operators/pom.xml
+++ b/sample-operators/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
java-operator-sdk
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
sample-operators
diff --git a/sample-operators/tomcat-operator/pom.xml b/sample-operators/tomcat-operator/pom.xml
index 537acad72a..ce6151d74d 100644
--- a/sample-operators/tomcat-operator/pom.xml
+++ b/sample-operators/tomcat-operator/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
sample-tomcat-operator
diff --git a/sample-operators/webpage/pom.xml b/sample-operators/webpage/pom.xml
index 30bd8a890f..7b7681b97f 100644
--- a/sample-operators/webpage/pom.xml
+++ b/sample-operators/webpage/pom.xml
@@ -22,7 +22,7 @@
io.javaoperatorsdk
sample-operators
- 5.2.0-SNAPSHOT
+ 5.3.0-SNAPSHOT
sample-webpage-operator