Skip to content

Commit e437c52

Browse files
committed
* Improve KafkaMetricsSupport API to make it more friendly for target projects
* Add more JavaDocs to the `KafkaMetricsSupport` * Mention `KafkaMetricsSupport` in the docs
1 parent 7230c23 commit e437c52

File tree

6 files changed

+36
-15
lines changed

6 files changed

+36
-15
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
8888

8989
A similar listener is provided for the `StreamsBuilderFactoryBean` - see xref:streams.adoc#streams-micrometer[KafkaStreams Micrometer Support].
9090

91+
Starting with version 3.3, a `KafkaMetricsSupport` abstract class is introduced to manage `io.micrometer.core.instrument.binder.kafka.KafkaMetrics` binding into a `MeterRegistry` for provided Kafka client.
92+
This class is a super for the mentioned above `MicrometerConsumerListener`, `MicrometerProducerListener` and `KafkaStreamsMicrometerListener`.
93+
However, it can be used for any Kafka client use-cases.
94+
The class needs to be extended and its `bindClient()` and `unbindClient()` API have to be called to connect Kafka client metrics with a Micrometer collector.
95+
9196
[[observation]]
9297
== Micrometer Observation
9398

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,4 @@ For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempt
6767
=== Kafka Metrics Listeners and `TaskScheduler`
6868

6969
The `MicrometerProducerListener`, `MicrometerConsumerListener` and `KafkaStreamsMicrometerListener` can now be configured with a `TaskScheduler`.
70-
See `KafkaMetrics` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information.
70+
See `KafkaMetricsSupport` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,22 @@
3838
import org.springframework.util.Assert;
3939
import org.springframework.util.ReflectionUtils;
4040

41-
4241
import io.micrometer.core.instrument.ImmutableTag;
4342
import io.micrometer.core.instrument.MeterRegistry;
4443
import io.micrometer.core.instrument.Tag;
4544
import io.micrometer.core.instrument.binder.MeterBinder;
4645
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
4746

4847
/**
49-
* An abstract class to manage {@link io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics}.
48+
* An abstract class to manage {@link KafkaClientMetrics}.
5049
*
5150
* @param <C> the Kafka Client type.
5251
*
5352
* @author Artem Bilan
5453
*
5554
* @since 3.3
55+
*
56+
* @see KafkaClientMetrics
5657
*/
5758
public abstract class KafkaMetricsSupport<C> {
5859

@@ -67,7 +68,6 @@ public abstract class KafkaMetricsSupport<C> {
6768

6869
/**
6970
* Construct an instance with the provided registry.
70-
*
7171
* @param meterRegistry the registry.
7272
*/
7373
protected KafkaMetricsSupport(MeterRegistry meterRegistry) {
@@ -76,7 +76,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry) {
7676

7777
/**
7878
* Construct an instance with the provided {@link MeterRegistry} and {@link TaskScheduler}.
79-
*
8079
* @param meterRegistry the registry.
8180
* @param taskScheduler the task scheduler.
8281
*/
@@ -86,7 +85,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, TaskScheduler taskSch
8685

8786
/**
8887
* Construct an instance with the provided {@link MeterRegistry} and tags.
89-
*
9088
* @param meterRegistry the registry.
9189
* @param tags the tags.
9290
*/
@@ -99,7 +97,6 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags) {
9997

10098
/**
10199
* Construct an instance with the provided {@link MeterRegistry}, tags and {@link TaskScheduler}.
102-
*
103100
* @param meterRegistry the registry.
104101
* @param tags the tags.
105102
* @param taskScheduler the task scheduler.
@@ -112,7 +109,12 @@ protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags, TaskS
112109
this.scheduler = obtainScheduledExecutorService(taskScheduler);
113110
}
114111

115-
protected void clientAdded(String id, C client) {
112+
/**
113+
* Bind metrics for the Apache Kafka client with provided id.
114+
* @param id the unique identifier for the client to manage in store.
115+
* @param client the Kafka client instance to bind.
116+
*/
117+
protected final void bindClient(String id, C client) {
116118
if (!this.metrics.containsKey(id)) {
117119
List<Tag> clientTags = new ArrayList<>(this.tags);
118120
clientTags.add(new ImmutableTag("spring.id", id));
@@ -121,6 +123,15 @@ protected void clientAdded(String id, C client) {
121123
}
122124
}
123125

126+
/**
127+
* Create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance
128+
* for the provided Kafka client and metric tags.
129+
* By default, this factory is aware of {@link Consumer}, {@link Producer} and {@link AdminClient} types.
130+
* For other use-case this method can be overridden.
131+
* @param client the client to create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance for.
132+
* @param tags the tags for the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}.
133+
* @return the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}.
134+
*/
124135
protected MeterBinder createClientMetrics(C client, List<Tag> tags) {
125136
if (client instanceof Consumer<?, ?> consumer) {
126137
return createConsumerMetrics(consumer, tags);
@@ -153,7 +164,12 @@ private KafkaClientMetrics createAdminMetrics(AdminClient adminClient, List<Tag>
153164
: new KafkaClientMetrics(adminClient, tags);
154165
}
155166

156-
protected void clientRemoved(String id, C client) {
167+
/**
168+
* Unbind a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} for the provided Kafka client.
169+
* @param id the unique identifier for the client to manage in store.
170+
* @param client the Kafka client instance to unbind.
171+
*/
172+
protected final void unbindClient(String id, C client) {
157173
AutoCloseable removed = (AutoCloseable) this.metrics.remove(id);
158174
if (removed != null) {
159175
try {

spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ public MicrometerConsumerListener(MeterRegistry meterRegistry, List<Tag> tags, T
8282

8383
@Override
8484
public synchronized void consumerAdded(String id, Consumer<K, V> consumer) {
85-
clientAdded(id, consumer);
85+
bindClient(id, consumer);
8686
}
8787

8888
@Override
8989
public synchronized void consumerRemoved(String id, Consumer<K, V> consumer) {
90-
clientRemoved(id, consumer);
90+
unbindClient(id, consumer);
9191
}
9292

9393
}

spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,12 @@ public MicrometerProducerListener(MeterRegistry meterRegistry, List<Tag> tags, T
8181

8282
@Override
8383
public synchronized void producerAdded(String id, Producer<K, V> producer) {
84-
clientAdded(id, producer);
84+
bindClient(id, producer);
8585
}
8686

8787
@Override
8888
public synchronized void producerRemoved(String id, Producer<K, V> producer) {
89-
clientRemoved(id, producer);
89+
unbindClient(id, producer);
9090
}
9191

9292
}

spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, List<Tag> tag
8282

8383
@Override
8484
public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) {
85-
clientAdded(id, kafkaStreams);
85+
bindClient(id, kafkaStreams);
8686
}
8787

8888
@Override
@@ -94,7 +94,7 @@ protected MeterBinder createClientMetrics(KafkaStreams client, List<Tag> tags) {
9494

9595
@Override
9696
public synchronized void streamsRemoved(String id, KafkaStreams streams) {
97-
clientRemoved(id, streams);
97+
unbindClient(id, streams);
9898
}
9999

100100
}

0 commit comments

Comments
 (0)