Skip to content

Commit b53b6dc

Browse files
authored
Support Kafka-clients 3.8+ (#7626)
1 parent a80d13e commit b53b6dc

File tree

43 files changed

+3683
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+3683
-1
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
ext {
2+
minJavaVersionForTests = JavaVersion.VERSION_17
3+
}
4+
muzzle {
5+
pass {
6+
group = "org.apache.kafka"
7+
module = "kafka-clients"
8+
versions = "[3.8.0,)"
9+
assertInverse = false
10+
}
11+
}
12+
13+
apply from: "$rootDir/gradle/java.gradle"
14+
15+
addTestSuite('latestDepTest')
16+
17+
//java {
18+
// toolchain {
19+
// languageVersion.set(JavaLanguageVersion.of(17))
20+
// }
21+
//}
22+
23+
24+
//project.afterEvaluate {
25+
// tasks.withType(Test).configureEach {
26+
// if (javaLauncher.get().metadata.languageVersion.asInt() >= 16) {
27+
// jvmArgs += ['--add-opens', 'java.base/java.util=ALL-UNNAMED']
28+
// }
29+
// }
30+
//}
31+
[compileMain_java17Java, compileTestJava, compileLatestDepTestJava].each {
32+
it.configure {
33+
setJavaVersion(it, 17)
34+
sourceCompatibility = JavaVersion.VERSION_1_8
35+
targetCompatibility = JavaVersion.VERSION_1_8
36+
}
37+
}
38+
tasks.withType(JavaCompile).each {
39+
it.configure {
40+
setJavaVersion(it, 17)
41+
sourceCompatibility = JavaVersion.VERSION_1_8
42+
targetCompatibility = JavaVersion.VERSION_1_8
43+
}
44+
}
45+
tasks.withType(GroovyCompile) {
46+
javaLauncher = getJavaLauncherFor(17)
47+
}
48+
49+
dependencies {
50+
// compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
51+
main_java17CompileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
52+
implementation project(':dd-java-agent:instrumentation:kafka-common')
53+
main_java17Implementation project(':dd-java-agent:instrumentation:kafka-common')
54+
55+
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
56+
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
57+
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
58+
testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.17.0'
59+
testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
60+
testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+'
61+
testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0'
62+
testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1')
63+
64+
65+
// Include latest version of kafka itself along with latest version of client libs.
66+
// This seems to help with jar compatibility hell.
67+
latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+'
68+
latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.+'
69+
latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.+'
70+
latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.+'
71+
latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+'
72+
latestDepTestImplementation libs.guava
73+
74+
}
75+
76+
configurations.testRuntimeClasspath {
77+
// spock-core depends on assertj version that is not compatible with kafka-clients
78+
resolutionStrategy.force 'org.assertj:assertj-core:2.9.1'
79+
}
80+
81+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package datadog.trace.instrumentation.kafka_clients38;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.*;
5+
6+
import com.google.auto.service.AutoService;
7+
import datadog.trace.agent.tooling.Instrumenter;
8+
import datadog.trace.agent.tooling.InstrumenterModule;
9+
import datadog.trace.api.Config;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
13+
@AutoService(InstrumenterModule.class)
14+
public final class ConsumerCoordinatorInstrumentation extends InstrumenterModule.Tracing
15+
implements Instrumenter.ForSingleType {
16+
17+
public ConsumerCoordinatorInstrumentation() {
18+
super("kafka");
19+
}
20+
21+
@Override
22+
public boolean isEnabled() {
23+
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
24+
}
25+
26+
@Override
27+
public Map<String, String> contextStore() {
28+
Map<String, String> contextStores = new HashMap<>(2);
29+
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
30+
contextStores.put(
31+
"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
32+
KafkaConsumerInfo.class.getName());
33+
return contextStores;
34+
}
35+
36+
@Override
37+
public String instrumentedType() {
38+
return "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator";
39+
}
40+
41+
@Override
42+
public String[] helperClassNames() {
43+
return new String[] {packageName + ".KafkaConsumerInfo"};
44+
}
45+
46+
@Override
47+
public void methodAdvice(MethodTransformer transformer) {
48+
transformer.applyAdvice(
49+
isMethod().and(named("sendOffsetCommitRequest")).and(takesArguments(1)),
50+
packageName + ".ConsumerCoordinatorAdvice");
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package datadog.trace.instrumentation.kafka_clients38;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
5+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
6+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
7+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
8+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
9+
import static net.bytebuddy.matcher.ElementMatchers.returns;
10+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
12+
13+
import com.google.auto.service.AutoService;
14+
import datadog.trace.agent.tooling.Instrumenter;
15+
import datadog.trace.agent.tooling.InstrumenterModule;
16+
import datadog.trace.api.Config;
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import net.bytebuddy.description.type.TypeDescription;
20+
import net.bytebuddy.matcher.ElementMatcher;
21+
22+
/**
23+
* This instrumentation saves additional information from the KafkaConsumer, such as consumer group
24+
* and cluster ID, in the context store for later use.
25+
*/
26+
@AutoService(InstrumenterModule.class)
27+
public final class KafkaConsumerInfoInstrumentation extends InstrumenterModule.Tracing
28+
implements Instrumenter.ForTypeHierarchy {
29+
30+
public KafkaConsumerInfoInstrumentation() {
31+
super("kafka");
32+
}
33+
34+
@Override
35+
public boolean isEnabled() {
36+
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
37+
}
38+
39+
@Override
40+
public Map<String, String> contextStore() {
41+
Map<String, String> contextStores = new HashMap<>(4);
42+
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
43+
contextStores.put(
44+
"org.apache.kafka.clients.consumer.ConsumerRecords", KafkaConsumerInfo.class.getName());
45+
// new- here we are storing the callbackinvoker and consumerdelegate in the context store
46+
// as opposed to the old consumercoordinator and kafkaconsumer
47+
contextStores.put(
48+
"org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker",
49+
KafkaConsumerInfo.class.getName());
50+
contextStores.put(
51+
"org.apache.kafka.clients.consumer.internals.ConsumerDelegate",
52+
KafkaConsumerInfo.class.getName());
53+
return contextStores;
54+
}
55+
56+
@Override
57+
public String hierarchyMarkerType() {
58+
return "org.apache.kafka.clients.consumer.internals.ConsumerDelegate";
59+
}
60+
61+
@Override
62+
public ElementMatcher<TypeDescription> hierarchyMatcher() {
63+
return implementsInterface(named(hierarchyMarkerType()))
64+
.and(declaresField(named("offsetCommitCallbackInvoker")));
65+
}
66+
67+
@Override
68+
public String[] helperClassNames() {
69+
return new String[] {
70+
packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo",
71+
};
72+
}
73+
74+
@Override
75+
public void methodAdvice(MethodTransformer transformer) {
76+
transformer.applyAdvice(
77+
isConstructor()
78+
.and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerConfig")))
79+
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer")))
80+
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
81+
packageName + ".ConstructorAdvice");
82+
transformer.applyAdvice(
83+
isMethod()
84+
.and(isPublic())
85+
.and(named("poll"))
86+
.and(takesArguments(1))
87+
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
88+
packageName + ".RecordsAdvice");
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package datadog.trace.instrumentation.kafka_clients38;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
6+
import static net.bytebuddy.matcher.ElementMatchers.returns;
7+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
8+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
9+
10+
import com.google.auto.service.AutoService;
11+
import datadog.trace.agent.tooling.Instrumenter;
12+
import datadog.trace.agent.tooling.InstrumenterModule;
13+
import datadog.trace.api.Config;
14+
import java.util.Collections;
15+
import java.util.HashMap;
16+
import java.util.Iterator;
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
@AutoService(InstrumenterModule.class)
21+
public final class KafkaConsumerInstrumentation extends InstrumenterModule.Tracing
22+
implements Instrumenter.ForSingleType {
23+
24+
public KafkaConsumerInstrumentation() {
25+
super("kafka");
26+
}
27+
28+
@Override
29+
public boolean isEnabled() {
30+
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
31+
}
32+
33+
@Override
34+
public Map<String, String> contextStore() {
35+
Map<String, String> contextStores = new HashMap<>(2);
36+
contextStores.put("org.apache.kafka.clients.Metadata", "java.lang.String");
37+
contextStores.put(
38+
"org.apache.kafka.clients.consumer.ConsumerRecords",
39+
"datadog.trace.instrumentation.kafka_clients38.KafkaConsumerInfo");
40+
return Collections.unmodifiableMap(contextStores);
41+
}
42+
43+
@Override
44+
public String instrumentedType() {
45+
return "org.apache.kafka.clients.consumer.ConsumerRecords";
46+
}
47+
48+
@Override
49+
public String[] helperClassNames() {
50+
return new String[] {
51+
packageName + ".TextMapInjectAdapterInterface",
52+
packageName + ".KafkaConsumerInfo",
53+
packageName + ".KafkaConsumerInstrumentationHelper",
54+
packageName + ".KafkaDecorator",
55+
packageName + ".TextMapExtractAdapter",
56+
packageName + ".TracingIterableDelegator",
57+
packageName + ".TracingIterable",
58+
packageName + ".TracingIterator",
59+
packageName + ".TracingList",
60+
packageName + ".TracingListIterator",
61+
packageName + ".TextMapInjectAdapter",
62+
"datadog.trace.instrumentation.kafka_common.Utils",
63+
"datadog.trace.instrumentation.kafka_common.StreamingContext",
64+
};
65+
}
66+
67+
@Override
68+
public void methodAdvice(MethodTransformer transformer) {
69+
transformer.applyAdvice(
70+
isMethod()
71+
.and(isPublic())
72+
.and(named("records"))
73+
.and(takesArgument(0, String.class))
74+
.and(returns(Iterable.class)),
75+
packageName + ".IterableAdvice");
76+
transformer.applyAdvice(
77+
isMethod()
78+
.and(isPublic())
79+
.and(named("records"))
80+
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
81+
.and(returns(List.class)),
82+
packageName + ".ListAdvice");
83+
transformer.applyAdvice(
84+
isMethod()
85+
.and(isPublic())
86+
.and(named("iterator"))
87+
.and(takesArguments(0))
88+
.and(returns(Iterator.class)),
89+
packageName + ".IteratorAdvice");
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package datadog.trace.instrumentation.kafka_clients38;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static java.util.Collections.singletonMap;
5+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
6+
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
7+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
8+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
9+
10+
import com.google.auto.service.AutoService;
11+
import datadog.trace.agent.tooling.Instrumenter;
12+
import datadog.trace.agent.tooling.InstrumenterModule;
13+
import datadog.trace.api.Config;
14+
import java.util.Map;
15+
16+
@AutoService(InstrumenterModule.class)
17+
public final class KafkaProducerInstrumentation extends InstrumenterModule.Tracing
18+
implements Instrumenter.ForSingleType {
19+
20+
public KafkaProducerInstrumentation() {
21+
super("kafka");
22+
}
23+
24+
@Override
25+
public boolean isEnabled() {
26+
return super.isEnabled() && Config.get().isExperimentalKafkaEnabled();
27+
}
28+
29+
@Override
30+
public String instrumentedType() {
31+
return "org.apache.kafka.clients.producer.KafkaProducer";
32+
}
33+
34+
@Override
35+
public String[] helperClassNames() {
36+
return new String[] {
37+
packageName + ".KafkaDecorator",
38+
packageName + ".TextMapInjectAdapterInterface",
39+
packageName + ".TextMapInjectAdapter",
40+
packageName + ".NoopTextMapInjectAdapter",
41+
packageName + ".KafkaProducerCallback",
42+
"datadog.trace.instrumentation.kafka_common.StreamingContext",
43+
packageName + ".AvroSchemaExtractor",
44+
};
45+
}
46+
47+
@Override
48+
public Map<String, String> contextStore() {
49+
return singletonMap("org.apache.kafka.clients.Metadata", "java.lang.String");
50+
}
51+
52+
@Override
53+
public void methodAdvice(MethodTransformer transformer) {
54+
transformer.applyAdvice(
55+
isMethod()
56+
.and(isPublic())
57+
.and(named("send"))
58+
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
59+
.and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
60+
packageName + ".ProducerAdvice");
61+
62+
transformer.applyAdvice(
63+
isMethod()
64+
.and(isPrivate())
65+
.and(takesArgument(0, int.class))
66+
.and(named("ensureValidRecordSize")), // intercepting this call allows us to see the
67+
// estimated message size
68+
packageName + ".PayloadSizeAdvice");
69+
}
70+
}

0 commit comments

Comments
 (0)