Skip to content

Commit 50f341a

Browse files
author
nayeem-kamal
committed
fix muzzle for kafka
1 parent 7680873 commit 50f341a

14 files changed

+67
-26
lines changed

dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ muzzle {
66
group = "org.apache.kafka"
77
module = "kafka-clients"
88
versions = "[3.8.0,)"
9-
assertInverse = false
9+
assertInverse = true
1010
}
1111
}
1212

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import org.apache.kafka.clients.Metadata;
88
import org.apache.kafka.clients.consumer.ConsumerConfig;
99
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
10-
import org.apache.kafka.clients.consumer.ConsumerRecord;
1110
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
1211
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
1312

@@ -54,9 +53,8 @@ public static void captureGroup(
5453
}
5554
}
5655

57-
public static void muzzleCheck(ConsumerRecord record) {
58-
// KafkaConsumerInstrumentation only applies for kafka versions with headers
59-
// Make an explicit call so KafkaConsumerGroupInstrumentation does the same
60-
record.headers();
56+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
57+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
58+
invoker.executeCallbacks();
6159
}
6260
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConsumerCoordinatorAdvice.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import java.util.Map;
1313
import net.bytebuddy.asm.Advice;
1414
import org.apache.kafka.clients.Metadata;
15-
import org.apache.kafka.clients.consumer.ConsumerRecord;
1615
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
1716
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
17+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
1818
import org.apache.kafka.clients.consumer.internals.RequestFuture;
1919
import org.apache.kafka.common.TopicPartition;
2020

@@ -66,9 +66,8 @@ public static void trackCommitOffset(
6666
}
6767
}
6868

69-
public static void muzzleCheck(ConsumerRecord record) {
70-
// KafkaConsumerInstrumentation only applies for kafka versions with headers
71-
// Make an explicit call so ConsumerCoordinatorInstrumentation does the same
72-
record.headers();
69+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
70+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
71+
invoker.executeCallbacks();
7372
}
7473
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IterableAdvice.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.apache.kafka.clients.Metadata;
66
import org.apache.kafka.clients.consumer.ConsumerRecord;
77
import org.apache.kafka.clients.consumer.ConsumerRecords;
8+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
89

910
public class IterableAdvice {
1011

@@ -31,4 +32,9 @@ public static void wrap(
3132
bootstrapServers);
3233
}
3334
}
35+
36+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
37+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
38+
invoker.executeCallbacks();
39+
}
3440
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/IteratorAdvice.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.kafka.clients.Metadata;
77
import org.apache.kafka.clients.consumer.ConsumerRecord;
88
import org.apache.kafka.clients.consumer.ConsumerRecords;
9+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
910

1011
public class IteratorAdvice {
1112

@@ -32,4 +33,9 @@ public static void wrap(
3233
bootstrapServers);
3334
}
3435
}
36+
37+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
38+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
39+
invoker.executeCallbacks();
40+
}
3541
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInstrumentationHelper.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datadog.trace.api.Config;
44
import datadog.trace.bootstrap.ContextStore;
55
import org.apache.kafka.clients.Metadata;
6+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
67

78
public class KafkaConsumerInstrumentationHelper {
89
public static String extractGroup(KafkaConsumerInfo kafkaConsumerInfo) {
@@ -26,4 +27,9 @@ public static String extractClusterId(
2627
public static String extractBootstrapServers(KafkaConsumerInfo kafkaConsumerInfo) {
2728
return kafkaConsumerInfo == null ? null : kafkaConsumerInfo.getBootstrapServers().get();
2829
}
30+
31+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
32+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
33+
invoker.executeCallbacks();
34+
}
2935
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/LegacyConstructorAdvice.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
import org.apache.kafka.clients.Metadata;
77
import org.apache.kafka.clients.consumer.ConsumerConfig;
88
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
9-
import org.apache.kafka.clients.consumer.ConsumerRecord;
109
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
1110
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
11+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
1212

1313
public class LegacyConstructorAdvice {
1414
// new - capture the ConsumerDelegate instead of KafkaConsumer
@@ -46,9 +46,8 @@ public static void captureGroup(
4646
}
4747
}
4848

49-
public static void muzzleCheck(ConsumerRecord record) {
50-
// KafkaConsumerInstrumentation only applies for kafka versions with headers
51-
// Make an explicit call so KafkaConsumerGroupInstrumentation does the same
52-
record.headers();
49+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
50+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
51+
invoker.executeCallbacks();
5352
}
5453
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ListAdvice.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.kafka.clients.Metadata;
1010
import org.apache.kafka.clients.consumer.ConsumerRecord;
1111
import org.apache.kafka.clients.consumer.ConsumerRecords;
12+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
1213

1314
public class ListAdvice {
1415

@@ -30,4 +31,9 @@ public static void wrap(
3031
iterable, KAFKA_CONSUME, CONSUMER_DECORATE, group, clusterId, bootstrapServers);
3132
}
3233
}
34+
35+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
36+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
37+
invoker.executeCallbacks();
38+
}
3339
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdate22AndAfterAdvice.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import datadog.trace.bootstrap.InstrumentationContext;
44
import net.bytebuddy.asm.Advice;
55
import org.apache.kafka.clients.Metadata;
6-
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
77
import org.apache.kafka.common.requests.MetadataResponse;
88

99
public class MetadataUpdate22AndAfterAdvice {
@@ -15,9 +15,8 @@ public static void onEnter(
1515
}
1616
}
1717

18-
public static void muzzleCheck(ConsumerRecord record) {
19-
// KafkaConsumerInstrumentation only applies for kafka versions with headers
20-
// Make an explicit call so MetadataInstrumentation does the same
21-
record.headers();
18+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
19+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
20+
invoker.executeCallbacks();
2221
}
2322
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/MetadataUpdateBefore22Advice.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import datadog.trace.bootstrap.InstrumentationContext;
44
import net.bytebuddy.asm.Advice;
55
import org.apache.kafka.clients.Metadata;
6-
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
77
import org.apache.kafka.common.Cluster;
88

99
public class MetadataUpdateBefore22Advice {
@@ -16,9 +16,8 @@ public static void onEnter(
1616
}
1717
}
1818

19-
public static void muzzleCheck(ConsumerRecord record) {
20-
// KafkaConsumerInstrumentation only applies for kafka versions with headers
21-
// Make an explicit call so MetadataInstrumentation does the same
22-
record.headers();
19+
public static void muzzleCheck(OffsetCommitCallbackInvoker invoker) {
20+
// Only applies for kafka versions with OffsetCommitCallbackInvoker
21+
invoker.executeCallbacks();
2322
}
2423
}

0 commit comments

Comments
 (0)