Skip to content

Commit 11b5d49

Browse files
committed
Merge branch 'main' of github.com:jaydeluca/opentelemetry-java-instrumentation into metadata-functions
2 parents 2d5ff04 + 4bc7282 commit 11b5d49

File tree

22 files changed

+369
-249
lines changed

22 files changed

+369
-249
lines changed

.github/workflows/overhead-benchmark-daily.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,18 @@ jobs:
2828
run: |
2929
rsync -avv gh-pages/benchmark-overhead/results/ benchmark-overhead/results/
3030
31+
- name: Set up JDK for running Gradle
32+
uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0
33+
with:
34+
distribution: temurin
35+
java-version-file: .java-version
36+
3137
- name: Setup Gradle
3238
uses: gradle/actions/setup-gradle@ed408507eac070d1f99cc633dbcf757c94c7933a # v4.4.3
3339

40+
- name: Build Latest Snapshot JAR
41+
run: ./gradlew assemble -x javadoc
42+
3443
- name: Run tests
3544
working-directory: benchmark-overhead
3645
run: ./gradlew test

benchmark-overhead/src/test/java/io/opentelemetry/agents/LatestAgentSnapshotResolver.java

Lines changed: 51 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -5,115 +5,80 @@
55

66
package io.opentelemetry.agents;
77

8-
import static org.joox.JOOX.$;
9-
108
import java.io.IOException;
119
import java.nio.file.Files;
1210
import java.nio.file.Path;
1311
import java.nio.file.Paths;
14-
import java.nio.file.StandardOpenOption;
12+
import java.nio.file.StandardCopyOption;
1513
import java.util.Optional;
16-
import java.time.Duration;
17-
import okhttp3.OkHttpClient;
18-
import okhttp3.Request;
19-
import okhttp3.Response;
20-
import okhttp3.ResponseBody;
21-
import org.joox.Match;
2214
import org.slf4j.Logger;
2315
import org.slf4j.LoggerFactory;
24-
import org.w3c.dom.Document;
2516

2617
public class LatestAgentSnapshotResolver {
2718

2819
private static final Logger logger = LoggerFactory.getLogger(LatestAgentSnapshotResolver.class);
2920

30-
static final String BASE_URL =
31-
"https://oss.sonatype.org/content/repositories/snapshots/io/opentelemetry/javaagent/opentelemetry-javaagent";
32-
static final String LATEST_SNAPSHOT_META = BASE_URL + "/maven-metadata.xml";
33-
34-
private static final OkHttpClient client = new OkHttpClient.Builder()
35-
.connectTimeout(Duration.ofMinutes(1))
36-
.readTimeout(Duration.ofMinutes(1))
37-
.build();
38-
3921
Optional<Path> resolve() throws IOException {
40-
String version = fetchLatestSnapshotVersion();
41-
logger.info("Latest snapshot version is {}", version);
42-
String latestFilename = fetchLatestFilename(version);
43-
String url = BASE_URL + "/" + version + "/" + latestFilename;
44-
byte[] jarBytes = fetchBodyBytesFrom(url);
45-
Path path = Paths.get(".", "opentelemetry-javaagent-SNAPSHOT.jar");
46-
Files.write(
47-
path,
48-
jarBytes,
49-
StandardOpenOption.CREATE,
50-
StandardOpenOption.WRITE,
51-
StandardOpenOption.TRUNCATE_EXISTING);
52-
return Optional.of(path);
53-
}
22+
Path localJavaagentPath = findLocalJavaagentJar();
5423

55-
private String fetchLatestFilename(String version) throws IOException {
56-
String url = BASE_URL + "/" + version + "/maven-metadata.xml";
57-
String body = fetchBodyStringFrom(url);
58-
Document document = $(body).document();
59-
Match match = $(document).xpath("/metadata/versioning/snapshotVersions/snapshotVersion");
60-
return match.get().stream()
61-
.filter(
62-
elem -> {
63-
Match classifierMatch = $(elem).child("classifier");
64-
String classifier = classifierMatch == null ? null : classifierMatch.content();
65-
String extension = $(elem).child("extension").content();
66-
return "jar".equals(extension) && (classifier == null);
67-
})
68-
.map(e -> $(e).child("value").content())
69-
.findFirst()
70-
.map(value -> "opentelemetry-javaagent-" + value + ".jar")
71-
.orElseThrow();
72-
}
24+
if (localJavaagentPath == null || !Files.exists(localJavaagentPath)) {
25+
throw new IOException("Local javaagent JAR not found. Please run './gradlew :javaagent:assemble' from the project root first.");
26+
}
7327

74-
private String fetchLatestSnapshotVersion() throws IOException {
75-
String url = LATEST_SNAPSHOT_META;
76-
String body = fetchBodyStringFrom(url);
77-
Document document = $(body).document();
78-
Match match = $(document).xpath("/metadata/versioning/latest");
79-
return match.get(0).getTextContent();
80-
}
28+
logger.info("Using local javaagent JAR: {}", localJavaagentPath);
8129

82-
private String fetchBodyStringFrom(String url) throws IOException {
83-
return fetchBodyFrom(url).string();
30+
Path targetPath = Paths.get(".", "opentelemetry-javaagent-SNAPSHOT.jar");
31+
Files.copy(localJavaagentPath, targetPath, StandardCopyOption.REPLACE_EXISTING);
32+
return Optional.of(targetPath);
8433
}
8534

86-
private byte[] fetchBodyBytesFrom(String url) throws IOException {
87-
return fetchBodyFrom(url).bytes();
88-
}
35+
private Path findLocalJavaagentJar() {
36+
Path relativePath = Paths.get("../javaagent/build/libs").toAbsolutePath().normalize();
37+
Path javaagentJar = findJavaagentJarInDirectory(relativePath);
8938

90-
// The sonatype repository can be very unreliable, so we retry a few times
91-
private ResponseBody fetchBodyFrom(String url) throws IOException {
92-
Request request = new Request.Builder().url(url).build();
93-
IOException lastException = null;
39+
if (javaagentJar != null) {
40+
return javaagentJar;
41+
}
9442

95-
for (int attempt = 0; attempt < 3; attempt++) {
96-
try {
97-
try (Response response = client.newCall(request).execute()) {
98-
if (!response.isSuccessful()) {
99-
throw new IOException("Unexpected HTTP code " + response.code() + " for " + url);
100-
}
101-
ResponseBody body = response.body();
102-
if (body != null) {
103-
byte[] data = body.bytes();
104-
return ResponseBody.create(data, body.contentType());
105-
} else {
106-
throw new IOException("Response body is null");
107-
}
108-
}
109-
} catch (IOException e) {
110-
lastException = e;
111-
if (attempt < 2) {
112-
logger.warn("Attempt {} to fetch {} failed: {}. Retrying...", attempt + 1, url, e.getMessage());
43+
// If not found, try from the project root (in case running from different location)
44+
Path projectRoot = Paths.get(".").toAbsolutePath().normalize();
45+
while (projectRoot.getParent() != null) {
46+
Path gradlewFile = projectRoot.resolve("gradlew");
47+
if (Files.exists(gradlewFile)) {
48+
// Found the project root
49+
Path javaagentLibsDir = projectRoot.resolve("javaagent/build/libs");
50+
javaagentJar = findJavaagentJarInDirectory(javaagentLibsDir);
51+
if (javaagentJar != null) {
52+
return javaagentJar;
11353
}
54+
break;
11455
}
56+
projectRoot = projectRoot.getParent();
57+
}
58+
59+
return null;
60+
}
61+
62+
private Path findJavaagentJarInDirectory(Path directory) {
63+
if (!Files.exists(directory) || !Files.isDirectory(directory)) {
64+
return null;
65+
}
66+
67+
try {
68+
return Files.list(directory)
69+
.filter(path -> {
70+
String filename = path.getFileName().toString();
71+
// Look for the main jar: opentelemetry-javaagent-VERSION.jar (no additional suffixes)
72+
return filename.startsWith("opentelemetry-javaagent-") &&
73+
filename.endsWith(".jar") &&
74+
!filename.matches(".*-[a-z]+\\.jar"); // excludes anything with -word.jar pattern
75+
})
76+
.findFirst()
77+
.orElse(null);
78+
} catch (IOException e) {
79+
logger.warn("Failed to list files in directory {}: {}", directory, e.getMessage());
80+
return null;
11581
}
116-
throw lastException;
11782
}
11883
}
11984

conventions/src/main/kotlin/otel.nullaway-conventions.gradle.kts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,16 @@ nullaway {
1818

1919
tasks {
2020
withType<JavaCompile>().configureEach {
21-
if (name.contains("test", ignoreCase = true)) {
22-
options.errorprone.nullaway {
23-
enabled = false
24-
}
25-
} else {
26-
options.errorprone.nullaway {
27-
severity.set(CheckSeverity.ERROR)
28-
}
29-
}
3021
options.errorprone.nullaway {
22+
if (name.contains("test", ignoreCase = true)) {
23+
disable()
24+
} else {
25+
error()
26+
}
3127
customInitializerAnnotations.add("org.openjdk.jmh.annotations.Setup")
3228
excludedFieldAnnotations.add("org.mockito.Mock")
3329
excludedFieldAnnotations.add("org.mockito.InjectMocks")
3430
}
3531
}
3632
}
33+

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterable;
2020
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingIterator;
2121
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
22+
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingListIterator;
2223
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2324
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
2425
import java.util.Iterator;
2526
import java.util.List;
27+
import java.util.ListIterator;
2628
import net.bytebuddy.asm.Advice;
2729
import net.bytebuddy.asm.Advice.AssignReturned;
2830
import net.bytebuddy.description.type.TypeDescription;
@@ -60,6 +62,13 @@ public void transform(TypeTransformer transformer) {
6062
.and(takesArguments(0))
6163
.and(returns(Iterator.class)),
6264
ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice");
65+
transformer.applyAdviceToMethod(
66+
isMethod()
67+
.and(isPublic())
68+
.and(named("listIterator"))
69+
.and(takesArguments(0))
70+
.and(returns(ListIterator.class)),
71+
ConsumerRecordsInstrumentation.class.getName() + "$ListIteratorAdvice");
6372
}
6473

6574
@SuppressWarnings("unused")
@@ -118,4 +127,23 @@ public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
118127
iterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
119128
}
120129
}
130+
131+
@SuppressWarnings("unused")
132+
public static class ListIteratorAdvice {
133+
134+
@AssignReturned.ToReturned
135+
@Advice.OnMethodExit(suppress = Throwable.class)
136+
public static <K, V> ListIterator<ConsumerRecord<K, V>> wrap(
137+
@Advice.This ConsumerRecords<?, ?> records,
138+
@Advice.Return ListIterator<ConsumerRecord<K, V>> listIterator) {
139+
140+
// it's important not to suppress consumer span creation here because this instrumentation can
141+
// leak the context and so there may be a leaked consumer span in the context, in which
142+
// case it's important to overwrite the leaked span instead of suppressing the correct span
143+
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
144+
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records);
145+
return TracingListIterator.wrap(
146+
listIterator, consumerProcessInstrumenter(), wrappingEnabledSupplier(), consumerContext);
147+
}
148+
}
121149
}

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.nio.charset.StandardCharsets;
1919
import java.time.Duration;
2020
import java.util.List;
21+
import java.util.ListIterator;
2122
import java.util.concurrent.ExecutionException;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.concurrent.TimeoutException;
@@ -155,8 +156,8 @@ void testPassThroughTombstone()
155156
}
156157

157158
@DisplayName("test records(TopicPartition) kafka consume")
158-
@Test
159-
void testRecordsWithTopicPartitionKafkaConsume()
159+
@ValueSource(booleans = {true, false})
160+
void testRecordsWithTopicPartitionKafkaConsume(boolean testListIterator)
160161
throws ExecutionException, InterruptedException, TimeoutException {
161162
String greeting = "Hello from MockConsumer!";
162163
producer
@@ -172,9 +173,19 @@ void testRecordsWithTopicPartitionKafkaConsume()
172173
assertThat(recordsInPartition.size()).isEqualTo(1);
173174

174175
// iterate over records to generate spans
175-
for (ConsumerRecord<?, ?> record : recordsInPartition) {
176-
assertThat(record.value()).isEqualTo(greeting);
177-
assertThat(record.key()).isNull();
176+
if (testListIterator) {
177+
for (ListIterator<? extends ConsumerRecord<?, ?>> iterator =
178+
recordsInPartition.listIterator();
179+
iterator.hasNext(); ) {
180+
ConsumerRecord<?, ?> record = iterator.next();
181+
assertThat(record.value()).isEqualTo(greeting);
182+
assertThat(record.key()).isNull();
183+
}
184+
} else {
185+
for (ConsumerRecord<?, ?> record : recordsInPartition) {
186+
assertThat(record.value()).isEqualTo(greeting);
187+
assertThat(record.key()).isNull();
188+
}
178189
}
179190

180191
AtomicReference<SpanData> producerSpan = new AtomicReference<>();

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingIterable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
*/
1717
public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
1818
private final Iterable<ConsumerRecord<K, V>> delegate;
19-
private final Instrumenter<KafkaProcessRequest, Void> instrumenter;
20-
private final BooleanSupplier wrappingEnabled;
21-
private final KafkaConsumerContext consumerContext;
19+
protected final Instrumenter<KafkaProcessRequest, Void> instrumenter;
20+
protected final BooleanSupplier wrappingEnabled;
21+
protected final KafkaConsumerContext consumerContext;
2222
private boolean firstIterator = true;
2323

2424
protected TracingIterable(

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/TracingList.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,16 +137,14 @@ public int lastIndexOf(Object o) {
137137

138138
@Override
139139
public ListIterator<ConsumerRecord<K, V>> listIterator() {
140-
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
141-
// Consumer so we will not do that for now
142-
return delegate.listIterator();
140+
return TracingListIterator.wrap(
141+
delegate.listIterator(), instrumenter, wrappingEnabled, consumerContext);
143142
}
144143

145144
@Override
146145
public ListIterator<ConsumerRecord<K, V>> listIterator(int index) {
147-
// TODO: the API for ListIterator is not really good to instrument it in context of Kafka
148-
// Consumer so we will not do that for now
149-
return delegate.listIterator(index);
146+
return TracingListIterator.wrap(
147+
delegate.listIterator(index), instrumenter, wrappingEnabled, consumerContext);
150148
}
151149

152150
@Override

0 commit comments

Comments
 (0)