Skip to content

Commit 2dc9cb9

Browse files
committed
kafka
1 parent 3705e33 commit 2dc9cb9

File tree

3 files changed

+55
-27
lines changed

3 files changed

+55
-27
lines changed

instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/AbstractMessageListenerContainerInstrumentation.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1515
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1616
import net.bytebuddy.asm.Advice;
17+
import net.bytebuddy.asm.Advice.AssignReturned;
1718
import net.bytebuddy.description.type.TypeDescription;
1819
import net.bytebuddy.matcher.ElementMatcher;
1920
import org.springframework.kafka.listener.RecordInterceptor;
@@ -43,9 +44,11 @@ public void transform(TypeTransformer transformer) {
4344
@SuppressWarnings("unused")
4445
public static class GetRecordInterceptorAdvice {
4546

47+
@AssignReturned.ToReturned
4648
@Advice.OnMethodExit(suppress = Throwable.class)
47-
public static <K, V> void onExit(
48-
@Advice.Return(readOnly = false) RecordInterceptor<K, V> interceptor) {
49+
public static <K, V> RecordInterceptor<K, V> onExit(
50+
@Advice.Return RecordInterceptor<K, V> originalInterceptor) {
51+
RecordInterceptor<K, V> interceptor = originalInterceptor;
4952

5053
if (interceptor == null
5154
|| !interceptor
@@ -55,6 +58,7 @@ public static <K, V> void onExit(
5558
"io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedRecordInterceptor")) {
5659
interceptor = telemetry().createRecordInterceptor(interceptor);
5760
}
61+
return interceptor;
5862
}
5963
}
6064
}

instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/ListenerConsumerInstrumentation.java

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.opentelemetry.javaagent.bootstrap.spring.SpringSchedulingTaskTracing;
2020
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2121
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
22+
import javax.annotation.Nullable;
2223
import net.bytebuddy.asm.Advice;
2324
import net.bytebuddy.description.type.TypeDescription;
2425
import net.bytebuddy.matcher.ElementMatcher;
@@ -77,37 +78,53 @@ public static void onExit(@Advice.Enter boolean previousValue) {
7778
@SuppressWarnings("unused")
7879
public static class InvokeBatchAdvice {
7980

81+
public static class AdviceScope {
82+
public final KafkaReceiveRequest request;
83+
public final Context context;
84+
public final Scope scope;
85+
86+
private AdviceScope(KafkaReceiveRequest request, Context context, Scope scope) {
87+
this.request = request;
88+
this.context = context;
89+
this.scope = scope;
90+
}
91+
92+
@Nullable
93+
public static AdviceScope start(ConsumerRecords<?, ?> records, Consumer<?, ?> consumer) {
94+
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
95+
Context receiveContext = consumerContext.getContext();
96+
97+
// use the receive CONSUMER span as parent if it's available
98+
Context parentContext = receiveContext != null ? receiveContext : Context.current();
99+
KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumer);
100+
101+
if (!batchProcessInstrumenter().shouldStart(parentContext, request)) {
102+
return null;
103+
}
104+
Context context = batchProcessInstrumenter().start(parentContext, request);
105+
return new AdviceScope(request, context, context.makeCurrent());
106+
}
107+
108+
public void close(@Nullable Throwable throwable) {
109+
scope.close();
110+
batchProcessInstrumenter().end(context, request, null, throwable);
111+
}
112+
}
113+
80114
@Advice.OnMethodEnter(suppress = Throwable.class)
81-
public static void onEnter(
115+
public static AdviceScope onEnter(
82116
@Advice.Argument(0) ConsumerRecords<?, ?> records,
83-
@Advice.FieldValue("consumer") Consumer<?, ?> consumer,
84-
@Advice.Local("otelRequest") KafkaReceiveRequest request,
85-
@Advice.Local("otelContext") Context context,
86-
@Advice.Local("otelScope") Scope scope) {
87-
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
88-
Context receiveContext = consumerContext.getContext();
89-
90-
// use the receive CONSUMER span as parent if it's available
91-
Context parentContext = receiveContext != null ? receiveContext : Context.current();
92-
93-
request = KafkaReceiveRequest.create(records, consumer);
94-
if (batchProcessInstrumenter().shouldStart(parentContext, request)) {
95-
context = batchProcessInstrumenter().start(parentContext, request);
96-
scope = context.makeCurrent();
97-
}
117+
@Advice.FieldValue("consumer") Consumer<?, ?> consumer) {
118+
return AdviceScope.start(records, consumer);
98119
}
99120

100121
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
101122
public static void onExit(
102-
@Advice.Thrown Throwable throwable,
103-
@Advice.Local("otelRequest") KafkaReceiveRequest request,
104-
@Advice.Local("otelContext") Context context,
105-
@Advice.Local("otelScope") Scope scope) {
106-
if (scope == null) {
107-
return;
123+
@Advice.Thrown @Nullable Throwable throwable,
124+
@Advice.Enter @Nullable AdviceScope adviceScope) {
125+
if (adviceScope != null) {
126+
adviceScope.close(throwable);
108127
}
109-
scope.close();
110-
batchProcessInstrumenter().end(context, request, null, throwable);
111128
}
112129
}
113130
}

instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
import com.google.auto.service.AutoService;
1111
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1212
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1314
import java.util.List;
1415

1516
@AutoService(InstrumentationModule.class)
16-
public class SpringKafkaInstrumentationModule extends InstrumentationModule {
17+
public class SpringKafkaInstrumentationModule extends InstrumentationModule
18+
implements ExperimentalInstrumentationModule {
1719
public SpringKafkaInstrumentationModule() {
1820
super("spring-kafka", "spring-kafka-2.7");
1921
}
@@ -24,4 +26,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
2426
new AbstractMessageListenerContainerInstrumentation(),
2527
new ListenerConsumerInstrumentation());
2628
}
29+
30+
@Override
31+
public boolean isIndyReady() {
32+
return true;
33+
}
2734
}

0 commit comments

Comments
 (0)