Skip to content

Commit 9f021ff

Browse files
SylvainJugelaurit
andauthored
make kafka indy-ready (#14675)
Co-authored-by: Lauri Tulmin <[email protected]>
1 parent b7222b9 commit 9f021ff

File tree

9 files changed

+131
-61
lines changed

9 files changed

+131
-61
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Iterator;
2525
import java.util.List;
2626
import net.bytebuddy.asm.Advice;
27+
import net.bytebuddy.asm.Advice.AssignReturned;
2728
import net.bytebuddy.description.type.TypeDescription;
2829
import net.bytebuddy.matcher.ElementMatcher;
2930
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -64,60 +65,57 @@ public void transform(TypeTransformer transformer) {
6465
@SuppressWarnings("unused")
6566
public static class IterableAdvice {
6667

67-
@SuppressWarnings("unchecked")
68+
@AssignReturned.ToReturned
6869
@Advice.OnMethodExit(suppress = Throwable.class)
69-
public static <K, V> void wrap(
70+
public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
7071
@Advice.This ConsumerRecords<?, ?> records,
71-
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<K, V>> iterable) {
72+
@Advice.Return Iterable<ConsumerRecord<K, V>> iterable) {
7273

7374
// it's important not to suppress consumer span creation here because this instrumentation can
7475
// leak the context and so there may be a leaked consumer span in the context, in which
7576
// case it's important to overwrite the leaked span instead of suppressing the correct span
7677
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
7778
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
78-
iterable =
79-
TracingIterable.wrap(
80-
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
79+
return TracingIterable.wrap(
80+
iterable, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
8181
}
8282
}
8383

8484
@SuppressWarnings("unused")
8585
public static class ListAdvice {
8686

87-
@SuppressWarnings("unchecked")
87+
@AssignReturned.ToReturned
8888
@Advice.OnMethodExit(suppress = Throwable.class)
89-
public static <K, V> void wrap(
89+
public static <K, V> List<ConsumerRecord<K, V>> wrap(
9090
@Advice.This ConsumerRecords<?, ?> records,
91-
@Advice.Return(readOnly = false) List<ConsumerRecord<K, V>> list) {
91+
@Advice.Return List<ConsumerRecord<K, V>> list) {
9292

9393
// it's important not to suppress consumer span creation here because this instrumentation can
9494
// leak the context and so there may be a leaked consumer span in the context, in which
9595
// case it's important to overwrite the leaked span instead of suppressing the correct span
9696
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
9797
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
98-
list =
99-
TracingList.wrap(
100-
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
98+
return TracingList.wrap(
99+
list, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
101100
}
102101
}
103102

104103
@SuppressWarnings("unused")
105104
public static class IteratorAdvice {
106105

107-
@SuppressWarnings("unchecked")
106+
@AssignReturned.ToReturned
108107
@Advice.OnMethodExit(suppress = Throwable.class)
109-
public static <K, V> void wrap(
108+
public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
110109
@Advice.This ConsumerRecords<?, ?> records,
111-
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<K, V>> iterator) {
110+
@Advice.Return Iterator<ConsumerRecord<K, V>> iterator) {
112111

113112
// it's important not to suppress consumer span creation here because this instrumentation can
114113
// leak the context and so there may be a leaked consumer span in the context, in which
115114
// case it's important to overwrite the leaked span instead of suppressing the correct span
116115
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
117116
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
118-
iterator =
119-
TracingIterator.wrap(
120-
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
117+
return TracingIterator.wrap(
118+
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
121119
}
122120
}
123121
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientsInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
import com.google.auto.service.AutoService;
1111
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1212
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1314
import java.util.List;
1415

1516
@AutoService(InstrumentationModule.class)
16-
public class KafkaClientsInstrumentationModule extends InstrumentationModule {
17+
public class KafkaClientsInstrumentationModule extends InstrumentationModule
18+
implements ExperimentalInstrumentationModule {
1719
public KafkaClientsInstrumentationModule() {
1820
super("kafka-clients", "kafka-clients-0.11", "kafka");
1921
}
@@ -25,4 +27,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
2527
new KafkaConsumerInstrumentation(),
2628
new ConsumerRecordsInstrumentation());
2729
}
30+
31+
@Override
32+
public boolean isIndyReady() {
33+
return true;
34+
}
2835
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java

Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
1919
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2020
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
21+
import javax.annotation.Nullable;
2122
import net.bytebuddy.asm.Advice;
23+
import net.bytebuddy.asm.Advice.AssignReturned;
24+
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
2225
import net.bytebuddy.description.type.TypeDescription;
2326
import net.bytebuddy.matcher.ElementMatcher;
2427
import org.apache.kafka.clients.ApiVersions;
@@ -46,48 +49,83 @@ public void transform(TypeTransformer transformer) {
4649
@SuppressWarnings("unused")
4750
public static class SendAdvice {
4851

49-
@Advice.OnMethodEnter(suppress = Throwable.class)
50-
public static KafkaProducerRequest onEnter(
51-
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
52-
@Advice.FieldValue("clientId") String clientId,
53-
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
54-
@Advice.Argument(value = 1, readOnly = false) Callback callback,
55-
@Advice.Local("otelContext") Context context,
56-
@Advice.Local("otelScope") Scope scope) {
52+
public static class AdviceScope {
53+
private final KafkaProducerRequest request;
54+
private final Context context;
55+
private final Scope scope;
56+
private final Context parentContext;
5757

58-
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
59-
Context parentContext = Java8BytecodeBridge.currentContext();
60-
if (!producerInstrumenter().shouldStart(parentContext, request)) {
61-
return null;
58+
private AdviceScope(
59+
Context parentContext, KafkaProducerRequest request, Context context, Scope scope) {
60+
this.parentContext = parentContext;
61+
this.request = request;
62+
this.context = context;
63+
this.scope = scope;
6264
}
6365

64-
context = producerInstrumenter().start(parentContext, request);
65-
scope = context.makeCurrent();
66+
@Nullable
67+
public static AdviceScope start(KafkaProducerRequest request) {
68+
Context parentContext = Java8BytecodeBridge.currentContext();
69+
if (!producerInstrumenter().shouldStart(parentContext, request)) {
70+
return null;
71+
}
72+
Context context = producerInstrumenter().start(parentContext, request);
73+
return new AdviceScope(parentContext, request, context, context.makeCurrent());
74+
}
6675

67-
if (KafkaSingletons.isProducerPropagationEnabled()
68-
&& KafkaPropagation.shouldPropagate(apiVersions)) {
69-
record = KafkaPropagation.propagateContext(context, record);
76+
public Callback wrapCallback(Callback originalCallback) {
77+
return new ProducerCallback(originalCallback, parentContext, context, request);
7078
}
7179

72-
callback = new ProducerCallback(callback, parentContext, context, request);
73-
return request;
80+
public ProducerRecord<?, ?> propagateContext(
81+
ApiVersions apiVersions, ProducerRecord<?, ?> record) {
82+
if (KafkaSingletons.isProducerPropagationEnabled()
83+
&& KafkaPropagation.shouldPropagate(apiVersions)) {
84+
return KafkaPropagation.propagateContext(context, record);
85+
}
86+
return record;
87+
}
88+
89+
public void end(@Nullable Throwable throwable) {
90+
scope.close();
91+
if (throwable != null) {
92+
producerInstrumenter().end(context, request, null, throwable);
93+
}
94+
// span finished by ProducerCallback
95+
}
96+
}
97+
98+
@AssignReturned.ToArguments({
99+
@ToArgument(value = 0, index = 1),
100+
@ToArgument(value = 1, index = 2)
101+
})
102+
@Advice.OnMethodEnter(suppress = Throwable.class)
103+
public static Object[] onEnter(
104+
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
105+
@Advice.FieldValue("clientId") String clientId,
106+
@Advice.Argument(0) ProducerRecord<?, ?> originalRecord,
107+
@Advice.Argument(1) Callback originalCallback) {
108+
ProducerRecord<?, ?> record = originalRecord;
109+
Callback callback = originalCallback;
110+
111+
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
112+
AdviceScope adviceScope = AdviceScope.start(request);
113+
if (adviceScope == null) {
114+
return new Object[] {null, record, callback};
115+
}
116+
record = adviceScope.propagateContext(apiVersions, record);
117+
callback = adviceScope.wrapCallback(callback);
118+
return new Object[] {adviceScope, record, callback};
74119
}
75120

76121
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
77122
public static void stopSpan(
78-
@Advice.Enter KafkaProducerRequest request,
79-
@Advice.Thrown Throwable throwable,
80-
@Advice.Local("otelContext") Context context,
81-
@Advice.Local("otelScope") Scope scope) {
82-
if (scope == null) {
83-
return;
84-
}
85-
scope.close();
123+
@Advice.Thrown @Nullable Throwable throwable, @Advice.Enter Object[] enterResult) {
86124

87-
if (throwable != null) {
88-
producerInstrumenter().end(context, request, null, throwable);
125+
AdviceScope adviceScope = (AdviceScope) enterResult[0];
126+
if (adviceScope != null) {
127+
adviceScope.end(throwable);
89128
}
90-
// span finished by ProducerCallback
91129
}
92130
}
93131
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsConsumerInstrumentation.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.Map;
1717
import java.util.Properties;
1818
import net.bytebuddy.asm.Advice;
19+
import net.bytebuddy.asm.Advice.AssignReturned;
20+
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
1921
import net.bytebuddy.description.type.TypeDescription;
2022
import net.bytebuddy.matcher.ElementMatcher;
2123

@@ -39,9 +41,10 @@ public void transform(TypeTransformer transformer) {
3941
@SuppressWarnings("unused")
4042
public static class ConstructorMapAdvice {
4143

44+
@AssignReturned.ToArguments(@ToArgument(0))
4245
@Advice.OnMethodEnter(suppress = Throwable.class)
43-
public static void onEnter(
44-
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
46+
public static Map<String, Object> onEnter(
47+
@Advice.Argument(0) Map<String, Object> originalConfig) {
4548

4649
// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
4750
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
@@ -59,8 +62,9 @@ public static void onEnter(
5962
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538
6063

6164
// ensure config is a mutable map and avoid concurrency conflicts
62-
config = new HashMap<>(config);
65+
Map<String, Object> config = new HashMap<>(originalConfig);
6366
enhanceConfig(config);
67+
return config;
6468
}
6569
}
6670

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsInstrumentationModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
4040
return asList(
4141
new KafkaMetricsProducerInstrumentation(), new KafkaMetricsConsumerInstrumentation());
4242
}
43+
44+
@Override
45+
public boolean isIndyReady() {
46+
return true;
47+
}
4348
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/metrics/KafkaMetricsProducerInstrumentation.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.Map;
1717
import java.util.Properties;
1818
import net.bytebuddy.asm.Advice;
19+
import net.bytebuddy.asm.Advice.AssignReturned;
20+
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
1921
import net.bytebuddy.description.type.TypeDescription;
2022
import net.bytebuddy.matcher.ElementMatcher;
2123

@@ -39,9 +41,11 @@ public void transform(TypeTransformer transformer) {
3941
@SuppressWarnings("unused")
4042
public static class ConstructorMapAdvice {
4143

44+
@AssignReturned.ToArguments(@ToArgument(0))
4245
@Advice.OnMethodEnter(suppress = Throwable.class)
43-
public static void onEnter(
44-
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
46+
public static Map<String, Object> onEnter(
47+
@Advice.Argument(0) Map<String, Object> originalConfig) {
48+
Map<String, Object> config = originalConfig;
4549

4650
// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
4751
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
@@ -61,6 +65,7 @@ public static void onEnter(
6165
// ensure config is a mutable map and avoid concurrency conflicts
6266
config = new HashMap<>(config);
6367
enhanceConfig(config);
68+
return config;
6469
}
6570
}
6671

instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
import com.google.auto.service.AutoService;
1111
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1212
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1314
import java.util.List;
1415

1516
@AutoService(InstrumentationModule.class)
16-
public class KafkaStreamsInstrumentationModule extends InstrumentationModule {
17+
public class KafkaStreamsInstrumentationModule extends InstrumentationModule
18+
implements ExperimentalInstrumentationModule {
1719
public KafkaStreamsInstrumentationModule() {
1820
super("kafka-streams", "kafka-streams-0.11", "kafka");
1921
}
@@ -27,4 +29,9 @@ public List<TypeInstrumentation> typeInstrumentations() {
2729
new StreamTaskInstrumentation(),
2830
new StreamThreadInstrumentation());
2931
}
32+
33+
@Override
34+
public boolean isIndyReady() {
35+
return true;
36+
}
3037
}

instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1818
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1919
import net.bytebuddy.asm.Advice;
20+
import net.bytebuddy.asm.Advice.AssignReturned;
2021
import net.bytebuddy.description.type.TypeDescription;
2122
import net.bytebuddy.matcher.ElementMatcher;
2223
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -45,12 +46,13 @@ public void transform(TypeTransformer transformer) {
4546
@SuppressWarnings("unused")
4647
public static class DeserializeAdvice {
4748

49+
@AssignReturned.ToReturned
4850
@Advice.OnMethodExit(suppress = Throwable.class)
49-
public static void onExit(
51+
public static ConsumerRecord<?, ?> onExit(
5052
@Advice.Argument(1) ConsumerRecord<?, ?> incoming,
51-
@Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
53+
@Advice.Return ConsumerRecord<?, ?> result) {
5254
if (result == null) {
53-
return;
55+
return null;
5456
}
5557

5658
// on 1.x we need to copy headers from incoming to result
@@ -62,6 +64,7 @@ public static void onExit(
6264

6365
// copy the receive CONSUMER span association
6466
KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming));
67+
return result;
6568
}
6669
}
6770
}

instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1616
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1717
import net.bytebuddy.asm.Advice;
18+
import net.bytebuddy.asm.Advice.AssignReturned;
1819
import net.bytebuddy.description.type.TypeDescription;
1920
import net.bytebuddy.matcher.ElementMatcher;
2021
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -42,12 +43,13 @@ public void transform(TypeTransformer transformer) {
4243
@SuppressWarnings("unused")
4344
public static class SaveHeadersAdvice {
4445

46+
@AssignReturned.ToReturned
4547
@Advice.OnMethodExit(suppress = Throwable.class)
46-
public static void saveHeaders(
48+
public static ConsumerRecord<?, ?> saveHeaders(
4749
@Advice.Argument(0) ConsumerRecord<?, ?> incoming,
48-
@Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
50+
@Advice.Return ConsumerRecord<?, ?> result) {
4951
if (result == null) {
50-
return;
52+
return null;
5153
}
5254

5355
// copy headers from incoming to result
@@ -57,6 +59,7 @@ public static void saveHeaders(
5759

5860
// copy the receive CONSUMER span association
5961
KafkaConsumerContextUtil.set(result, KafkaConsumerContextUtil.get(incoming));
62+
return result;
6063
}
6164
}
6265
}

0 commit comments

Comments
 (0)