Skip to content

Commit 853f20f

Browse files
authored
Merge pull request kroxylicious#1867 from SamBarker/operator-metrics
Operator metrics
2 parents 809572a + 73fede1 commit 853f20f

File tree

5 files changed

+286
-11
lines changed

5 files changed

+286
-11
lines changed

kroxylicious-operator/pom.xml

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
<dependencyManagement>
3030
<dependencies>
31-
<!-- note, we're inheriting the jackson BOM from our parent pom -->
31+
<!-- note, we're inheriting the jackson & micrometer BOMs from our parent pom -->
3232

3333
<dependency>
3434
<groupId>io.javaoperatorsdk</groupId>
@@ -52,6 +52,10 @@
5252
<groupId>io.javaoperatorsdk</groupId>
5353
<artifactId>operator-framework-core</artifactId>
5454
</dependency>
55+
<dependency>
56+
<groupId>io.javaoperatorsdk</groupId>
57+
<artifactId>micrometer-support</artifactId>
58+
</dependency>
5559
<dependency>
5660
<groupId>io.fabric8</groupId>
5761
<artifactId>kubernetes-client</artifactId>
@@ -151,6 +155,16 @@
151155
<scope>compile</scope>
152156
</dependency>
153157

158+
<dependency>
159+
<groupId>io.micrometer</groupId>
160+
<artifactId>micrometer-core</artifactId>
161+
</dependency>
162+
163+
<dependency>
164+
<groupId>io.micrometer</groupId>
165+
<artifactId>micrometer-registry-prometheus</artifactId>
166+
</dependency>
167+
154168
<dependency>
155169
<groupId>org.junit.jupiter</groupId>
156170
<artifactId>junit-jupiter-api</artifactId>
@@ -191,6 +205,17 @@
191205
<artifactId>junit-pioneer</artifactId>
192206
<scope>test</scope>
193207
</dependency>
208+
<dependency>
209+
<groupId>io.fabric8</groupId>
210+
<artifactId>kubernetes-server-mock</artifactId>
211+
<scope>test</scope>
212+
<!-- TODO exclude okhttp -->
213+
</dependency>
214+
<dependency>
215+
<groupId>io.fabric8</groupId>
216+
<artifactId>kubernetes-model-apiextensions</artifactId>
217+
<scope>test</scope>
218+
</dependency>
194219
</dependencies>
195220
<build>
196221
<plugins>

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/OperatorMain.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,43 +11,83 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import io.fabric8.kubernetes.client.KubernetesClient;
1415
import io.javaoperatorsdk.operator.Operator;
16+
import io.javaoperatorsdk.operator.monitoring.micrometer.MicrometerMetrics;
17+
import io.micrometer.core.instrument.MeterRegistry;
18+
import io.micrometer.core.instrument.Metrics;
19+
import io.micrometer.prometheusmetrics.PrometheusConfig;
20+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
1521

1622
import io.kroxylicious.kubernetes.operator.config.FilterApiDecl;
1723
import io.kroxylicious.kubernetes.operator.config.RuntimeDecl;
1824

1925
import edu.umd.cs.findbugs.annotations.NonNull;
26+
import edu.umd.cs.findbugs.annotations.Nullable;
2027

2128
/**
2229
* The {@code main} method entrypoint for the operator
2330
*/
2431
public class OperatorMain {
2532

2633
private static final Logger LOGGER = LoggerFactory.getLogger(OperatorMain.class);
34+
private MeterRegistry registry;
35+
private final Operator operator;
36+
37+
public OperatorMain() {
38+
this(null);
39+
}
40+
41+
public OperatorMain(@Nullable KubernetesClient kubeClient) {
42+
final MicrometerMetrics metrics = enablePrometheusMetrics();
43+
// o.withMetrics is invoked multiple times so can cause issues with enabling metrics.
44+
operator = new Operator(o -> {
45+
o.withMetrics(metrics);
46+
if (kubeClient != null) {
47+
o.withKubernetesClient(kubeClient);
48+
}
49+
});
50+
}
2751

2852
public static void main(String[] args) {
2953
try {
30-
run();
54+
new OperatorMain().start();
3155
}
3256
catch (Exception e) {
3357
LOGGER.error("Operator has thrown exception during startup. Will now exit.", e);
3458
System.exit(1);
3559
}
3660
}
3761

38-
static void run() {
39-
Operator operator = new Operator();
62+
/**
63+
* Starts the operator instance and returns once that has completed successfully.
64+
*/
65+
void start() {
4066
operator.installShutdownHook(Duration.ofSeconds(10));
4167
var registeredController = operator.register(new ProxyReconciler(runtimeDecl()));
4268
// TODO couple the health of the registeredController to the operator's HTTP healthchecks
4369
operator.start();
4470
LOGGER.info("Operator started.");
4571
}
4672

73+
void stop() {
74+
operator.stop();
75+
LOGGER.info("Operator stopped.");
76+
}
77+
4778
@NonNull
4879
static RuntimeDecl runtimeDecl() {
4980
// TODO read these from some configuration CR
5081
return new RuntimeDecl(List.of(
5182
new FilterApiDecl("filter.kroxylicious.io", "v1alpha1", "KafkaProtocolFilter")));
5283
}
84+
85+
private MicrometerMetrics enablePrometheusMetrics() {
86+
registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
87+
Metrics.globalRegistry.add(registry);
88+
return MicrometerMetrics.newPerResourceCollectingMicrometerMetricsBuilder(Metrics.globalRegistry)
89+
.withCleanUpDelayInSeconds(35)
90+
.withCleaningThreadNumber(1)
91+
.build();
92+
}
5393
}

kroxylicious-operator/src/main/resources/log4j2.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,8 @@ Configuration:
3636
additivity: false
3737
AppenderRef:
3838
- ref: STDOUT
39+
- name: io.fabric8.kubernetes.client.http
40+
level: INFO
41+
additivity: false
42+
AppenderRef:
43+
- ref: STDOUT

kroxylicious-operator/src/test/java/io/kroxylicious/kubernetes/operator/OperatorMainIT.java

Lines changed: 105 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,116 @@
66

77
package io.kroxylicious.kubernetes.operator;
88

9-
import org.assertj.core.api.Assumptions;
10-
import org.junit.jupiter.api.Disabled;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import org.awaitility.Awaitility;
12+
import org.junit.jupiter.api.AfterAll;
13+
import org.junit.jupiter.api.AfterEach;
14+
import org.junit.jupiter.api.BeforeAll;
15+
import org.junit.jupiter.api.BeforeEach;
1116
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.condition.EnabledIf;
18+
19+
import io.fabric8.kubernetes.client.KubernetesClient;
20+
import io.javaoperatorsdk.operator.OperatorException;
21+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
22+
import io.micrometer.core.instrument.Metrics;
23+
import io.micrometer.core.instrument.search.MeterNotFoundException;
24+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
25+
26+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaClusterRef;
27+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
28+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyBuilder;
29+
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
30+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.assertj.core.api.Assertions.fail;
1234

13-
@Disabled
35+
@EnabledIf(value = "io.kroxylicious.kubernetes.operator.OperatorTestUtils#isKubeClientAvailable", disabledReason = "no viable kube client available")
1436
class OperatorMainIT {
37+
private OperatorMain operatorMain;
1538
// This is an IT because it depends on having a running Kube cluster
1639

40+
@BeforeAll
41+
static void beforeAll() {
42+
LocallyRunOperatorExtension.applyCrd(KafkaProtocolFilter.class, OperatorTestUtils.kubeClientIfAvailable());
43+
LocallyRunOperatorExtension.applyCrd(KafkaProxy.class, OperatorTestUtils.kubeClientIfAvailable());
44+
LocallyRunOperatorExtension.applyCrd(VirtualKafkaCluster.class, OperatorTestUtils.kubeClientIfAvailable());
45+
LocallyRunOperatorExtension.applyCrd(KafkaClusterRef.class, OperatorTestUtils.kubeClientIfAvailable());
46+
}
47+
48+
@AfterAll
49+
static void afterAll() {
50+
try (KubernetesClient kubernetesClient = OperatorTestUtils.kubeClientIfAvailable()) {
51+
if (kubernetesClient != null) {
52+
kubernetesClient.resources(KafkaProtocolFilter.class).delete();
53+
kubernetesClient.resources(KafkaProxy.class).delete();
54+
kubernetesClient.resources(VirtualKafkaCluster.class).delete();
55+
kubernetesClient.resources(KafkaClusterRef.class).delete();
56+
}
57+
}
58+
}
59+
60+
@BeforeEach
61+
void beforeEach() {
62+
operatorMain = new OperatorMain(null);
63+
}
64+
65+
@AfterEach
66+
void afterEach() {
67+
if (operatorMain != null) {
68+
operatorMain.stop();
69+
}
70+
}
71+
1772
@Test
18-
void run() {
19-
Assumptions.assumeThat(OperatorTestUtils.isKubeClientAvailable()).describedAs("Test requires a viable kube client").isTrue();
20-
OperatorMain.run();
73+
void start() {
74+
try {
75+
operatorMain.start();
76+
}
77+
catch (OperatorException e) {
78+
fail("Exception occurred starting operator: " + e.getMessage());
79+
}
80+
2181
}
2282

23-
}
83+
@Test
84+
void shouldRegisterPrometheusMetricsInGlobalRegistry() {
85+
// Given
86+
87+
// When
88+
operatorMain.start();
89+
90+
// Then
91+
assertThat(Metrics.globalRegistry.getRegistries())
92+
.hasAtLeastOneElementOfType(PrometheusMeterRegistry.class);
93+
}
94+
95+
@Test
96+
void shouldRegisterOperatorMetrics() {
97+
// Given
98+
final KafkaProxyBuilder proxyBuilder = new KafkaProxyBuilder().withKind("KafkaProxy").withNewMetadata().withName("mycoolproxy").endMetadata();
99+
operatorMain.start();
100+
101+
// When
102+
OperatorTestUtils.kubeClientIfAvailable().resource(proxyBuilder.build()).create();
103+
104+
// Then
105+
Awaitility.await()
106+
.atMost(10, TimeUnit.SECONDS)
107+
.ignoreException(MeterNotFoundException.class)
108+
.untilAsserted(() -> assertThat(Metrics.globalRegistry.get("operator.sdk.events.received").meter().getId()).isNotNull());
109+
}
110+
111+
@Test
112+
void shouldRegisterMetricsForProxyReconciler() {
113+
// Given
114+
115+
// When
116+
operatorMain.start();
117+
118+
// Then
119+
assertThat(Metrics.globalRegistry.get("operator.sdk.reconciliations.executions.proxyreconciler").meter().getId()).isNotNull();
120+
}
121+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.kubernetes.operator;
8+
9+
import java.util.concurrent.TimeUnit;
10+
11+
import org.awaitility.Awaitility;
12+
import org.junit.jupiter.api.AfterEach;
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
16+
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition;
17+
import io.fabric8.kubernetes.client.CustomResource;
18+
import io.fabric8.kubernetes.client.KubernetesClient;
19+
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
20+
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
21+
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
22+
import io.micrometer.core.instrument.Metrics;
23+
import io.micrometer.core.instrument.search.MeterNotFoundException;
24+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
25+
26+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaClusterRef;
27+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxy;
28+
import io.kroxylicious.kubernetes.api.v1alpha1.KafkaProxyBuilder;
29+
import io.kroxylicious.kubernetes.api.v1alpha1.VirtualKafkaCluster;
30+
import io.kroxylicious.kubernetes.filter.api.v1alpha1.KafkaProtocolFilter;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
34+
@EnableKubernetesMockClient(crud = true)
35+
class OperatorMainTest {
36+
37+
KubernetesClient kubeClient;
38+
KubernetesMockServer mockServer;
39+
40+
private OperatorMain operatorMain;
41+
42+
@BeforeEach
43+
void setUp() {
44+
expectApiResources();
45+
operatorMain = new OperatorMain(kubeClient);
46+
}
47+
48+
@AfterEach
49+
void tearDown() {
50+
if (operatorMain != null) {
51+
operatorMain.stop();
52+
}
53+
}
54+
55+
@Test
56+
void shouldRegisterPrometheusMeterRegistry() {
57+
// Given
58+
59+
// When
60+
operatorMain.start();
61+
62+
// Then
63+
assertThat(Metrics.globalRegistry.getRegistries())
64+
.hasAtLeastOneElementOfType(PrometheusMeterRegistry.class);
65+
}
66+
67+
@Test
68+
void shouldRegisterOperatorMetrics() {
69+
// Given
70+
final KafkaProxyBuilder proxyBuilder = new KafkaProxyBuilder().withKind("KafkaProxy").withNewMetadata().withName("mycoolproxy").endMetadata();
71+
operatorMain.start();
72+
73+
// When
74+
kubeClient.resource(proxyBuilder.build()).create();
75+
76+
// Then
77+
Awaitility.await()
78+
.atMost(10, TimeUnit.SECONDS)
79+
.ignoreException(MeterNotFoundException.class)
80+
.untilAsserted(() -> assertThat(Metrics.globalRegistry.get("operator.sdk.events.received").meter().getId()).isNotNull());
81+
82+
}
83+
84+
@Test
85+
void shouldRegisterMetricsForProxyReconciler() {
86+
// Given
87+
88+
// When
89+
operatorMain.start();
90+
91+
// Then
92+
assertThat(Metrics.globalRegistry.get("operator.sdk.reconciliations.executions.proxyreconciler").meter().getId()).isNotNull();
93+
}
94+
95+
private void expectApiResources() {
96+
expectCrd(KafkaProtocolFilter.class);
97+
expectCrd(KafkaProxy.class);
98+
expectCrd(VirtualKafkaCluster.class);
99+
expectCrd(KafkaClusterRef.class);
100+
}
101+
102+
private void expectCrd(Class<? extends CustomResource<?, ?>> crdClass) {
103+
final CustomResourceDefinition resourceDefinition = CustomResourceDefinitionContext.v1CRDFromCustomResourceType(crdClass)
104+
.build();
105+
mockServer.expectCustomResource(CustomResourceDefinitionContext.fromCrd(resourceDefinition));
106+
}
107+
}

0 commit comments

Comments
 (0)