Skip to content

Commit 1c0f0c8

Browse files
authored
Merge pull request #1360 from yue9944882/controller-metrics
Feat: Controller metrics
2 parents 5425fbd + 69a5c4c commit 1c0f0c8

File tree

9 files changed

+129
-25
lines changed

9 files changed

+129
-25
lines changed

examples/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,18 @@
7676
<artifactId>wiremock</artifactId>
7777
<scope>test</scope>
7878
</dependency>
79+
<!--for spring controller example-->
80+
<dependency>
81+
<groupId>org.springframework.boot</groupId>
82+
<artifactId>spring-boot-starter-web</artifactId>
83+
<version>${spring.boot.version}</version>
84+
</dependency>
85+
<dependency>
86+
<groupId>org.springframework.boot</groupId>
87+
<artifactId>spring-boot-starter-actuator</artifactId>
88+
<version>${spring.boot.version}</version>
89+
</dependency>
90+
7991
</dependencies>
8092
<build>
8193
<plugins>

examples/src/main/java/io/kubernetes/client/examples/SpringControllerExample.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.kubernetes.client.spring.extended.controller.KubernetesInformerConfigurer;
2828
import io.kubernetes.client.spring.extended.controller.annotation.*;
2929
import io.kubernetes.client.spring.extended.controller.factory.KubernetesControllerFactory;
30+
import io.kubernetes.client.spring.extended.controller.metrics.PrometheusScrapeEndpoint;
3031
import io.kubernetes.client.util.ClientBuilder;
3132
import java.io.IOException;
3233
import java.time.Duration;
@@ -94,6 +95,14 @@ public ApiClient myApiClient() throws IOException {
9495
public SharedInformerFactory sharedInformerFactory() {
9596
return new MySharedInformerFactory();
9697
}
98+
99+
// *OPTIONAL*
100+
// Registers the Prometheus scraping endpoint, served at `/actuator/prometheus`.
101+
// For the endpoint to be exposed over HTTP, set the `management.endpoints.web.exposure.include=prometheus` property.
102+
@Bean
103+
public PrometheusScrapeEndpoint prometheusScrapeEndpoint() {
104+
return new PrometheusScrapeEndpoint();
105+
}
97106
}
98107

99108
@KubernetesInformers({ // Defines which resources the informer-factory is actually watching.
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
namespace=airflow
1+
namespace=airflow
2+
management.endpoints.web.exposure.include=prometheus

extended/src/main/java/io/kubernetes/client/extended/controller/DefaultController.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
import io.kubernetes.client.extended.controller.reconciler.Result;
1818
import io.kubernetes.client.extended.wait.Wait;
1919
import io.kubernetes.client.extended.workqueue.RateLimitingQueue;
20+
import io.prometheus.client.CollectorRegistry;
21+
import io.prometheus.client.Counter;
22+
import io.prometheus.client.Gauge;
2023
import java.time.Duration;
2124
import java.util.concurrent.CountDownLatch;
2225
import java.util.concurrent.ExecutorService;
@@ -36,28 +39,58 @@
3639
public class DefaultController implements Controller {
3740
private static final Logger log = LoggerFactory.getLogger(DefaultController.class);
3841

39-
private Reconciler reconciler;
40-
private String name;
42+
private static Gauge gaugeWorkQueueLength =
43+
Gauge.build("controller_work_queue_length", "Current length of the controller's work-queue")
44+
.labelNames("name")
45+
.register();
46+
private static Counter counterControllerReconcile =
47+
Counter.build("controller_reconcile_count_total", "Total count of controller reconciliation")
48+
.labelNames("name", "requeue")
49+
.register();
50+
51+
private final Reconciler reconciler;
52+
private final String name;
53+
private final RateLimitingQueue<Request> workQueue;
54+
private final Supplier<Boolean>[] readyFuncs;
55+
4156
private int workerCount;
4257
private ScheduledExecutorService workerThreadPool;
4358

44-
private RateLimitingQueue<Request> workQueue;
45-
private Supplier<Boolean>[] readyFuncs;
46-
4759
private Duration readyTimeout;
4860
private Duration readyCheckInternal;
4961

5062
/**
5163
* Instantiates a new Default controller.
5264
*
65+
* @param name the controller name, used as the "name" label on the controller metrics and in log messages
5366
* @param reconciler the reconciler
5467
* @param workQueue the work queue
5568
* @param readyFuncs the ready funcs
5669
*/
5770
public DefaultController(
71+
String name,
5872
Reconciler reconciler,
5973
RateLimitingQueue<Request> workQueue,
6074
Supplier<Boolean>... readyFuncs) {
75+
this(name, reconciler, workQueue, null, readyFuncs);
76+
}
77+
78+
/**
79+
* Instantiates a new Default controller.
80+
*
81+
* @param name the controller name, used as the "name" label on the controller metrics and in log messages
82+
* @param reconciler the reconciler
83+
* @param workQueue the work queue
84+
* @param collectorRegistry the collector registry
85+
* @param readyFuncs the ready funcs
86+
*/
87+
public DefaultController(
88+
String name,
89+
Reconciler reconciler,
90+
RateLimitingQueue<Request> workQueue,
91+
CollectorRegistry collectorRegistry,
92+
Supplier<Boolean>... readyFuncs) {
93+
this.name = name;
6194
this.reconciler = reconciler;
6295
this.workQueue = workQueue;
6396
this.readyFuncs = readyFuncs;
@@ -149,6 +182,7 @@ public void shutdown() {
149182
private void worker() {
150183
// taking tasks from work-queue in a loop
151184
while (!workQueue.isShuttingDown()) {
185+
gaugeWorkQueueLength.labels(name).set(workQueue.length());
152186
Request request = null;
153187
try {
154188
request = workQueue.get();
@@ -173,6 +207,8 @@ private void worker() {
173207
} catch (Throwable t) {
174208
log.error("Reconciler aborted unexpectedly", t);
175209
result = new Result(true);
210+
} finally {
211+
counterControllerReconcile.labels(this.name, Boolean.toString(result.isRequeue())).inc();
176212
}
177213

178214
try {
@@ -194,6 +230,7 @@ private void worker() {
194230
}
195231
} finally {
196232
workQueue.done(request);
233+
gaugeWorkQueueLength.labels(name).set(workQueue.length());
197234
log.debug("Controller {} finished reconciling {}..", this.name, request);
198235
}
199236
}
@@ -203,19 +240,10 @@ public RateLimitingQueue<Request> getWorkQueue() {
203240
return workQueue;
204241
}
205242

206-
public DefaultController setWorkQueue(RateLimitingQueue<Request> workQueue) {
207-
this.workQueue = workQueue;
208-
return this;
209-
}
210-
211243
public String getName() {
212244
return name;
213245
}
214246

215-
public void setName(String name) {
216-
this.name = name;
217-
}
218-
219247
public int getWorkerCount() {
220248
return workerCount;
221249
}
@@ -236,10 +264,6 @@ public Reconciler getReconciler() {
236264
return reconciler;
237265
}
238266

239-
public void setReconciler(Reconciler reconciler) {
240-
this.reconciler = reconciler;
241-
}
242-
243267
public Duration getReadyTimeout() {
244268
return readyTimeout;
245269
}

extended/src/main/java/io/kubernetes/client/extended/controller/builder/DefaultControllerBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,16 @@ public Controller build() throws IllegalStateException {
149149

150150
DefaultController controller =
151151
new DefaultController(
152-
this.reconciler, this.workQueue, this.readyFuncs.stream().toArray(Supplier[]::new));
152+
this.controllerName,
153+
this.reconciler,
154+
this.workQueue,
155+
this.readyFuncs.stream().toArray(Supplier[]::new));
153156

154-
controller.setName(this.controllerName);
155157
controller.setWorkerCount(this.workerCount);
156158
controller.setWorkerThreadPool(
157159
Executors.newScheduledThreadPool(
158160
this.workerCount, Controllers.namedControllerThreadFactory(this.controllerName)));
159161

160-
controller.setReconciler(this.reconciler);
161-
162162
return controller;
163163
}
164164
}

extended/src/test/java/io/kubernetes/client/extended/controller/DefaultControllerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void tearDown() throws Exception {}
6161
@Test
6262
public void testStartingStoppingController() throws InterruptedException {
6363

64-
DefaultController testController = new DefaultController(mockReconciler, workQueue);
64+
DefaultController testController = new DefaultController("", mockReconciler, workQueue);
6565

6666
testController.setWorkerCount(1);
6767
testController.setWorkerThreadPool(Executors.newScheduledThreadPool(1));
@@ -96,7 +96,7 @@ public void testControllerWontStartBeforeReady() throws InterruptedException {
9696

9797
AtomicBoolean ready = new AtomicBoolean(false);
9898
DefaultController testController =
99-
new DefaultController(mockReconciler, workQueue, () -> ready.get());
99+
new DefaultController("", mockReconciler, workQueue, () -> ready.get());
100100
testController.setWorkerCount(1);
101101
testController.setWorkerThreadPool(Executors.newScheduledThreadPool(1));
102102
testController.setReadyCheckInternal(Duration.ofMillis(100));
@@ -122,6 +122,7 @@ public void testControllerKeepsWorkingWhenReconcilerAbortsWithRuntimeException()
122122
List<Request> finishedRequests = new ArrayList<>();
123123
DefaultController testController =
124124
new DefaultController(
125+
"",
125126
new Reconciler() {
126127
@Override
127128
public Result reconcile(Request request) {

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@
168168
<artifactId>spring-boot-autoconfigure</artifactId>
169169
<version>${spring.boot.version}</version>
170170
</dependency>
171+
<dependency>
172+
<groupId>org.springframework.boot</groupId>
173+
<artifactId>spring-boot-actuator</artifactId>
174+
<version>${spring.boot.version}</version>
175+
<optional>true</optional>
176+
</dependency>
171177
<dependency>
172178
<groupId>io.prometheus</groupId>
173179
<artifactId>simpleclient</artifactId>

spring/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
<groupId>org.springframework.boot</groupId>
3636
<artifactId>spring-boot-autoconfigure</artifactId>
3737
</dependency>
38+
<dependency>
39+
<groupId>org.springframework.boot</groupId>
40+
<artifactId>spring-boot-actuator</artifactId>
41+
</dependency>
3842

3943
<dependency>
4044
<groupId>junit</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.spring.extended.controller.metrics;
14+
15+
import io.prometheus.client.CollectorRegistry;
16+
import io.prometheus.client.exporter.common.TextFormat;
17+
import java.io.IOException;
18+
import java.io.StringWriter;
19+
import java.io.Writer;
20+
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
21+
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
22+
23+
@Endpoint(id = "prometheus")
24+
public class PrometheusScrapeEndpoint {
25+
26+
private final CollectorRegistry collectorRegistry;
27+
28+
public PrometheusScrapeEndpoint() {
29+
this(CollectorRegistry.defaultRegistry);
30+
}
31+
32+
public PrometheusScrapeEndpoint(CollectorRegistry collectorRegistry) {
33+
this.collectorRegistry = collectorRegistry;
34+
}
35+
36+
@ReadOperation
37+
public String scrape() {
38+
try {
39+
Writer writer = new StringWriter();
40+
TextFormat.write004(writer, collectorRegistry.metricFamilySamples());
41+
return writer.toString();
42+
} catch (IOException e) {
43+
// This actually never happens since StringWriter::write() doesn't throw any IOException
44+
throw new RuntimeException("Writing metrics failed", e);
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)