Skip to content

Commit 1da2cfa

Browse files
committed
End spring kafka span in afterRecord callback
1 parent 58d1661 commit 1da2cfa

File tree

5 files changed

+132
-48
lines changed

5 files changed

+132
-48
lines changed

instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
2525

2626
private final Instrumenter<KafkaProcessRequest, Void> processInstrumenter;
2727
@Nullable private final RecordInterceptor<K, V> decorated;
28+
private static final ThreadLocal<ThreadState> threadLocalState = new ThreadLocal<>();
2829

2930
InstrumentedRecordInterceptor(
3031
Instrumenter<KafkaProcessRequest, Void> processInstrumenter,
@@ -74,7 +75,10 @@ public void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
7475
decorated.success(record, consumer);
7576
}
7677
} finally {
77-
end(record, null);
78+
// if thread state is present span is ended in afterRecord
79+
if (threadLocalState.get() == null) {
80+
end(record, null);
81+
}
7882
}
7983
}
8084

@@ -85,7 +89,13 @@ public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K
8589
decorated.failure(record, exception, consumer);
8690
}
8791
} finally {
88-
end(record, exception);
92+
// if thread state is present span is ended in afterRecord
93+
ThreadState threadState = threadLocalState.get();
94+
if (threadState == null) {
95+
end(record, exception);
96+
} else {
97+
threadState.error = exception;
98+
}
8999
}
90100
}
91101

@@ -102,6 +112,7 @@ private void end(ConsumerRecord<K, V> record, @Nullable Throwable error) {
102112
@NoMuzzle // method was added in 2.8.0
103113
@Override
104114
public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
115+
end(record, threadLocalState.get().error);
105116
if (decorated != null) {
106117
decorated.afterRecord(record, consumer);
107118
}
@@ -110,6 +121,7 @@ public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
110121
@NoMuzzle // method was added in 2.8.0
111122
@Override
112123
public void setupThreadState(Consumer<?, ?> consumer) {
124+
threadLocalState.set(new ThreadState());
113125
if (decorated != null) {
114126
decorated.setupThreadState(consumer);
115127
}
@@ -118,8 +130,14 @@ public void setupThreadState(Consumer<?, ?> consumer) {
118130
@NoMuzzle // method was added in 2.8.0
119131
@Override
120132
public void clearThreadState(Consumer<?, ?> consumer) {
133+
threadLocalState.remove();
121134
if (decorated != null) {
122135
decorated.clearThreadState(consumer);
123136
}
124137
}
138+
139+
private static class ThreadState {
140+
// used to record the error in failure() so it could be used in afterRecord()
141+
Throwable error;
142+
}
125143
}

instrumentation/spring/spring-kafka-2.7/testing/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ dependencies {
66
implementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
77
implementation("org.testcontainers:testcontainers-kafka")
88

9-
compileOnly("org.springframework.kafka:spring-kafka:2.7.0")
9+
compileOnly("org.springframework.kafka:spring-kafka:2.9.0")
1010
compileOnly("org.springframework.boot:spring-boot-starter-test:2.5.3")
1111
compileOnly("org.springframework.boot:spring-boot-starter:2.5.3")
1212
}

instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java

Lines changed: 71 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@
2121

2222
import io.opentelemetry.api.trace.SpanKind;
2323
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
24+
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
2425
import io.opentelemetry.sdk.trace.data.SpanData;
2526
import io.opentelemetry.sdk.trace.data.StatusData;
2627
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
28+
import java.util.ArrayList;
2729
import java.util.Arrays;
2830
import java.util.HashMap;
2931
import java.util.List;
3032
import java.util.Map;
3133
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.function.Consumer;
3235
import org.assertj.core.api.AbstractLongAssert;
3336
import org.assertj.core.api.AbstractStringAssert;
3437
import org.junit.jupiter.api.Test;
@@ -127,51 +130,74 @@ void shouldHandleFailureInKafkaListener() {
127130

128131
testing()
129132
.waitAndAssertTraces(
130-
trace ->
131-
trace.hasSpansSatisfyingExactly(
132-
span -> span.hasName("producer"),
133-
span ->
134-
span.hasName("testSingleTopic publish")
135-
.hasKind(SpanKind.PRODUCER)
136-
.hasParent(trace.getSpan(0))
137-
.hasAttributesSatisfyingExactly(
138-
equalTo(MESSAGING_SYSTEM, "kafka"),
139-
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
140-
equalTo(MESSAGING_OPERATION, "publish"),
141-
satisfies(
142-
MESSAGING_CLIENT_ID,
143-
stringAssert -> stringAssert.startsWith("producer")),
144-
satisfies(
145-
MessagingIncubatingAttributes
146-
.MESSAGING_DESTINATION_PARTITION_ID,
147-
AbstractStringAssert::isNotEmpty),
148-
satisfies(
149-
MESSAGING_KAFKA_MESSAGE_OFFSET,
150-
AbstractLongAssert::isNotNegative),
151-
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10")),
152-
span ->
153-
span.hasName("testSingleTopic process")
154-
.hasKind(SpanKind.CONSUMER)
155-
.hasParent(trace.getSpan(1))
156-
.hasStatus(StatusData.error())
157-
.hasException(new IllegalArgumentException("boom"))
158-
.hasAttributesSatisfyingExactly(processAttributes),
159-
span -> span.hasName("consumer").hasParent(trace.getSpan(2)),
160-
span ->
161-
span.hasName("testSingleTopic process")
162-
.hasKind(SpanKind.CONSUMER)
163-
.hasParent(trace.getSpan(1))
164-
.hasStatus(StatusData.error())
165-
.hasException(new IllegalArgumentException("boom"))
166-
.hasAttributesSatisfyingExactly(processAttributes),
167-
span -> span.hasName("consumer").hasParent(trace.getSpan(4)),
168-
span ->
169-
span.hasName("testSingleTopic process")
170-
.hasKind(SpanKind.CONSUMER)
171-
.hasParent(trace.getSpan(1))
172-
.hasStatus(StatusData.unset())
173-
.hasAttributesSatisfyingExactly(processAttributes),
174-
span -> span.hasName("consumer").hasParent(trace.getSpan(6))));
133+
trace -> {
134+
List<Consumer<SpanDataAssert>> assertions =
135+
new ArrayList<>(
136+
Arrays.asList(
137+
span -> span.hasName("producer"),
138+
span ->
139+
span.hasName("testSingleTopic publish")
140+
.hasKind(SpanKind.PRODUCER)
141+
.hasParent(trace.getSpan(0))
142+
.hasAttributesSatisfyingExactly(
143+
equalTo(MESSAGING_SYSTEM, "kafka"),
144+
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
145+
equalTo(MESSAGING_OPERATION, "publish"),
146+
satisfies(
147+
MESSAGING_CLIENT_ID,
148+
stringAssert -> stringAssert.startsWith("producer")),
149+
satisfies(
150+
MessagingIncubatingAttributes
151+
.MESSAGING_DESTINATION_PARTITION_ID,
152+
AbstractStringAssert::isNotEmpty),
153+
satisfies(
154+
MESSAGING_KAFKA_MESSAGE_OFFSET,
155+
AbstractLongAssert::isNotNegative),
156+
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10")),
157+
span ->
158+
span.hasName("testSingleTopic process")
159+
.hasKind(SpanKind.CONSUMER)
160+
.hasParent(trace.getSpan(1))
161+
.hasStatus(StatusData.error())
162+
.hasException(new IllegalArgumentException("boom"))
163+
.hasAttributesSatisfyingExactly(processAttributes),
164+
span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
165+
if (Boolean.getBoolean("testLatestDeps")) {
166+
assertions.add(
167+
span -> span.hasName("handle exception").hasParent(trace.getSpan(2)));
168+
}
169+
assertions.addAll(
170+
Arrays.asList(
171+
span ->
172+
span.hasName("testSingleTopic process")
173+
.hasKind(SpanKind.CONSUMER)
174+
.hasParent(trace.getSpan(1))
175+
.hasStatus(StatusData.error())
176+
.hasException(new IllegalArgumentException("boom"))
177+
.hasAttributesSatisfyingExactly(processAttributes),
178+
span ->
179+
span.hasName("consumer")
180+
.hasParent(
181+
trace.getSpan(Boolean.getBoolean("testLatestDeps") ? 5 : 4))));
182+
if (Boolean.getBoolean("testLatestDeps")) {
183+
assertions.add(
184+
span -> span.hasName("handle exception").hasParent(trace.getSpan(5)));
185+
}
186+
assertions.addAll(
187+
Arrays.asList(
188+
span ->
189+
span.hasName("testSingleTopic process")
190+
.hasKind(SpanKind.CONSUMER)
191+
.hasParent(trace.getSpan(1))
192+
.hasStatus(StatusData.unset())
193+
.hasAttributesSatisfyingExactly(processAttributes),
194+
span ->
195+
span.hasName("consumer")
196+
.hasParent(
197+
trace.getSpan(Boolean.getBoolean("testLatestDeps") ? 8 : 6))));
198+
199+
trace.hasSpansSatisfyingExactly(assertions);
200+
});
175201
}
176202

177203
@Test

instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
6868
factory.setConsumerFactory(consumerFactory);
6969
factory.setBatchListener(false);
7070
factory.setAutoStartup(true);
71+
try {
72+
// available since spring 2.8
73+
Class.forName("org.springframework.kafka.listener.CommonErrorHandler");
74+
ConsumerConfigUtil.addErrorHandler(factory);
75+
} catch (ClassNotFoundException e) {
76+
// ignore
77+
}
7178
customizerProvider.ifAvailable(factory::setContainerCustomizer);
7279
return factory;
7380
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.testing;
7+
8+
import io.opentelemetry.instrumentation.testing.GlobalTraceUtil;
9+
import org.apache.kafka.clients.consumer.Consumer;
10+
import org.apache.kafka.clients.consumer.ConsumerRecord;
11+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12+
import org.springframework.kafka.listener.CommonErrorHandler;
13+
import org.springframework.kafka.listener.MessageListenerContainer;
14+
15+
class ConsumerConfigUtil {
16+
static void addErrorHandler(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
17+
factory.setCommonErrorHandler(
18+
new CommonErrorHandler() {
19+
20+
@Override
21+
public boolean handleOne(
22+
Exception thrownException,
23+
ConsumerRecord<?, ?> record,
24+
Consumer<?, ?> consumer,
25+
MessageListenerContainer container) {
26+
GlobalTraceUtil.runWithSpan("handle exception", () -> {});
27+
return false;
28+
}
29+
});
30+
}
31+
32+
private ConsumerConfigUtil() {}
33+
}

0 commit comments

Comments
 (0)