Skip to content

Commit 42d91e0

Browse files
authored
GH-2650: Observability enhancements in reactive Kafka binder
Fixes #2650 * Enable native observability support for output binding in the reactive Kafka binder * Adding test to verify this support with downstream consumers * Adding ref docs * Addressing PR review
1 parent 058fc66 commit 42d91e0

File tree

8 files changed

+333
-15
lines changed

8 files changed

+333
-15
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/pom.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,39 @@
8484
<artifactId>awaitility</artifactId>
8585
<scope>test</scope>
8686
</dependency>
87+
<dependency>
88+
<groupId>io.micrometer</groupId>
89+
<artifactId>context-propagation</artifactId>
90+
<scope>optional</scope>
91+
</dependency>
92+
93+
<dependency>
94+
<groupId>io.micrometer</groupId>
95+
<artifactId>micrometer-tracing-integration-test</artifactId>
96+
<scope>test</scope>
97+
<exclusions>
98+
<exclusion>
99+
<groupId>io.opentelemetry</groupId>
100+
<artifactId>*</artifactId>
101+
</exclusion>
102+
<exclusion>
103+
<groupId>com.wavefront</groupId>
104+
<artifactId>*</artifactId>
105+
</exclusion>
106+
<exclusion>
107+
<groupId>io.zipkin.reporter2</groupId>
108+
<artifactId>*</artifactId>
109+
</exclusion>
110+
<exclusion>
111+
<groupId>io.micrometer</groupId>
112+
<artifactId>micrometer-tracing-bridge-otel</artifactId>
113+
</exclusion>
114+
<exclusion>
115+
<groupId>io.micrometer</groupId>
116+
<artifactId>micrometer-tracing-reporter-wavefront</artifactId>
117+
</exclusion>
118+
</exclusions>
119+
</dependency>
87120
</dependencies>
88121

89122
</project>

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.regex.Pattern;
2727

28+
import io.micrometer.observation.ObservationRegistry;
2829
import org.apache.commons.logging.Log;
2930
import org.apache.commons.logging.LogFactory;
3031
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -84,6 +85,7 @@
8485
* @author Gary Russell
8586
* @author Byungjun You
8687
* @author Omer Celik
88+
* @author Soby Chacko
8789
* @since 4.0
8890
*
8991
*/
@@ -111,11 +113,14 @@ public class ReactorKafkaBinder
111113

112114
private final Map<String, MessageProducerSupport> messageProducers = new ConcurrentHashMap<>();
113115

116+
private final ObservationRegistry observationRegistry;
117+
114118
public ReactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
115-
KafkaTopicProvisioner provisioner) {
119+
KafkaTopicProvisioner provisioner, @Nullable ObservationRegistry observationRegistry) {
116120

117121
super(new String[0], provisioner, null, null);
118122
this.configurationProperties = configurationProperties;
123+
this.observationRegistry = observationRegistry;
119124
}
120125

121126
public void setConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
@@ -194,6 +199,9 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
194199

195200
SenderOptions<Object, Object> opts = this.senderOptionsCustomizer.apply(producerProperties.getBindingName(),
196201
SenderOptions.create(configs));
202+
if (this.configurationProperties.isEnableObservation() && this.observationRegistry != null) {
203+
opts = opts.withObservation(this.observationRegistry);
204+
}
197205
// TODO bean for converter; MCB doesn't use one on the producer side.
198206
RecordMessageConverter converter = new MessagingMessageConverter();
199207
AbstractApplicationContext applicationContext = getApplicationContext();
@@ -405,7 +413,7 @@ protected void handleMessageInternal(Message<?> message) {
405413
@SuppressWarnings("unchecked")
406414
SenderRecord<Object, Object, Object> sr = SenderRecord.create(
407415
(ProducerRecord<Object, Object>) converter.fromMessage(message, topic), correlation);
408-
Flux<SenderResult<Object>> result = sender.send(Flux.just(sr));
416+
Flux<SenderResult<Object>> result = sender.send(Flux.just(sr)).contextCapture();
409417
result.subscribe(res -> {
410418
if (this.results != null) {
411419
this.results.send(MessageBuilder.withPayload(res)

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.cloud.stream.binder.reactorkafka;
1818

19+
import io.micrometer.observation.ObservationRegistry;
20+
1921
import org.springframework.beans.factory.ObjectProvider;
2022
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2123
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -37,6 +39,7 @@
3739
*
3840
* @author Gary Russell
3941
* @author Chris Bono
42+
* @author Soby Chacko
4043
*/
4144
@Configuration(proxyBeanMethods = false)
4245
@ConditionalOnMissingBean(Binder.class)
@@ -73,13 +76,15 @@ KafkaTopicProvisioner provisioningProvider(
7376

7477
@Bean
7578
ReactorKafkaBinder reactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
76-
KafkaTopicProvisioner provisioningProvider,
77-
KafkaExtendedBindingProperties extendedBindingProperties,
78-
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
79-
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
80-
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
81-
ObjectProvider<SenderOptionsCustomizer> senderOptionsptionsCustomizers) {
82-
ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider);
79+
KafkaTopicProvisioner provisioningProvider,
80+
KafkaExtendedBindingProperties extendedBindingProperties,
81+
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
82+
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
83+
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
84+
ObjectProvider<SenderOptionsCustomizer> senderOptionsptionsCustomizers,
85+
ObjectProvider<ObservationRegistry> observationRegistryObjectProvider) {
86+
ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider,
87+
observationRegistryObjectProvider.getIfUnique());
8388
reactorKafkaBinder.setExtendedBindingProperties(extendedBindingProperties);
8489
reactorKafkaBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique());
8590
reactorKafkaBinder.setProducerConfigCustomizer(producerConfigCustomizer.getIfUnique());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright 2022-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.reactorkafka;
18+
19+
import java.lang.reflect.Type;
20+
import java.time.Duration;
21+
import java.util.function.Function;
22+
import java.util.stream.Collectors;
23+
24+
import brave.handler.SpanHandler;
25+
import brave.test.TestSpanHandler;
26+
import io.micrometer.observation.Observation;
27+
import io.micrometer.observation.ObservationRegistry;
28+
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
29+
import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
30+
import io.micrometer.tracing.test.simple.SpansAssert;
31+
import org.apache.kafka.clients.consumer.ConsumerRecord;
32+
import org.apache.kafka.clients.producer.ProducerRecord;
33+
import org.junit.jupiter.api.Test;
34+
import reactor.core.publisher.Flux;
35+
import reactor.core.publisher.Mono;
36+
import reactor.kafka.receiver.ReceiverRecord;
37+
import reactor.kafka.receiver.observation.KafkaReceiverObservation;
38+
import reactor.kafka.receiver.observation.KafkaRecordReceiverContext;
39+
40+
import org.springframework.beans.factory.annotation.Autowired;
41+
import org.springframework.boot.SpringBootConfiguration;
42+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
43+
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
44+
import org.springframework.boot.test.context.SpringBootTest;
45+
import org.springframework.cloud.stream.function.StreamBridge;
46+
import org.springframework.context.annotation.Bean;
47+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
48+
import org.springframework.integration.support.MessageBuilder;
49+
import org.springframework.kafka.support.Acknowledgment;
50+
import org.springframework.kafka.support.converter.MessagingMessageConverter;
51+
import org.springframework.kafka.support.converter.RecordMessageConverter;
52+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
53+
import org.springframework.kafka.test.context.EmbeddedKafka;
54+
import org.springframework.messaging.Message;
55+
import org.springframework.test.annotation.DirtiesContext;
56+
57+
import static org.assertj.core.api.Assertions.assertThat;
58+
import static org.awaitility.Awaitility.await;
59+
60+
/**
61+
* @author Artem Bilan
62+
* @author Soby Chacko
63+
* @since 4.2.0
64+
*/
65+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
66+
"spring.kafka.consumer.metadata.max.age.ms=1000",
67+
"spring.cloud.function.definition=receive",
68+
"spring.cloud.stream.function.reactive.uppercase=true",
69+
"spring.cloud.stream.bindings.receive-in-0.group=rkbot-in-group",
70+
"spring.cloud.stream.bindings.receive-in-0.destination=rkbot-in-topic",
71+
"spring.cloud.stream.bindings.receive-out-0.destination=rkbot-out-topic",
72+
"spring.cloud.stream.kafka.binder.enable-observation=true",
73+
"spring.cloud.stream.kafka.binder.brokers=${spring.kafka.bootstrap-servers}",
74+
"management.tracing.sampling.probability=1",
75+
"spring.cloud.stream.kafka.bindings.receive-in-0.consumer.converterBeanName=fullRR"
76+
})
77+
@DirtiesContext
78+
@AutoConfigureObservability
79+
@EmbeddedKafka(topics = { "rkbot-out-topic" })
80+
public class ReactorKafkaBinderObservationTests {
81+
82+
private static final TestSpanHandler SPANS = new TestSpanHandler();
83+
84+
@Autowired
85+
StreamBridge streamBridge;
86+
87+
@Autowired
88+
ObservationRegistry observationRegistry;
89+
90+
@Autowired
91+
TestConfiguration testConfiguration;
92+
93+
@Autowired
94+
private EmbeddedKafkaBroker embeddedKafka;
95+
96+
@Test
97+
void endToEndReactorKafkaBinder1() {
98+
99+
streamBridge.send("rkbot-in-topic", MessageBuilder.withPayload("data")
100+
.build());
101+
102+
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(SPANS.spans()).hasSize(3));
103+
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
104+
.haveSameTraceId();
105+
}
106+
107+
@SpringBootConfiguration
108+
@EnableAutoConfiguration(exclude = org.springframework.cloud.function.observability.ObservationAutoConfiguration.class)
109+
public static class TestConfiguration {
110+
111+
@Bean
112+
SpanHandler testSpanHandler() {
113+
return SPANS;
114+
}
115+
116+
@Bean
117+
RecordMessageConverter fullRR() {
118+
return new RecordMessageConverter() {
119+
120+
private final RecordMessageConverter converter = new MessagingMessageConverter();
121+
122+
@Override
123+
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
124+
org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Type payloadType) {
125+
126+
return MessageBuilder.withPayload(record).build();
127+
}
128+
129+
@Override
130+
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
131+
return this.converter.fromMessage(message, defaultTopic);
132+
}
133+
134+
};
135+
}
136+
137+
@Bean
138+
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
139+
return s -> s
140+
.flatMap(record -> {
141+
Observation receiverObservation =
142+
KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
143+
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
144+
() ->
145+
new KafkaRecordReceiverContext(
146+
record, "user.receiver", "localhost:9092"),
147+
observationRegistry);
148+
149+
return Mono.deferContextual(contextView -> Mono.just(record)
150+
.map(rec -> new String(rec.value()).toLowerCase())
151+
.map(rec -> MessageBuilder.withPayload(rec).setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView).build()))
152+
.doOnTerminate(receiverObservation::stop)
153+
.doOnError(receiverObservation::error)
154+
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
155+
});
156+
}
157+
}
158+
159+
}
160+

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void consumerBinding() throws Exception {
8383
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
8484
});
8585
provisioner.setMetadataRetryOperations(new RetryTemplate());
86-
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
86+
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
8787
binder.setApplicationContext(mock(GenericApplicationContext.class));
8888

8989
CountDownLatch latch = new CountDownLatch(2);
@@ -148,7 +148,7 @@ void concurrency(String topic, String group, boolean atMostOnce) throws Exceptio
148148
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
149149
});
150150
provisioner.setMetadataRetryOperations(new RetryTemplate());
151-
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
151+
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
152152
binder.setApplicationContext(mock(GenericApplicationContext.class));
153153

154154
CountDownLatch subscriptionLatch = new CountDownLatch(1);
@@ -229,7 +229,7 @@ void autoCommit() throws Exception {
229229
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
230230
});
231231
provisioner.setMetadataRetryOperations(new RetryTemplate());
232-
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
232+
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
233233
binder.setApplicationContext(mock(GenericApplicationContext.class));
234234

235235
CountDownLatch subscriptionLatch = new CountDownLatch(1);
@@ -298,7 +298,7 @@ void producerBinding() throws InterruptedException {
298298
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
299299
});
300300
provisioner.setMetadataRetryOperations(new RetryTemplate());
301-
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
301+
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
302302
CountDownLatch latch = new CountDownLatch(1);
303303
GenericApplicationContext context = new GenericApplicationContext();
304304
context.registerBean("sendResults", FluxMessageChannel.class);

0 commit comments

Comments
 (0)