Skip to content

Commit 881f2b9

Browse files
authored
Kafka metadata updates (#15267)
1 parent e766a7b commit 881f2b9

File tree

18 files changed

+1548
-86
lines changed

18 files changed

+1548
-86
lines changed

docs/instrumentation-list.yaml

Lines changed: 1371 additions & 44 deletions
Large diffs are not rendered by default.

instrumentation-docs/ci-collect.sh

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ set -euo pipefail
88
# shellcheck source=instrumentation-docs/instrumentations.sh
99
source "$(dirname "$0")/instrumentations.sh"
1010

11+
# Collect standard and colima tasks (without testLatestDeps)
1112
ALL_TASKS=()
1213
for task in "${INSTRUMENTATIONS[@]}"; do
1314
ALL_TASKS+=(":instrumentation:${task}")
@@ -16,8 +17,23 @@ for task in "${COLIMA_INSTRUMENTATIONS[@]}"; do
1617
ALL_TASKS+=(":instrumentation:${task}")
1718
done
1819

19-
echo "Processing instrumentations..."
20+
echo "Processing standard instrumentations..."
2021
./gradlew "${ALL_TASKS[@]}" \
2122
-PcollectMetadata=true \
2223
--rerun-tasks --continue
24+
25+
# Collect and run tasks that need testLatestDeps
26+
LATEST_DEPS_TASKS=()
27+
for task in "${TEST_LATEST_DEPS_INSTRUMENTATIONS[@]}"; do
28+
LATEST_DEPS_TASKS+=(":instrumentation:${task}")
29+
done
30+
31+
if [[ ${#LATEST_DEPS_TASKS[@]} -gt 0 ]]; then
32+
echo "Processing instrumentations with -PtestLatestDeps=true..."
33+
./gradlew "${LATEST_DEPS_TASKS[@]}" \
34+
-PcollectMetadata=true \
35+
-PtestLatestDeps=true \
36+
--rerun-tasks --continue
37+
fi
38+
2339
echo "Telemetry file regeneration complete."

instrumentation-docs/collect.sh

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,25 @@ run_gradle_tasks() {
134134
--rerun-tasks --continue --no-parallel
135135
}
136136

137+
run_gradle_tasks_with_latest_deps() {
138+
local -a tasks=("$@")
139+
140+
if [[ ${#tasks[@]} -eq 0 ]]; then
141+
echo "No tasks to run"
142+
return 0
143+
fi
144+
145+
echo
146+
echo "Running Gradle tasks with -PtestLatestDeps=true:"
147+
printf ' %s\n' "${tasks[@]}"
148+
echo
149+
150+
./gradlew "${tasks[@]}" \
151+
-PcollectMetadata=true \
152+
-PtestLatestDeps=true \
153+
--rerun-tasks --continue --no-parallel
154+
}
155+
137156
# Cleans any stray .telemetry directories left in the repo.
138157
find_and_remove_all_telemetry() {
139158
echo "Removing stray .telemetry directories..."
@@ -153,6 +172,14 @@ main() {
153172
done < <(process_descriptors "${INSTRUMENTATIONS[@]}")
154173
run_gradle_tasks "${gradle_tasks[@]}"
155174

175+
# Process instrumentations requiring testLatestDeps
176+
echo "Processing instrumentations with -PtestLatestDeps=true..."
177+
gradle_tasks=()
178+
while IFS= read -r line; do
179+
gradle_tasks+=("$line")
180+
done < <(process_descriptors "${TEST_LATEST_DEPS_INSTRUMENTATIONS[@]}")
181+
run_gradle_tasks_with_latest_deps "${gradle_tasks[@]}"
182+
156183
# Setup colima if needed
157184
setup_colima
158185

instrumentation-docs/instrumentations.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ readonly INSTRUMENTATIONS=(
146146
"jsf:jsf-mojarra-3.0:javaagent:test"
147147
"jsf:jsf-myfaces-1.2:javaagent:myfaces2Test"
148148
"jsf:jsf-myfaces-3.0:javaagent:test"
149+
"kafka:kafka-clients:kafka-clients-2.6:library:test"
149150
"kafka:kafka-connect-2.6:testing:test"
150151
"nats:nats-2.17:javaagent:test"
151152
"nats:nats-2.17:javaagent:testExperimental"
@@ -222,3 +223,13 @@ readonly COLIMA_INSTRUMENTATIONS=(
222223
"oracle-ucp-11.2:javaagent:testStableSemconv"
223224
"spring:spring-jms:spring-jms-6.0:javaagent:test"
224225
)
226+
227+
# Some instrumentation test suites need to run with -PtestLatestDeps=true to collect
228+
# metrics telemetry or test against latest library versions.
229+
# shellcheck disable=SC2034
230+
readonly TEST_LATEST_DEPS_INSTRUMENTATIONS=(
231+
"kafka:kafka-clients:kafka-clients-0.11:javaagent:test"
232+
"kafka:kafka-clients:kafka-clients-0.11:javaagent:testExperimental"
233+
"kafka:kafka-streams-0.11:javaagent:test"
234+
"kafka:kafka-streams-0.11:javaagent:testExperimental"
235+
)

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ tasks {
3030
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
3131

3232
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
33-
34-
// TODO run tests both with and without experimental span attributes
35-
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
33+
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
3634
}
3735

3836
val testPropagationDisabled by registering(Test::class) {
@@ -54,6 +52,20 @@ tasks {
5452
include("**/KafkaClientSuppressReceiveSpansTest.*")
5553
}
5654

55+
val testExperimental by registering(Test::class) {
56+
testClassesDirs = sourceSets.test.get().output.classesDirs
57+
classpath = sourceSets.test.get().runtimeClasspath
58+
59+
filter {
60+
excludeTestsMatching("KafkaClientPropagationDisabledTest")
61+
excludeTestsMatching("KafkaClientSuppressReceiveSpansTest")
62+
}
63+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
64+
65+
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
66+
systemProperty("metadataConfig", "otel.instrumentation.kafka.experimental-span-attributes=true")
67+
}
68+
5769
test {
5870
filter {
5971
excludeTestsMatching("KafkaClientPropagationDisabledTest")
@@ -63,7 +75,7 @@ tasks {
6375
}
6476

6577
check {
66-
dependsOn(testPropagationDisabled, testReceiveSpansDisabled)
78+
dependsOn(testPropagationDisabled, testReceiveSpansDisabled, testExperimental)
6779
}
6880
}
6981

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.ConsumerRecord;
2727
import org.apache.kafka.clients.consumer.ConsumerRecords;
2828
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.assertj.core.api.AbstractIterableAssert;
2930
import org.junit.jupiter.api.DisplayName;
3031
import org.junit.jupiter.api.Test;
3132
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -34,6 +35,9 @@
3435

3536
class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
3637

38+
private static final boolean testLatestDeps =
39+
Boolean.parseBoolean(System.getProperty("testLatestDeps", "true"));
40+
3741
@RegisterExtension
3842
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
3943

@@ -110,6 +114,13 @@ void testKafkaProducerAndConsumerSpan(boolean testHeaders) throws Exception {
110114
.hasAttributesSatisfyingExactly(
111115
processAttributes("10", greeting, testHeaders, false)),
112116
span -> span.hasName("processing").hasParent(trace.getSpan(1))));
117+
118+
if (testLatestDeps) {
119+
testing.waitAndAssertMetrics(
120+
"io.opentelemetry.kafka-clients-0.11",
121+
"kafka.producer.record_send_total",
122+
AbstractIterableAssert::isNotEmpty);
123+
}
113124
}
114125

115126
@DisplayName("test pass through tombstone")
@@ -155,6 +166,7 @@ void testPassThroughTombstone()
155166
processAttributes(null, null, false, false))));
156167
}
157168

169+
@ParameterizedTest
158170
@DisplayName("test records(TopicPartition) kafka consume")
159171
@ValueSource(booleans = {true, false})
160172
void testRecordsWithTopicPartitionKafkaConsume(boolean testListIterator)

instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
description: >
2-
This instrumentation enables messaging spans and metrics for Apache Kafka 0.11 clients.
3-
It automatically traces message production and consumption, propagates context, and emits metrics for production and consumption.
1+
description: This instrumentation enables messaging spans for Kafka producers and consumers, and collects internal Kafka client metrics.
2+
display_name: Apache Kafka Client
43
library_link: https://kafka.apache.org/
4+
semantic_conventions:
5+
- MESSAGING_SPANS
56
configurations:
67
- name: otel.instrumentation.kafka.producer-propagation.enabled
7-
description: Enable context propagation for kafka message producers.
8+
description: Enable context propagation for Kafka message producers.
89
type: boolean
910
default: true
1011
- name: otel.instrumentation.kafka.experimental-span-attributes
11-
description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms"
12+
description: Enables the capture of the experimental consumer attribute `kafka.record.queue_time_ms`.
1213
type: boolean
1314
default: false
1415
- name: otel.instrumentation.messaging.experimental.capture-headers

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;
77

8+
import static io.opentelemetry.api.common.AttributeKey.longKey;
89
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
910
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
1011
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
@@ -17,6 +18,7 @@
1718
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
1819
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
1920
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
21+
import static org.assertj.core.api.Assertions.assertThat;
2022

2123
import io.opentelemetry.api.common.AttributeKey;
2224
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
@@ -71,6 +73,9 @@ public abstract class KafkaClientBaseTest {
7173
protected Consumer<Integer, String> consumer;
7274
private final CountDownLatch consumerReady = new CountDownLatch(1);
7375

76+
static final boolean isExperimentalEnabled =
77+
Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes");
78+
7479
public static final int partition = 0;
7580
public static final TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, partition);
7681

@@ -230,8 +235,11 @@ protected static List<AttributeAssertion> processAttributes(
230235
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
231236
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
232237
satisfies(
233-
AttributeKey.longKey("kafka.record.queue_time_ms"),
234-
AbstractLongAssert::isNotNegative)));
238+
longKey("kafka.record.queue_time_ms"),
239+
val ->
240+
val.satisfiesAnyOf(
241+
v -> assertThat(v).isNotNegative(),
242+
v -> assertThat(isExperimentalEnabled).isFalse()))));
235243
// consumer group is not available in version 0.11
236244
if (Boolean.getBoolean("testLatestDeps")) {
237245
assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"));

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ tasks {
2020
withType<Test>().configureEach {
2121
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
2222
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
23+
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
2324
}
2425

2526
test {

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest {
3232
public Map<String, Object> producerProps() {
3333
Map<String, Object> props = super.producerProps();
3434
props.putAll(kafkaTelemetry().producerInterceptorConfigProperties());
35+
props.putAll(kafkaTelemetry().metricConfigProperties());
3536
return props;
3637
}
3738

3839
@Override
3940
public Map<String, Object> consumerProps() {
4041
Map<String, Object> props = super.consumerProps();
4142
props.putAll(kafkaTelemetry().consumerInterceptorConfigProperties());
43+
props.putAll(kafkaTelemetry().metricConfigProperties());
4244
return props;
4345
}
4446

0 commit comments

Comments
 (0)