Skip to content

Commit 299c0b0

Browse files
committed
feat: add EventMonitor to decouple metrics from DefaultEventHandler
A global EventMonitor is set when the Operator is created based on the Metrics instance provided by the ConfigurationService.
1 parent edfc4f8 commit 299c0b0

File tree

7 files changed

+68
-66
lines changed

7 files changed

+68
-66
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
1616
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1717
import io.javaoperatorsdk.operator.processing.ConfiguredController;
18+
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
19+
import io.javaoperatorsdk.operator.processing.DefaultEventHandler.EventMonitor;
20+
import io.javaoperatorsdk.operator.processing.event.Event;
1821

1922
@SuppressWarnings("rawtypes")
2023
public class Operator implements AutoCloseable {
@@ -24,27 +27,31 @@ public class Operator implements AutoCloseable {
2427
private final Object lock;
2528
private final List<ConfiguredController> controllers;
2629
private volatile boolean started;
27-
private final Metrics metrics;
2830

29-
public Operator(
30-
KubernetesClient k8sClient, ConfigurationService configurationService, Metrics metrics) {
31+
public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) {
3132
this.k8sClient = k8sClient;
3233
this.configurationService = configurationService;
3334
this.lock = new Object();
3435
this.controllers = new ArrayList<>();
3536
this.started = false;
36-
this.metrics = metrics;
37+
DefaultEventHandler.setEventMonitor(new EventMonitor() {
38+
@Override
39+
public void processedEvent(String uid, Event event) {
40+
configurationService.getMetrics().timeControllerEvents();
41+
}
42+
43+
@Override
44+
public void failedEvent(String uid, Event event) {
45+
configurationService.getMetrics().timeControllerRetry();
46+
}
47+
});
3748
}
3849

3950
/** Adds a shutdown hook that automatically calls {@link #close()} when the app shuts down. */
4051
public void installShutdownHook() {
4152
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
4253
}
4354

44-
public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) {
45-
this(k8sClient, configurationService, Metrics.NOOP);
46-
}
47-
4855
public KubernetesClient getKubernetesClient() {
4956
return k8sClient;
5057
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.fabric8.kubernetes.client.CustomResource;
2121
import io.javaoperatorsdk.operator.api.RetryInfo;
2222
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
23-
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
2423
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
2524
import io.javaoperatorsdk.operator.processing.event.Event;
2625
import io.javaoperatorsdk.operator.processing.event.EventHandler;
@@ -35,6 +34,13 @@
3534
public class DefaultEventHandler<R extends CustomResource<?, ?>> implements EventHandler {
3635

3736
private static final Logger log = LoggerFactory.getLogger(DefaultEventHandler.class);
37+
private static EventMonitor monitor = new EventMonitor() {
38+
@Override
39+
public void processedEvent(String uid, Event event) {}
40+
41+
@Override
42+
public void failedEvent(String uid, Event event) {}
43+
};
3844

3945
private final EventBuffer eventBuffer;
4046
private final Set<String> underProcessing = new HashSet<>();
@@ -46,37 +52,35 @@ public class DefaultEventHandler<R extends CustomResource<?, ?>> implements Even
4652
private final int terminationTimeout;
4753
private final ReentrantLock lock = new ReentrantLock();
4854
private DefaultEventSourceManager<R> eventSourceManager;
49-
private final ControllerConfiguration<R> configuration;
5055

5156
public DefaultEventHandler(ConfiguredController<R> controller) {
5257
this(
5358
new EventDispatcher<>(controller),
5459
controller.getConfiguration().getName(),
5560
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()),
5661
controller.getConfiguration().getConfigurationService().concurrentReconciliationThreads(),
57-
controller.getConfiguration().getConfigurationService().getTerminationTimeoutSeconds(),
58-
controller.getConfiguration());
62+
controller.getConfiguration().getConfigurationService().getTerminationTimeoutSeconds());
5963
}
6064

6165
DefaultEventHandler(
6266
EventDispatcher<R> eventDispatcher,
6367
String relatedControllerName,
6468
Retry retry,
65-
int concurrentReconciliationThreads, ControllerConfiguration<R> configuration) {
69+
int concurrentReconciliationThreads) {
6670
this(
6771
eventDispatcher,
6872
relatedControllerName,
6973
retry,
7074
concurrentReconciliationThreads,
71-
ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS, configuration);
75+
ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS);
7276
}
7377

7478
private DefaultEventHandler(
7579
EventDispatcher<R> eventDispatcher,
7680
String relatedControllerName,
7781
Retry retry,
7882
int concurrentReconciliationThreads,
79-
int terminationTimeout, ControllerConfiguration<R> configuration) {
83+
int terminationTimeout) {
8084
this.eventDispatcher = eventDispatcher;
8185
this.retry = retry;
8286
this.controllerName = relatedControllerName;
@@ -86,7 +90,6 @@ private DefaultEventHandler(
8690
new ScheduledThreadPoolExecutor(
8791
concurrentReconciliationThreads,
8892
runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName));
89-
this.configuration = configuration;
9093
}
9194

9295
@Override
@@ -106,6 +109,16 @@ public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManage
106109
this.eventSourceManager = eventSourceManager;
107110
}
108111

112+
public static void setEventMonitor(EventMonitor monitor) {
113+
DefaultEventHandler.monitor = monitor;
114+
}
115+
116+
public interface EventMonitor {
117+
void processedEvent(String uid, Event event);
118+
119+
void failedEvent(String uid, Event event);
120+
}
121+
109122
@Override
110123
public void handleEvent(Event event) {
111124
try {
@@ -115,10 +128,7 @@ public void handleEvent(Event event) {
115128
final Predicate<CustomResource> selector = event.getCustomResourcesSelector();
116129
for (String uid : eventSourceManager.getLatestResourceUids(selector)) {
117130
eventBuffer.addEvent(uid, event);
118-
configuration
119-
.getConfigurationService()
120-
.getMetrics()
121-
.timeControllerEvents();
131+
monitor.processedEvent(uid, event);
122132
executeBufferedEvents(uid);
123133
}
124134
} finally {
@@ -168,10 +178,8 @@ void eventProcessingFinished(
168178

169179
if (retry != null && postExecutionControl.exceptionDuringExecution()) {
170180
handleRetryOnException(executionScope);
171-
configuration
172-
.getConfigurationService()
173-
.getMetrics()
174-
.timeControllerRetry();
181+
executionScope.getEvents()
182+
.forEach(e -> monitor.failedEvent(executionScope.getCustomResourceUid(), e));
175183
return;
176184
}
177185

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ExecutionScope.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.javaoperatorsdk.operator.processing;
22

3+
import java.util.List;
4+
35
import io.fabric8.kubernetes.client.CustomResource;
46
import io.javaoperatorsdk.operator.api.RetryInfo;
57
import io.javaoperatorsdk.operator.processing.event.Event;
6-
import java.util.List;
78

8-
public class ExecutionScope<R extends CustomResource> {
9+
public class ExecutionScope<R extends CustomResource<?, ?>> {
910

1011
private final List<Event> events;
1112
// the latest custom resource from cache

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public DefaultEventSourceManager(ConfiguredController<R> controller) {
5050
new CustomResourceEventSource<>(controller));
5151
}
5252

53+
public DefaultEventHandler<R> getEventHandler() {
54+
return defaultEventHandler;
55+
}
56+
5357
@Override
5458
public void close() {
5559
try {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
import org.junit.jupiter.api.Test;
1818
import org.mockito.ArgumentCaptor;
1919

20-
import io.javaoperatorsdk.operator.Metrics;
2120
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
22-
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
2321
import io.javaoperatorsdk.operator.processing.event.DefaultEvent;
2422
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
2523
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
@@ -34,16 +32,12 @@ class CustomResourceSelectorTest {
3432
private final DefaultEventSourceManager defaultEventSourceManagerMock =
3533
mock(DefaultEventSourceManager.class);
3634

37-
private ControllerConfiguration configuration =
38-
mock(ControllerConfiguration.class);
39-
private final ConfigurationService configService = mock(ConfigurationService.class);
40-
4135
private final DefaultEventHandler defaultEventHandler =
4236
new DefaultEventHandler(
4337
eventDispatcherMock,
4438
"Test",
4539
null,
46-
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER, configuration);
40+
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
4741

4842
@BeforeEach
4943
public void setup() {
@@ -63,10 +57,6 @@ public void setup() {
6357
})
6458
.when(defaultEventSourceManagerMock)
6559
.cleanup(any());
66-
67-
when(configuration.getName()).thenReturn("DefaultEventHandlerTest");
68-
when(configService.getMetrics()).thenReturn(Metrics.NOOP);
69-
when(configuration.getConfigurationService()).thenReturn(configService);
7060
}
7161

7262
@Test

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,27 @@
1313
import static org.mockito.Mockito.verify;
1414
import static org.mockito.Mockito.when;
1515

16-
import io.fabric8.kubernetes.client.CustomResource;
17-
import io.fabric8.kubernetes.client.Watcher;
18-
import io.javaoperatorsdk.operator.Metrics;
19-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
20-
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
21-
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
22-
import io.javaoperatorsdk.operator.processing.event.Event;
23-
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
24-
import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent;
25-
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
26-
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
27-
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
2816
import java.util.Arrays;
2917
import java.util.List;
3018
import java.util.UUID;
19+
3120
import org.junit.jupiter.api.BeforeEach;
3221
import org.junit.jupiter.api.Test;
3322
import org.mockito.ArgumentCaptor;
3423
import org.mockito.stubbing.Answer;
3524
import org.slf4j.Logger;
3625
import org.slf4j.LoggerFactory;
3726

27+
import io.fabric8.kubernetes.client.Watcher;
28+
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
29+
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
30+
import io.javaoperatorsdk.operator.processing.event.Event;
31+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
32+
import io.javaoperatorsdk.operator.processing.event.internal.TimerEvent;
33+
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
34+
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
35+
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
36+
3837
class DefaultEventHandlerTest {
3938

4039
private static final Logger log = LoggerFactory.getLogger(DefaultEventHandlerTest.class);
@@ -47,25 +46,20 @@ class DefaultEventHandlerTest {
4746
mock(DefaultEventSourceManager.class);
4847

4948
private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class);
50-
private ControllerConfiguration configuration =
51-
mock(ControllerConfiguration.class);
52-
private final ConfigurationService configService = mock(ConfigurationService.class);
5349

5450
private DefaultEventHandler defaultEventHandler =
5551
new DefaultEventHandler(
5652
eventDispatcherMock,
5753
"Test",
5854
null,
59-
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER,
60-
configuration);
55+
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
6156

6257
private DefaultEventHandler defaultEventHandlerWithRetry =
6358
new DefaultEventHandler(
6459
eventDispatcherMock,
6560
"Test",
6661
GenericRetry.defaultLimitedExponentialRetry(),
67-
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER,
68-
configuration);
62+
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
6963

7064
@BeforeEach
7165
public void setup() {
@@ -74,10 +68,6 @@ public void setup() {
7468
defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock);
7569
defaultEventHandlerWithRetry.setEventSourceManager(defaultEventSourceManagerMock);
7670

77-
when(configuration.getName()).thenReturn("DefaultEventHandlerTest");
78-
when(configService.getMetrics()).thenReturn(Metrics.NOOP);
79-
when(configuration.getConfigurationService()).thenReturn(configService);
80-
8171
// todo: remove
8272
when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache);
8373
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any());

operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.awaitility.Awaitility.await;
55

6+
import java.io.IOException;
7+
import java.io.InputStream;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
613
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
714
import io.fabric8.kubernetes.api.model.Namespace;
815
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
@@ -19,11 +26,6 @@
1926
import io.javaoperatorsdk.operator.processing.retry.Retry;
2027
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
2128
import io.javaoperatorsdk.operator.sample.simple.TestCustomResourceSpec;
22-
import java.io.IOException;
23-
import java.io.InputStream;
24-
import java.util.concurrent.TimeUnit;
25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
2729

2830
public class IntegrationTestSupport {
2931

@@ -59,7 +61,7 @@ public void initialize(KubernetesClient k8sClient, ResourceController controller
5961
namespaces.create(
6062
new NamespaceBuilder().withNewMetadata().withName(TEST_NAMESPACE).endMetadata().build());
6163
}
62-
operator = new Operator(k8sClient, configurationService, Metrics.NOOP);
64+
operator = new Operator(k8sClient, configurationService);
6365
final var overriddenConfig =
6466
ControllerConfigurationOverrider.override(config).settingNamespace(TEST_NAMESPACE);
6567
if (retry != null) {

0 commit comments

Comments
 (0)