Skip to content

Commit 8efbf5c

Browse files
committed
wip
1 parent 5c5592a commit 8efbf5c

File tree

5 files changed

+167
-0
lines changed

5 files changed

+167
-0
lines changed

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.kafka.clients.consumer.ConsumerRecords;
3535
import org.apache.kafka.clients.consumer.KafkaConsumer;
3636
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
3739

3840
/**
3941
* This instrumentation saves additional information from the KafkaConsumer, such as consumer group
@@ -43,6 +45,9 @@
4345
public final class KafkaConsumerInfoInstrumentation extends InstrumenterModule.Tracing
4446
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
4547

48+
private static final Logger log =
49+
LoggerFactory.getLogger(KafkaConsumerInfoInstrumentation.class);
50+
4651
public KafkaConsumerInfoInstrumentation() {
4752
super("kafka", "kafka-0.11");
4853
}
@@ -148,6 +153,9 @@ public static void captureGroup(
148153
.put(coordinator, kafkaConsumerInfo);
149154
}
150155
}
156+
157+
// Log consumer configuration
158+
logConsumerConfiguration(consumerConfig, normalizedConsumerGroup);
151159
}
152160

153161
public static void muzzleCheck(ConsumerRecord record) {
@@ -191,6 +199,9 @@ public static void captureGroup(
191199
.put(coordinator, kafkaConsumerInfo);
192200
}
193201
}
202+
203+
// Log consumer configuration
204+
logConsumerConfigurationFromMap(consumerConfig, normalizedConsumerGroup);
194205
}
195206

196207
public static void muzzleCheck(ConsumerRecord record) {
@@ -256,4 +267,45 @@ public static void captureGroup(
256267
scope.close();
257268
}
258269
}
270+
271+
private static void logConsumerConfiguration(
272+
ConsumerConfig consumerConfig, String consumerGroup) {
273+
try {
274+
log.info("Kafka Consumer started - Group: {}", consumerGroup);
275+
log.info("Consumer Configuration (all properties):");
276+
277+
// Get all configuration values
278+
java.util.Map<String, ?> allConfigs = consumerConfig.values();
279+
280+
// Sort by key for consistent output
281+
allConfigs.entrySet().stream()
282+
.sorted(java.util.Map.Entry.comparingByKey())
283+
.forEach(entry -> {
284+
log.info(" {}: {}", entry.getKey(), entry.getValue());
285+
});
286+
287+
// TODO: Add data capture logic here
288+
} catch (Exception e) {
289+
log.debug("Error logging consumer configuration", e);
290+
}
291+
}
292+
293+
private static void logConsumerConfigurationFromMap(
294+
Map<String, Object> consumerConfig, String consumerGroup) {
295+
try {
296+
log.info("Kafka Consumer started - Group: {}", consumerGroup);
297+
log.info("Consumer Configuration (all properties):");
298+
299+
// Sort by key for consistent output
300+
consumerConfig.entrySet().stream()
301+
.sorted(java.util.Map.Entry.comparingByKey())
302+
.forEach(entry -> {
303+
log.info(" {}: {}", entry.getKey(), entry.getValue());
304+
});
305+
306+
// TODO: Add data capture logic here
307+
} catch (Exception e) {
308+
log.debug("Error logging consumer configuration", e);
309+
}
310+
}
259311
}

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
1717
import static datadog.trace.instrumentation.kafka_common.StreamingContext.STREAMING_CONTEXT;
1818
import static java.util.Collections.singletonMap;
19+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
1920
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
2021
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
2122
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -48,11 +49,15 @@
4849
import org.apache.kafka.clients.producer.ProducerRecord;
4950
import org.apache.kafka.clients.producer.internals.Sender;
5051
import org.apache.kafka.common.record.RecordBatch;
52+
import org.slf4j.Logger;
53+
import org.slf4j.LoggerFactory;
5154

5255
@AutoService(InstrumenterModule.class)
5356
public final class KafkaProducerInstrumentation extends InstrumenterModule.Tracing
5457
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
5558

59+
private static final Logger log = LoggerFactory.getLogger(KafkaProducerInstrumentation.class);
60+
5661
public KafkaProducerInstrumentation() {
5762
super("kafka", "kafka-0.11");
5863
}
@@ -94,6 +99,13 @@ public Map<String, String> contextStore() {
9499

95100
@Override
96101
public void methodAdvice(MethodTransformer transformer) {
102+
transformer.applyAdvice(
103+
isConstructor()
104+
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerConfig")))
105+
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Serializer")))
106+
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Serializer"))),
107+
KafkaProducerInstrumentation.class.getName() + "$ProducerConstructorAdvice");
108+
97109
transformer.applyAdvice(
98110
isMethod()
99111
.and(isPublic())
@@ -217,6 +229,13 @@ public static void stopSpan(
217229
}
218230
}
219231

232+
public static class ProducerConstructorAdvice {
233+
@Advice.OnMethodExit(suppress = Throwable.class)
234+
public static void captureConfiguration(@Advice.Argument(0) ProducerConfig producerConfig) {
235+
logProducerConfiguration(producerConfig);
236+
}
237+
}
238+
220239
public static class PayloadSizeAdvice {
221240

222241
/**
@@ -245,4 +264,25 @@ public static void onEnter(@Advice.Argument(value = 0) int estimatedPayloadSize)
245264
}
246265
}
247266
}
267+
268+
private static void logProducerConfiguration(ProducerConfig producerConfig) {
269+
try {
270+
log.info("Kafka Producer started");
271+
log.info("Producer Configuration (all properties):");
272+
273+
// Get all configuration values
274+
java.util.Map<String, ?> allConfigs = producerConfig.values();
275+
276+
// Sort by key for consistent output
277+
allConfigs.entrySet().stream()
278+
.sorted(java.util.Map.Entry.comparingByKey())
279+
.forEach(entry -> {
280+
log.info(" {}: {}", entry.getKey(), entry.getValue());
281+
});
282+
283+
// TODO: Add data capture logic here
284+
} catch (Exception e) {
285+
log.debug("Error logging producer configuration", e);
286+
}
287+
}
248288
}

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
44
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
55
import static java.util.Collections.singletonMap;
6+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
67
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
78
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
89
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -54,6 +55,13 @@ public Map<String, String> contextStore() {
5455

5556
@Override
5657
public void methodAdvice(MethodTransformer transformer) {
58+
transformer.applyAdvice(
59+
isConstructor()
60+
.and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerConfig")))
61+
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Serializer")))
62+
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Serializer"))),
63+
packageName + ".ProducerConstructorAdvice");
64+
5765
transformer.applyAdvice(
5866
isMethod()
5967
.and(isPublic())

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ConstructorAdvice.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@
1010
import org.apache.kafka.clients.consumer.ConsumerRecord;
1111
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
1212
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
1315

1416
public class ConstructorAdvice {
17+
private static final Logger log = LoggerFactory.getLogger(ConstructorAdvice.class);
18+
1519
// new - capturing OffsetCommitCallbackInvoker instead of the old ConsumerCoordinator
1620
@Advice.OnMethodExit(suppress = Throwable.class)
1721
public static void captureGroup(
@@ -52,6 +56,31 @@ public static void captureGroup(
5256
InstrumentationContext.get(OffsetCommitCallbackInvoker.class, KafkaConsumerInfo.class)
5357
.put(offsetCommitCallbackInvoker, kafkaConsumerInfo);
5458
}
59+
60+
// Log consumer configuration
61+
logConsumerConfiguration(consumerConfig, normalizedConsumerGroup);
62+
}
63+
64+
private static void logConsumerConfiguration(
65+
ConsumerConfig consumerConfig, String consumerGroup) {
66+
try {
67+
log.info("Kafka Consumer started - Group: {}", consumerGroup);
68+
log.info("Consumer Configuration (all properties):");
69+
70+
// Get all configuration values
71+
java.util.Map<String, ?> allConfigs = consumerConfig.values();
72+
73+
// Sort by key for consistent output
74+
allConfigs.entrySet().stream()
75+
.sorted(java.util.Map.Entry.comparingByKey())
76+
.forEach(entry -> {
77+
log.info(" {}: {}", entry.getKey(), entry.getValue());
78+
});
79+
80+
// TODO: Add data capture logic here
81+
} catch (Exception e) {
82+
log.debug("Error logging consumer configuration", e);
83+
}
5584
}
5685

5786
public static void muzzleCheck(ConsumerRecord record) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package datadog.trace.instrumentation.kafka_clients38;
2+
3+
import net.bytebuddy.asm.Advice;
4+
import org.apache.kafka.clients.producer.ProducerConfig;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
public class ProducerConstructorAdvice {
9+
private static final Logger log = LoggerFactory.getLogger(ProducerConstructorAdvice.class);
10+
11+
@Advice.OnMethodExit(suppress = Throwable.class)
12+
public static void captureConfiguration(
13+
@Advice.Argument(0) ProducerConfig producerConfig) {
14+
logProducerConfiguration(producerConfig);
15+
}
16+
17+
private static void logProducerConfiguration(ProducerConfig producerConfig) {
18+
try {
19+
log.info("Kafka Producer started");
20+
log.info("Producer Configuration (all properties):");
21+
22+
// Get all configuration values
23+
java.util.Map<String, ?> allConfigs = producerConfig.values();
24+
25+
// Sort by key for consistent output
26+
allConfigs.entrySet().stream()
27+
.sorted(java.util.Map.Entry.comparingByKey())
28+
.forEach(entry -> {
29+
log.info(" {}: {}", entry.getKey(), entry.getValue());
30+
});
31+
32+
// TODO: Add data capture logic here
33+
} catch (Exception e) {
34+
log.debug("Error logging producer configuration", e);
35+
}
36+
}
37+
}
38+

0 commit comments

Comments
 (0)