Skip to content

Commit 5e6a62a

Browse files
committed
improve: thread-safe idempotent dynamic informer event source start (#2174)
Signed-off-by: Attila Mészáros <[email protected]>
1 parent cd0fe64 commit 5e6a62a

File tree

7 files changed

+188
-8
lines changed

7 files changed

+188
-8
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,18 @@ public <R> List<ResourceEventSource<R, P>> getResourceEventSourcesFor(Class<R> d
229229
}
230230

231231
@Override
232-
public synchronized EventSource dynamicallyRegisterEventSource(String name,
232+
public EventSource dynamicallyRegisterEventSource(String name,
233233
EventSource eventSource) {
234-
var es = eventSources.existing(name, eventSource);
235-
if (es != null) {
236-
return es;
234+
synchronized (this) {
235+
var actual = eventSources.existing(name, eventSource);
236+
if (actual != null) {
237+
eventSource = actual;
238+
} else {
239+
registerEventSource(name, eventSource);
240+
}
237241
}
238-
registerEventSource(name, eventSource);
242+
// The start itself is blocking thus blocking only the threads which are attempt to start the
243+
// actual event source. Think of this as a form of lock striping.
239244
eventSource.start();
240245
return eventSource;
241246
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ default <R> ResourceEventSource<R, P> getResourceEventSourceFor(Class<R> depende
4545
*/
4646
EventSource dynamicallyRegisterEventSource(String name, EventSource eventSource);
4747

48+
4849
/**
4950
* De-registers (and stops) the {@link EventSource} associated with the specified name. If no such
5051
* source exists, this method will do nothing.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public ControllerResourceEventSource(Controller<T> controller) {
5555
}
5656

5757
@Override
58-
public void start() {
58+
public synchronized void start() {
5959
try {
6060
super.start();
6161
} catch (KubernetesClientException e) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ public void changeNamespaces(Set<String> namespaces) {
7676
}
7777

7878
@Override
79-
public void start() {
79+
public synchronized void start() {
80+
if (isRunning()) {
81+
return;
82+
}
8083
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
8184
this.cache = new InformerManager<>(client, configuration, this);
8285
cache.setConfigurationService(configurationService);
@@ -86,7 +89,10 @@ public void start() {
8689
}
8790

8891
@Override
89-
public void stop() {
92+
public synchronized void stop() {
93+
if (!isRunning()) {
94+
return;
95+
}
9096
super.stop();
9197
manager().stop();
9298
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.RegisterExtension;
7+
8+
import io.fabric8.kubernetes.api.model.ConfigMap;
9+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
10+
import io.fabric8.kubernetes.api.model.Secret;
11+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
12+
import io.javaoperatorsdk.operator.sample.dynamicgenericeventsourceregistration.DynamicGenericEventSourceRegistrationCustomResource;
13+
import io.javaoperatorsdk.operator.sample.dynamicgenericeventsourceregistration.DynamicGenericEventSourceRegistrationReconciler;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.awaitility.Awaitility.await;
17+
18+
class DynamicGenericEventSourceRegistrationIT {
19+
20+
public static final String TEST_RESOURCE_NAME = "test1";
21+
@RegisterExtension
22+
LocallyRunOperatorExtension extension =
23+
LocallyRunOperatorExtension.builder()
24+
.withReconciler(DynamicGenericEventSourceRegistrationReconciler.class)
25+
.build();
26+
27+
@Test
28+
void registersEventSourcesDynamically() {
29+
var reconciler =
30+
extension.getReconcilerOfType(DynamicGenericEventSourceRegistrationReconciler.class);
31+
extension.create(testResource());
32+
33+
await().pollDelay(Duration.ofMillis(150)).untilAsserted(() -> {
34+
var cm = extension.get(ConfigMap.class, TEST_RESOURCE_NAME);
35+
var secret = extension.get(Secret.class, TEST_RESOURCE_NAME);
36+
assertThat(cm).isNotNull();
37+
assertThat(secret).isNotNull();
38+
});
39+
var executions = reconciler.getNumberOfExecutions();
40+
assertThat(reconciler.getNumberOfEventSources()).isEqualTo(2);
41+
assertThat(executions).isLessThanOrEqualTo(3);
42+
43+
var cm = extension.get(ConfigMap.class, TEST_RESOURCE_NAME);
44+
cm.getData().put("key2", "val2");
45+
46+
extension.replace(cm); // triggers the reconciliation
47+
48+
await().untilAsserted(() -> {
49+
assertThat(reconciler.getNumberOfExecutions() - executions).isEqualTo(2);
50+
});
51+
assertThat(reconciler.getNumberOfEventSources()).isEqualTo(2);
52+
}
53+
54+
55+
DynamicGenericEventSourceRegistrationCustomResource testResource() {
56+
var res = new DynamicGenericEventSourceRegistrationCustomResource();
57+
res.setMetadata(new ObjectMetaBuilder()
58+
.withName(TEST_RESOURCE_NAME)
59+
.build());
60+
return res;
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.javaoperatorsdk.operator.sample.dynamicgenericeventsourceregistration;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("dger")
12+
public class DynamicGenericEventSourceRegistrationCustomResource
13+
extends CustomResource<Void, Void>
14+
implements Namespaced {
15+
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package io.javaoperatorsdk.operator.sample.dynamicgenericeventsourceregistration;
2+
3+
import java.util.Base64;
4+
import java.util.Map;
5+
import java.util.concurrent.atomic.AtomicInteger;
6+
7+
import io.fabric8.kubernetes.api.model.*;
8+
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
9+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
10+
import io.javaoperatorsdk.operator.api.reconciler.Context;
11+
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
12+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
13+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
14+
import io.javaoperatorsdk.operator.processing.GroupVersionKind;
15+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
16+
17+
@ControllerConfiguration
18+
public class DynamicGenericEventSourceRegistrationReconciler
19+
implements Reconciler<DynamicGenericEventSourceRegistrationCustomResource> {
20+
21+
private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
22+
private final AtomicInteger numberOfEventSources = new AtomicInteger();
23+
24+
@Override
25+
public UpdateControl<DynamicGenericEventSourceRegistrationCustomResource> reconcile(
26+
DynamicGenericEventSourceRegistrationCustomResource primary,
27+
Context<DynamicGenericEventSourceRegistrationCustomResource> context) {
28+
29+
numberOfExecutions.addAndGet(1);
30+
31+
context.eventSourceRetriever().dynamicallyRegisterEventSource(ConfigMap.class.getSimpleName(),
32+
genericInformerFor(ConfigMap.class, context));
33+
context.eventSourceRetriever().dynamicallyRegisterEventSource(Secret.class.getSimpleName(),
34+
genericInformerFor(Secret.class, context));
35+
36+
context.getClient().resource(secret(primary)).createOr(NonDeletingOperation::update);
37+
context.getClient().resource(configMap(primary)).createOr(NonDeletingOperation::update);
38+
39+
numberOfEventSources.set(context.eventSourceRetriever()
40+
.getResourceEventSourcesFor(GenericKubernetesResource.class).size());
41+
42+
return UpdateControl.noUpdate();
43+
}
44+
45+
private Secret secret(DynamicGenericEventSourceRegistrationCustomResource primary) {
46+
var secret = new SecretBuilder()
47+
.withMetadata(new ObjectMetaBuilder()
48+
.withName(primary.getMetadata().getName())
49+
.withNamespace(primary.getMetadata().getNamespace())
50+
.build())
51+
.withData(Map.of("key", Base64.getEncoder().encodeToString("val".getBytes())))
52+
.build();
53+
secret.addOwnerReference(primary);
54+
return secret;
55+
}
56+
57+
private ConfigMap configMap(DynamicGenericEventSourceRegistrationCustomResource primary) {
58+
var cm = new ConfigMapBuilder()
59+
.withMetadata(new ObjectMetaBuilder()
60+
.withName(primary.getMetadata().getName())
61+
.withNamespace(primary.getMetadata().getNamespace())
62+
.build())
63+
.withData(Map.of("key", "val"))
64+
.build();
65+
cm.addOwnerReference(primary);
66+
return cm;
67+
}
68+
69+
private GroupVersionKind gvkFor(Class<? extends HasMetadata> clazz) {
70+
return new GroupVersionKind(HasMetadata.getApiVersion(clazz), HasMetadata.getKind(clazz));
71+
}
72+
73+
private InformerEventSource<GenericKubernetesResource, DynamicGenericEventSourceRegistrationCustomResource> genericInformerFor(
74+
Class<? extends HasMetadata> clazz,
75+
Context<DynamicGenericEventSourceRegistrationCustomResource> context) {
76+
77+
return new InformerEventSource<>(
78+
InformerConfiguration.from(gvkFor(clazz),
79+
context.eventSourceRetriever().eventSourceContextForDynamicRegistration()).build(),
80+
context.eventSourceRetriever().eventSourceContextForDynamicRegistration());
81+
}
82+
83+
public int getNumberOfExecutions() {
84+
return numberOfExecutions.get();
85+
}
86+
87+
public int getNumberOfEventSources() {
88+
return numberOfEventSources.get();
89+
}
90+
}

0 commit comments

Comments
 (0)