Skip to content

Commit a761578

Browse files
committed
fix ci && test
- library instrumentation test
1 parent 489a41b commit a761578

File tree

3 files changed

+259
-6
lines changed
  • instrumentation
    • kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal
    • spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7

3 files changed

+259
-6
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerRequest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;
77

8-
import io.opentelemetry.instrumentation.api.util.VirtualField;
98
import java.util.Iterator;
109
import java.util.Map;
1110
import javax.annotation.Nullable;
@@ -24,9 +23,6 @@ public final class KafkaProducerRequest {
2423
@Nullable private final String clientId;
2524
@Nullable private final String bootstrapServers;
2625

27-
private static final VirtualField<Producer<?, ?>, String> producerStringVirtualField =
28-
VirtualField.find(Producer.class, String.class);
29-
3026
public static KafkaProducerRequest create(ProducerRecord<?, ?> record, Producer<?, ?> producer) {
3127
return new KafkaProducerRequest(
3228
record, extractClientId(producer), extractBootstrapServers(producer));
@@ -69,6 +65,6 @@ private static String extractClientId(Producer<?, ?> producer) {
6965
}
7066

7167
private static String extractBootstrapServers(Producer<?, ?> producer) {
72-
return producerStringVirtualField.get(producer);
68+
return KafkaUtil.getBootstrapServers(producer);
7369
}
7470
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaUtil.java

Lines changed: 232 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
import java.util.Map;
1616
import javax.annotation.Nullable;
1717
import org.apache.kafka.clients.consumer.Consumer;
18+
import org.apache.kafka.clients.producer.Producer;
1819
import org.apache.kafka.common.Metric;
1920
import org.apache.kafka.common.MetricName;
21+
import org.apache.kafka.common.Node;
2022

2123
/**
2224
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
@@ -35,6 +37,8 @@ public final class KafkaUtil {
3537

3638
private static final VirtualField<Consumer<?, ?>, String> consumerVirtualField =
3739
VirtualField.find(Consumer.class, String.class);
40+
private static final VirtualField<Producer<?, ?>, String> producerVirtualField =
41+
VirtualField.find(Producer.class, String.class);
3842

3943
static {
4044
MethodHandle getGroupMetadata;
@@ -71,7 +75,234 @@ public static String getClientId(Consumer<?, ?> consumer) {
7175

7276
@Nullable
7377
public static String getBootstrapServers(Consumer<?, ?> consumer) {
74-
return consumerVirtualField.get(consumer);
78+
String bootstrapServers = consumerVirtualField.get(consumer);
79+
// If bootstrap servers are not available from virtual field (library instrumentation),
80+
// try to extract them via reflection from Kafka client's metadata
81+
if (bootstrapServers == null) {
82+
bootstrapServers = extractBootstrapServersViaReflection(consumer);
83+
if (bootstrapServers != null) {
84+
consumerVirtualField.set(consumer, bootstrapServers);
85+
}
86+
}
87+
return bootstrapServers;
88+
}
89+
90+
@Nullable
91+
public static String getBootstrapServers(Producer<?, ?> producer) {
92+
String bootstrapServers = producerVirtualField.get(producer);
93+
// If bootstrap servers are not available from virtual field (library instrumentation),
94+
// try to extract them via reflection from Kafka client's metadata
95+
if (bootstrapServers == null) {
96+
bootstrapServers = extractBootstrapServersViaReflection(producer);
97+
if (bootstrapServers != null) {
98+
producerVirtualField.set(producer, bootstrapServers);
99+
}
100+
}
101+
return bootstrapServers;
102+
}
103+
104+
/**
105+
* Extract bootstrap servers from Kafka client's metadata using reflection. 1.metadata -> Cluster
106+
* -> nodes 2.metadata -> MetadataCache -> nodes 3.metadata -> MetadataSnapshot -> nodes
107+
* 4.delegate -> metadata -> ...
108+
*/
109+
@Nullable
110+
private static String extractBootstrapServersViaReflection(Object client) {
111+
if (client == null) {
112+
return null;
113+
}
114+
try {
115+
Object metadata = extractMetadata(client);
116+
117+
if (metadata == null) {
118+
return null;
119+
}
120+
121+
String bootstrapServers = extractFromMetadataSnapshot(metadata);
122+
123+
if (bootstrapServers == null) {
124+
bootstrapServers = extractFromMetadataCache(metadata);
125+
}
126+
if (bootstrapServers == null) {
127+
bootstrapServers = extractFromCluster(metadata);
128+
}
129+
130+
return bootstrapServers;
131+
} catch (RuntimeException e) {
132+
return null;
133+
}
134+
}
135+
136+
@Nullable
137+
private static Object extractMetadata(Object client) {
138+
try {
139+
java.lang.reflect.Field metadataField = findField(client.getClass(), "metadata");
140+
if (metadataField != null) {
141+
metadataField.setAccessible(true);
142+
Object metadata = metadataField.get(client);
143+
if (metadata != null) {
144+
return metadata;
145+
}
146+
}
147+
148+
java.lang.reflect.Field delegateField = findField(client.getClass(), "delegate");
149+
if (delegateField != null) {
150+
delegateField.setAccessible(true);
151+
Object delegate = delegateField.get(client);
152+
if (delegate != null) {
153+
java.lang.reflect.Field delegateMetadataField =
154+
findField(delegate.getClass(), "metadata");
155+
if (delegateMetadataField != null) {
156+
delegateMetadataField.setAccessible(true);
157+
return delegateMetadataField.get(delegate);
158+
}
159+
}
160+
}
161+
162+
return null;
163+
} catch (Exception e) {
164+
return null;
165+
}
166+
}
167+
168+
@Nullable
169+
private static java.lang.reflect.Field findField(Class<?> clazz, String fieldName) {
170+
Class<?> currentClass = clazz;
171+
while (currentClass != null) {
172+
try {
173+
return currentClass.getDeclaredField(fieldName);
174+
} catch (NoSuchFieldException e) {
175+
// Field not found in current class, try parent class
176+
currentClass = currentClass.getSuperclass();
177+
}
178+
}
179+
return null;
180+
}
181+
182+
@Nullable
183+
private static String extractFromCluster(Object metadata) {
184+
try {
185+
java.lang.reflect.Field clusterField = findField(metadata.getClass(), "cluster");
186+
if (clusterField == null) {
187+
return null;
188+
}
189+
clusterField.setAccessible(true);
190+
Object cluster = clusterField.get(metadata);
191+
192+
if (cluster == null) {
193+
return null;
194+
}
195+
196+
java.lang.reflect.Method nodesMethod = cluster.getClass().getDeclaredMethod("nodes");
197+
nodesMethod.setAccessible(true);
198+
Object nodes = nodesMethod.invoke(cluster);
199+
200+
return formatNodesAsBootstrapServers(nodes);
201+
} catch (Exception e) {
202+
return null;
203+
}
204+
}
205+
206+
@Nullable
207+
private static String extractFromMetadataCache(Object metadata) {
208+
try {
209+
java.lang.reflect.Field cacheField = findField(metadata.getClass(), "cache");
210+
if (cacheField == null) {
211+
return null;
212+
}
213+
cacheField.setAccessible(true);
214+
Object cache = cacheField.get(metadata);
215+
216+
if (cache == null) {
217+
return null;
218+
}
219+
220+
java.lang.reflect.Field nodesField = cache.getClass().getDeclaredField("nodes");
221+
nodesField.setAccessible(true);
222+
Object nodes = nodesField.get(cache);
223+
224+
return formatNodesAsBootstrapServers(nodes);
225+
} catch (Exception e) {
226+
return null;
227+
}
228+
}
229+
230+
@Nullable
231+
private static String extractFromMetadataSnapshot(Object metadata) {
232+
try {
233+
java.lang.reflect.Field snapshotField = findField(metadata.getClass(), "metadataSnapshot");
234+
if (snapshotField == null) {
235+
return null;
236+
}
237+
snapshotField.setAccessible(true);
238+
Object snapshot = snapshotField.get(metadata);
239+
240+
if (snapshot == null) {
241+
return null;
242+
}
243+
244+
java.lang.reflect.Field nodesField = snapshot.getClass().getDeclaredField("nodes");
245+
nodesField.setAccessible(true);
246+
Object nodes = nodesField.get(snapshot);
247+
248+
return formatNodesAsBootstrapServers(nodes);
249+
} catch (Exception e) {
250+
return null;
251+
}
252+
}
253+
254+
@Nullable
255+
private static String formatNodesAsBootstrapServers(Object nodes) {
256+
if (nodes == null) {
257+
return null;
258+
}
259+
260+
try {
261+
StringBuilder sb = new StringBuilder();
262+
263+
if (nodes instanceof java.util.Map) {
264+
// nodes is Map<Integer, Node>
265+
@SuppressWarnings("unchecked")
266+
java.util.Map<Integer, Object> nodeMap = (java.util.Map<Integer, Object>) nodes;
267+
268+
for (Object node : nodeMap.values()) {
269+
String address = getNodeAddress(node);
270+
if (address != null) {
271+
if (sb.length() > 0) {
272+
sb.append(",");
273+
}
274+
sb.append(address);
275+
}
276+
}
277+
} else if (nodes instanceof java.util.Collection) {
278+
// nodes is Collection<Node>
279+
@SuppressWarnings("unchecked")
280+
java.util.Collection<Object> nodeCollection = (java.util.Collection<Object>) nodes;
281+
282+
for (Object node : nodeCollection) {
283+
String address = getNodeAddress(node);
284+
if (address != null) {
285+
if (sb.length() > 0) {
286+
sb.append(",");
287+
}
288+
sb.append(address);
289+
}
290+
}
291+
}
292+
293+
return sb.length() > 0 ? sb.toString() : null;
294+
} catch (RuntimeException e) {
295+
return null;
296+
}
297+
}
298+
299+
@Nullable
300+
private static String getNodeAddress(Object o) {
301+
if (o == null || !(o instanceof Node)) {
302+
return null;
303+
}
304+
Node node = (Node) o;
305+
return node.host() + ":" + node.port();
75306
}
76307

77308
private static Map<String, String> getConsumerInfo(Consumer<?, ?> consumer) {

instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ void shouldCreateSpansForSingleRecordProcess() {
8585
equalTo(MESSAGING_SYSTEM, "kafka"),
8686
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
8787
equalTo(MESSAGING_OPERATION, "publish"),
88+
satisfies(
89+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
8890
satisfies(
8991
MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
9092
satisfies(
@@ -106,6 +108,9 @@ void shouldCreateSpansForSingleRecordProcess() {
106108
equalTo(MESSAGING_SYSTEM, "kafka"),
107109
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
108110
equalTo(MESSAGING_OPERATION, "receive"),
111+
satisfies(
112+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
113+
AbstractStringAssert::isNotEmpty),
109114
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"),
110115
satisfies(
111116
MESSAGING_CLIENT_ID,
@@ -120,6 +125,9 @@ void shouldCreateSpansForSingleRecordProcess() {
120125
equalTo(MESSAGING_SYSTEM, "kafka"),
121126
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
122127
equalTo(MESSAGING_OPERATION, "process"),
128+
satisfies(
129+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
130+
AbstractStringAssert::isNotEmpty),
123131
satisfies(
124132
MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
125133
satisfies(
@@ -160,6 +168,7 @@ void shouldHandleFailureInKafkaListener() {
160168
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
161169
equalTo(MESSAGING_OPERATION, "receive"),
162170
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testSingleListener"),
171+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
163172
satisfies(
164173
MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
165174
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1));
@@ -168,6 +177,7 @@ void shouldHandleFailureInKafkaListener() {
168177
equalTo(MESSAGING_SYSTEM, "kafka"),
169178
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
170179
equalTo(MESSAGING_OPERATION, "process"),
180+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
171181
satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
172182
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
173183
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
@@ -190,6 +200,8 @@ void shouldHandleFailureInKafkaListener() {
190200
equalTo(MESSAGING_SYSTEM, "kafka"),
191201
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
192202
equalTo(MESSAGING_OPERATION, "publish"),
203+
satisfies(
204+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
193205
satisfies(
194206
MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
195207
satisfies(
@@ -260,6 +272,8 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
260272
equalTo(MESSAGING_SYSTEM, "kafka"),
261273
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
262274
equalTo(MESSAGING_OPERATION, "publish"),
275+
satisfies(
276+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
263277
satisfies(
264278
MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
265279
satisfies(
@@ -276,6 +290,8 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
276290
equalTo(MESSAGING_SYSTEM, "kafka"),
277291
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
278292
equalTo(MESSAGING_OPERATION, "publish"),
293+
satisfies(
294+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
279295
satisfies(
280296
MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
281297
satisfies(
@@ -299,6 +315,9 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
299315
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
300316
equalTo(MESSAGING_OPERATION, "receive"),
301317
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"),
318+
satisfies(
319+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
320+
AbstractStringAssert::isNotEmpty),
302321
satisfies(
303322
MESSAGING_CLIENT_ID,
304323
stringAssert -> stringAssert.startsWith("consumer")),
@@ -315,6 +334,9 @@ void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
315334
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
316335
equalTo(MESSAGING_OPERATION, "process"),
317336
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"),
337+
satisfies(
338+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
339+
AbstractStringAssert::isNotEmpty),
318340
satisfies(
319341
MESSAGING_CLIENT_ID,
320342
stringAssert -> stringAssert.startsWith("consumer")),
@@ -349,6 +371,8 @@ void shouldHandleFailureInKafkaBatchListener() {
349371
equalTo(MESSAGING_SYSTEM, "kafka"),
350372
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
351373
equalTo(MESSAGING_OPERATION, "publish"),
374+
satisfies(
375+
MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
352376
satisfies(
353377
MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
354378
satisfies(
@@ -406,6 +430,7 @@ private static void assertReceiveSpan(SpanDataAssert span) {
406430
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
407431
equalTo(MESSAGING_OPERATION, "receive"),
408432
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"),
433+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
409434
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
410435
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1));
411436
}
@@ -421,6 +446,7 @@ private static void assertProcessSpan(
421446
equalTo(MESSAGING_DESTINATION_NAME, "testBatchTopic"),
422447
equalTo(MESSAGING_OPERATION, "process"),
423448
equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "testBatchListener"),
449+
satisfies(MESSAGING_KAFKA_BOOTSTRAP_SERVERS, AbstractStringAssert::isNotEmpty),
424450
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
425451
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1));
426452
if (failed) {

0 commit comments

Comments
 (0)