Skip to content

Commit 0fd679c

Browse files
committed
Add TaskScheduler option for Kafka metrics components
Related to: micrometer-metrics/micrometer#4976 * Introduce a `KafkaMetricsSupport` to have a common API for the `MeterBinder` registration * Rework `MicrometerConsumerListener`, `MicrometerProducerListener` and `KafkaStreamsMicrometerListener` to extend the `KafkaMetricsSupport` which allows to minimize code duplication * Expose ctors on those listeners based on the `TaskScheduler` * Implement a simple `ScheduledExecutorServiceAdapter` to adapt a `TaskScheduler` to the expected by the `KafkaMetrics` `ScheduledExecutorService`
1 parent 64870ed commit 0fd679c

File tree

7 files changed

+315
-77
lines changed

7 files changed

+315
-77
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ When using `KafkaStreamsCustomizer` it is now possible to return a custom implem
5858

5959
[[x33-kafka-headers-for-batch-listeners]]
6060
=== KafkaHeaders.DELIVERY_ATTEMPT for batch listeners
61+
6162
When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTMPT` header in its headers fields.
6263
If the `DeliveryAttemptAwareRetryListener` is set to error handler as retry listener, each `ConsumerRecord` has delivery attempt header.
63-
For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[kafka-headers-for-batch-listener].
64+
For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[Kafka Headers for Batch Listener].
65+
66+
[[x33-task-scheduler-for-kafka-metrics]]
67+
=== Kafka Metrics Listeners and `TaskScheduler`
68+
69+
The `MicrometerProducerListener`, `MicrometerConsumerListener` and `KafkaStreamsMicrometerListener` can now be configured with a `TaskScheduler`.
70+
See `KafkaMetrics` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information.
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.time.Duration;
20+
import java.time.Instant;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.ScheduledFuture;
28+
import java.util.concurrent.ScheduledThreadPoolExecutor;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.apache.kafka.clients.admin.AdminClient;
32+
import org.apache.kafka.clients.consumer.Consumer;
33+
import org.apache.kafka.clients.producer.Producer;
34+
35+
import org.springframework.lang.Nullable;
36+
import org.springframework.scheduling.TaskScheduler;
37+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
38+
import org.springframework.util.Assert;
39+
import org.springframework.util.ReflectionUtils;
40+
41+
42+
import io.micrometer.core.instrument.ImmutableTag;
43+
import io.micrometer.core.instrument.MeterRegistry;
44+
import io.micrometer.core.instrument.Tag;
45+
import io.micrometer.core.instrument.binder.MeterBinder;
46+
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
47+
48+
/**
49+
* An abstract class to manage {@link io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics}.
50+
*
51+
* @param <C> the Kafka Client type.
52+
*
53+
* @author Artem Bilan
54+
*
55+
* @since 3.3
56+
*/
57+
public abstract class KafkaMetricsSupport<C> {
58+
59+
protected final MeterRegistry meterRegistry;
60+
61+
protected final List<Tag> tags;
62+
63+
@Nullable
64+
protected final ScheduledExecutorService scheduler;
65+
66+
private final Map<String, MeterBinder> metrics = new HashMap<>();
67+
68+
/**
69+
* Construct an instance with the provided registry.
70+
*
71+
* @param meterRegistry the registry.
72+
*/
73+
protected KafkaMetricsSupport(MeterRegistry meterRegistry) {
74+
this(meterRegistry, Collections.emptyList());
75+
}
76+
77+
/**
78+
* Construct an instance with the provided {@link MeterRegistry} and {@link TaskScheduler}.
79+
*
80+
* @param meterRegistry the registry.
81+
* @param taskScheduler the task scheduler.
82+
*/
83+
protected KafkaMetricsSupport(MeterRegistry meterRegistry, TaskScheduler taskScheduler) {
84+
this(meterRegistry, Collections.emptyList(), taskScheduler);
85+
}
86+
87+
/**
88+
* Construct an instance with the provided {@link MeterRegistry} and tags.
89+
*
90+
* @param meterRegistry the registry.
91+
* @param tags the tags.
92+
*/
93+
protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags) {
94+
Assert.notNull(meterRegistry, "The 'meterRegistry' cannot be null");
95+
this.meterRegistry = meterRegistry;
96+
this.tags = tags;
97+
this.scheduler = null;
98+
}
99+
100+
/**
101+
* Construct an instance with the provided {@link MeterRegistry}, tags and {@link TaskScheduler}.
102+
*
103+
* @param meterRegistry the registry.
104+
* @param tags the tags.
105+
* @param taskScheduler the task scheduler.
106+
*/
107+
protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags, TaskScheduler taskScheduler) {
108+
Assert.notNull(meterRegistry, "The 'meterRegistry' cannot be null");
109+
Assert.notNull(taskScheduler, "The 'taskScheduler' cannot be null");
110+
this.meterRegistry = meterRegistry;
111+
this.tags = tags;
112+
this.scheduler = obtainScheduledExecutorService(taskScheduler);
113+
}
114+
115+
protected void clientAdded(String id, C client) {
116+
if (!this.metrics.containsKey(id)) {
117+
List<Tag> clientTags = new ArrayList<>(this.tags);
118+
clientTags.add(new ImmutableTag("spring.id", id));
119+
this.metrics.put(id, createClientMetrics(client, clientTags));
120+
this.metrics.get(id).bindTo(this.meterRegistry);
121+
}
122+
}
123+
124+
protected MeterBinder createClientMetrics(C client, List<Tag> tags) {
125+
if (client instanceof Consumer<?, ?> consumer) {
126+
return createConsumerMetrics(consumer, tags);
127+
}
128+
else if (client instanceof Producer<?, ?> producer) {
129+
return createProducerMetrics(producer, tags);
130+
}
131+
else if (client instanceof AdminClient admin) {
132+
return createAdminMetrics(admin, tags);
133+
}
134+
135+
throw new IllegalArgumentException("Unsupported client type: " + client.getClass());
136+
}
137+
138+
private KafkaClientMetrics createConsumerMetrics(Consumer<?, ?> consumer, List<Tag> tags) {
139+
return this.scheduler != null
140+
? new KafkaClientMetrics(consumer, tags, this.scheduler)
141+
: new KafkaClientMetrics(consumer, tags);
142+
}
143+
144+
private KafkaClientMetrics createProducerMetrics(Producer<?, ?> producer, List<Tag> tags) {
145+
return this.scheduler != null
146+
? new KafkaClientMetrics(producer, tags, this.scheduler)
147+
: new KafkaClientMetrics(producer, tags);
148+
}
149+
150+
private KafkaClientMetrics createAdminMetrics(AdminClient adminClient, List<Tag> tags) {
151+
return this.scheduler != null
152+
? new KafkaClientMetrics(adminClient, tags, this.scheduler)
153+
: new KafkaClientMetrics(adminClient, tags);
154+
}
155+
156+
protected void clientRemoved(String id, C client) {
157+
AutoCloseable removed = (AutoCloseable) this.metrics.remove(id);
158+
if (removed != null) {
159+
try {
160+
removed.close();
161+
}
162+
catch (Exception ex) {
163+
ReflectionUtils.rethrowRuntimeException(ex);
164+
}
165+
}
166+
}
167+
168+
private static ScheduledExecutorService obtainScheduledExecutorService(TaskScheduler taskScheduler) {
169+
if (taskScheduler instanceof ThreadPoolTaskScheduler threadPoolTaskScheduler) {
170+
return threadPoolTaskScheduler.getScheduledExecutor();
171+
}
172+
173+
return new ScheduledExecutorServiceAdapter(taskScheduler);
174+
}
175+
176+
private static final class ScheduledExecutorServiceAdapter extends ScheduledThreadPoolExecutor {
177+
178+
private final TaskScheduler delegate;
179+
180+
private ScheduledExecutorServiceAdapter(TaskScheduler delegate) {
181+
super(0);
182+
this.delegate = delegate;
183+
}
184+
185+
@Override
186+
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
187+
long initialDelay,
188+
long period,
189+
TimeUnit unit) {
190+
191+
return this.delegate.scheduleAtFixedRate(command,
192+
Instant.now().plus(initialDelay, unit.toChronoUnit()),
193+
Duration.of(period, unit.toChronoUnit()));
194+
}
195+
196+
}
197+
198+
}
Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,14 @@
1616

1717
package org.springframework.kafka.core;
1818

19-
import java.util.ArrayList;
2019
import java.util.Collections;
21-
import java.util.HashMap;
2220
import java.util.List;
23-
import java.util.Map;
21+
2422

2523
import org.apache.kafka.clients.consumer.Consumer;
2624

27-
import io.micrometer.core.instrument.ImmutableTag;
25+
import org.springframework.scheduling.TaskScheduler;
26+
2827
import io.micrometer.core.instrument.MeterRegistry;
2928
import io.micrometer.core.instrument.Tag;
3029
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
@@ -36,16 +35,12 @@
3635
* @param <V> the value type.
3736
*
3837
* @author Gary Russell
39-
* @since 2.5
38+
* @author Artem Bilan
4039
*
40+
* @since 2.5
4141
*/
42-
public class MicrometerConsumerListener<K, V> implements ConsumerFactory.Listener<K, V> {
43-
44-
private final MeterRegistry meterRegistry;
45-
46-
private final List<Tag> tags;
47-
48-
private final Map<String, KafkaClientMetrics> metrics = new HashMap<>();
42+
public class MicrometerConsumerListener<K, V> extends KafkaMetricsSupport<Consumer<K, V>>
43+
implements ConsumerFactory.Listener<K, V> {
4944

5045
/**
5146
* Construct an instance with the provided registry.
@@ -55,32 +50,44 @@ public MicrometerConsumerListener(MeterRegistry meterRegistry) {
5550
this(meterRegistry, Collections.emptyList());
5651
}
5752

53+
/**
54+
* Construct an instance with the provided registry and task scheduler.
55+
* @param meterRegistry the registry.
56+
* @param taskScheduler the task scheduler.
57+
* @since 3.3
58+
*/
59+
public MicrometerConsumerListener(MeterRegistry meterRegistry, TaskScheduler taskScheduler) {
60+
this(meterRegistry, Collections.emptyList(), taskScheduler);
61+
}
62+
5863
/**
5964
* Construct an instance with the provided registry and tags.
6065
* @param meterRegistry the registry.
6166
* @param tags the tags.
6267
*/
6368
public MicrometerConsumerListener(MeterRegistry meterRegistry, List<Tag> tags) {
64-
this.meterRegistry = meterRegistry;
65-
this.tags = tags;
69+
super(meterRegistry, tags);
70+
}
71+
72+
/**
73+
* Construct an instance with the provided registry, tags and task scheduler.
74+
* @param meterRegistry the registry.
75+
* @param tags the tags.
76+
* @param taskScheduler the task scheduler.
77+
* @since 3.3
78+
*/
79+
public MicrometerConsumerListener(MeterRegistry meterRegistry, List<Tag> tags, TaskScheduler taskScheduler) {
80+
super(meterRegistry, tags, taskScheduler);
6681
}
6782

6883
@Override
6984
public synchronized void consumerAdded(String id, Consumer<K, V> consumer) {
70-
if (!this.metrics.containsKey(id)) {
71-
List<Tag> consumerTags = new ArrayList<>(this.tags);
72-
consumerTags.add(new ImmutableTag("spring.id", id));
73-
this.metrics.put(id, new KafkaClientMetrics(consumer, consumerTags));
74-
this.metrics.get(id).bindTo(this.meterRegistry);
75-
}
85+
clientAdded(id, consumer);
7686
}
7787

7888
@Override
7989
public synchronized void consumerRemoved(String id, Consumer<K, V> consumer) {
80-
KafkaClientMetrics removed = this.metrics.remove(id);
81-
if (removed != null) {
82-
removed.close();
83-
}
90+
clientRemoved(id, consumer);
8491
}
8592

8693
}

0 commit comments

Comments
 (0)