Skip to content

Commit 84ee013

Browse files
committed
forward all calls to (subject, replyTo, headers, data) instead of (message)
1 parent 6a3baf1 commit 84ee013

File tree

18 files changed

+321
-389
lines changed

18 files changed

+321
-389
lines changed

instrumentation/nats/nats-2.17/javaagent/build.gradle.kts

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,11 @@ muzzle {
66
pass {
77
group.set("io.nats")
88
module.set("jnats")
9-
versions.set("[2.17.7,)")
9+
versions.set("[2.17.2,)")
1010

1111
// Could not find io.nats:nats-parent:1.0-SNAPSHOT
1212
skip("0.5.0", "0.5.1")
1313

14-
// Headers are readOnly, so context can not be propagated
15-
// https://github.com/nats-io/nats.java/pull/1123
16-
skip("2.17.2", "2.17.3", "2.17.4", "2.17.5", "2.17.6")
17-
1814
assertInverse.set(true)
1915
}
2016
}
@@ -26,29 +22,26 @@ dependencies {
2622
testImplementation(project(":instrumentation:nats:nats-2.17:testing"))
2723
}
2824

29-
val collectMetadata = findProperty("collectMetadata")?.toString() ?: "false"
30-
3125
tasks {
3226
withType<Test>().configureEach {
3327
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
28+
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
3429
}
3530

3631
val testExperimental by registering(Test::class) {
32+
testClassesDirs = sourceSets.test.get().output.classesDirs
33+
classpath = sourceSets.test.get().runtimeClasspath
3734
filter {
3835
includeTestsMatching("NatsInstrumentationExperimentalTest")
3936
}
4037
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
4138
jvmArgs("-Dotel.instrumentation.messaging.experimental.capture-headers=captured-header")
42-
43-
systemProperty("collectMetadata", collectMetadata)
4439
}
4540

4641
test {
4742
filter {
4843
excludeTestsMatching("NatsInstrumentationExperimentalTest")
4944
}
50-
51-
systemProperty("collectMetadata", collectMetadata)
5245
}
5346

5447
check {

instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionPublishInstrumentation.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public static boolean onEnter(
8181
@Advice.This Connection connection,
8282
@Advice.Argument(0) String subject,
8383
@Advice.Argument(1) byte[] body) {
84-
connection.publish(NatsMessageWritableHeaders.create(subject, body));
84+
connection.publish(subject, null, null, body);
8585
return true;
8686
}
8787
}
@@ -94,7 +94,7 @@ public static boolean onEnter(
9494
@Advice.Argument(0) String subject,
9595
@Advice.Argument(1) Headers headers,
9696
@Advice.Argument(2) byte[] body) {
97-
connection.publish(NatsMessageWritableHeaders.create(subject, headers, body));
97+
connection.publish(subject, null, headers, body);
9898
return true;
9999
}
100100
}
@@ -107,39 +107,28 @@ public static boolean onEnter(
107107
@Advice.Argument(0) String subject,
108108
@Advice.Argument(1) String replyTo,
109109
@Advice.Argument(2) byte[] body) {
110-
connection.publish(NatsMessageWritableHeaders.create(subject, replyTo, body));
110+
connection.publish(subject, replyTo, null, body);
111111
return true;
112112
}
113113
}
114114

115115
@SuppressWarnings("unused")
116116
public static class PublishReplyToHeadersBodyAdvice {
117-
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
118-
public static boolean onEnter(
119-
@Advice.This Connection connection,
120-
@Advice.Argument(0) String subject,
121-
@Advice.Argument(1) String replyTo,
122-
@Advice.Argument(2) Headers headers,
123-
@Advice.Argument(3) byte[] body) {
124-
connection.publish(NatsMessageWritableHeaders.create(subject, replyTo, headers, body));
125-
return true;
126-
}
127-
}
128-
129-
@SuppressWarnings("unused")
130-
public static class PublishMessageAdvice {
131117

132118
@Advice.OnMethodEnter(suppress = Throwable.class)
133119
public static void onEnter(
134120
@Advice.This Connection connection,
135-
@Advice.Argument(value = 0, readOnly = false) Message message,
121+
@Advice.Argument(0) String subject,
122+
@Advice.Argument(1) String replyTo,
123+
@Advice.Argument(value = 2, readOnly = false) Headers headers,
124+
@Advice.Argument(3) byte[] body,
136125
@Advice.Local("otelContext") Context otelContext,
137126
@Advice.Local("otelScope") Scope otelScope,
138127
@Advice.Local("natsRequest") NatsRequest natsRequest) {
139-
message = NatsMessageWritableHeaders.create(message);
128+
headers = NatsMessageWritableHeaders.create(headers);
140129

141130
Context parentContext = Context.current();
142-
natsRequest = NatsRequest.create(connection, message);
131+
natsRequest = NatsRequest.create(connection, subject, replyTo, headers, body);
143132

144133
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
145134
return;
@@ -163,4 +152,15 @@ public static void onExit(
163152
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
164153
}
165154
}
155+
156+
@SuppressWarnings("unused")
157+
public static class PublishMessageAdvice {
158+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
159+
public static boolean onEnter(
160+
@Advice.This Connection connection, @Advice.Argument(0) Message message) {
161+
connection.publish(
162+
message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData());
163+
return true;
164+
}
165+
}
166166
}

instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionRequestInstrumentation.java

Lines changed: 67 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public static Message onEnter(
129129
@Advice.Argument(2) Duration timeout,
130130
@Advice.Local("message") Message message)
131131
throws InterruptedException {
132-
message = connection.request(NatsMessageWritableHeaders.create(subject, body), timeout);
132+
message = connection.request(subject, null, body, timeout);
133133
return message;
134134
}
135135

@@ -145,42 +145,18 @@ public static Message onExit(
145145
@SuppressWarnings("unused")
146146
public static class RequestHeadersBodyAdvice {
147147

148-
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
149-
public static Message onEnter(
148+
@Advice.OnMethodEnter(suppress = Throwable.class)
149+
public static void onEnter(
150150
@Advice.This Connection connection,
151151
@Advice.Argument(0) String subject,
152-
@Advice.Argument(1) Headers headers,
152+
@Advice.Argument(value = 1, readOnly = false) Headers headers,
153153
@Advice.Argument(2) byte[] body,
154154
@Advice.Argument(3) Duration timeout,
155-
@Advice.Local("message") Message message)
156-
throws InterruptedException {
157-
message =
158-
connection.request(NatsMessageWritableHeaders.create(subject, headers, body), timeout);
159-
return message;
160-
}
161-
162-
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
163-
public static Message onExit(
164-
@Advice.Return(readOnly = false) Message returned,
165-
@Advice.Local("message") Message message) {
166-
returned = message;
167-
return returned;
168-
}
169-
}
170-
171-
@SuppressWarnings("unused")
172-
public static class RequestMessageAdvice {
173-
174-
@Advice.OnMethodEnter(suppress = Throwable.class)
175-
public static void onEnter(
176-
@Advice.This Connection connection,
177-
@Advice.Argument(value = 0, readOnly = false) Message message,
178155
@Advice.Local("otelContext") Context otelContext,
179156
@Advice.Local("otelScope") Scope otelScope,
180157
@Advice.Local("natsRequest") NatsRequest natsRequest) {
181-
message = NatsMessageWritableHeaders.create(message);
182-
183-
natsRequest = NatsRequest.create(connection, message);
158+
headers = NatsMessageWritableHeaders.create(headers);
159+
natsRequest = NatsRequest.create(connection, subject, null, headers, body);
184160
Context parentContext = Context.current();
185161

186162
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
@@ -214,38 +190,40 @@ public static void onExit(
214190
}
215191

216192
@SuppressWarnings("unused")
217-
public static class RequestFutureBodyAdvice {
193+
public static class RequestMessageAdvice {
218194

219195
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
220-
public static CompletableFuture<Message> onEnter(
196+
public static Message onEnter(
221197
@Advice.This Connection connection,
222-
@Advice.Argument(0) String subject,
223-
@Advice.Argument(1) byte[] body,
224-
@Advice.Local("future") CompletableFuture<Message> future) {
225-
future = connection.request(NatsMessageWritableHeaders.create(subject, body));
226-
return future;
198+
@Advice.Argument(0) Message request,
199+
@Advice.Argument(1) Duration timeout,
200+
@Advice.Local("response") Message response)
201+
throws InterruptedException {
202+
response =
203+
connection.request(
204+
request.getSubject(), request.getHeaders(), request.getData(), timeout);
205+
return response;
227206
}
228207

229208
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
230-
public static CompletableFuture<Message> onExit(
231-
@Advice.Return(readOnly = false) CompletableFuture<Message> messageFuture,
232-
@Advice.Local("future") CompletableFuture<Message> future) {
233-
messageFuture = future;
234-
return messageFuture;
209+
public static Message onExit(
210+
@Advice.Return(readOnly = false) Message returned,
211+
@Advice.Local("response") Message response) {
212+
returned = response;
213+
return returned;
235214
}
236215
}
237216

238217
@SuppressWarnings("unused")
239-
public static class RequestFutureHeadersBodyAdvice {
218+
public static class RequestFutureBodyAdvice {
240219

241220
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
242221
public static CompletableFuture<Message> onEnter(
243222
@Advice.This Connection connection,
244223
@Advice.Argument(0) String subject,
245-
@Advice.Argument(1) Headers headers,
246-
@Advice.Argument(2) byte[] body,
224+
@Advice.Argument(1) byte[] body,
247225
@Advice.Local("future") CompletableFuture<Message> future) {
248-
future = connection.request(NatsMessageWritableHeaders.create(subject, headers, body));
226+
future = connection.request(subject, null, body);
249227
return future;
250228
}
251229

@@ -259,19 +237,20 @@ public static CompletableFuture<Message> onExit(
259237
}
260238

261239
@SuppressWarnings("unused")
262-
public static class RequestFutureMessageAdvice {
240+
public static class RequestFutureHeadersBodyAdvice {
263241

264242
@Advice.OnMethodEnter(suppress = Throwable.class)
265243
public static void onEnter(
266244
@Advice.This Connection connection,
267-
@Advice.Argument(value = 0, readOnly = false) Message message,
245+
@Advice.Argument(0) String subject,
246+
@Advice.Argument(value = 1, readOnly = false) Headers headers,
247+
@Advice.Argument(2) byte[] body,
268248
@Advice.Local("otelContext") Context otelContext,
269249
@Advice.Local("otelParentContext") Context otelParentContext,
270250
@Advice.Local("otelScope") Scope otelScope,
271251
@Advice.Local("natsRequest") NatsRequest natsRequest) {
272-
message = NatsMessageWritableHeaders.create(message);
273-
274-
natsRequest = NatsRequest.create(connection, message);
252+
headers = NatsMessageWritableHeaders.create(headers);
253+
natsRequest = NatsRequest.create(connection, subject, null, headers, body);
275254
otelParentContext = Context.current();
276255

277256
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
@@ -308,15 +287,14 @@ public static void onExit(
308287
}
309288

310289
@SuppressWarnings("unused")
311-
public static class RequestTimeoutFutureBodyAdvice {
290+
public static class RequestFutureMessageAdvice {
312291

313292
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
314293
public static CompletableFuture<Message> onEnter(
315294
@Advice.This Connection connection,
316-
@Advice.Argument(0) String subject,
317-
@Advice.Argument(1) byte[] body,
295+
@Advice.Argument(0) Message message,
318296
@Advice.Local("future") CompletableFuture<Message> future) {
319-
future = connection.request(NatsMessageWritableHeaders.create(subject, body));
297+
future = connection.request(message.getSubject(), message.getHeaders(), message.getData());
320298
return future;
321299
}
322300

@@ -330,16 +308,16 @@ public static CompletableFuture<Message> onExit(
330308
}
331309

332310
@SuppressWarnings("unused")
333-
public static class RequestTimeoutFutureHeadersBodyAdvice {
311+
public static class RequestTimeoutFutureBodyAdvice {
334312

335313
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
336314
public static CompletableFuture<Message> onEnter(
337315
@Advice.This Connection connection,
338316
@Advice.Argument(0) String subject,
339-
@Advice.Argument(1) Headers headers,
340-
@Advice.Argument(2) byte[] body,
317+
@Advice.Argument(1) byte[] body,
318+
@Advice.Argument(2) Duration timeout,
341319
@Advice.Local("future") CompletableFuture<Message> future) {
342-
future = connection.request(NatsMessageWritableHeaders.create(subject, headers, body));
320+
future = connection.requestWithTimeout(subject, null, body, timeout);
343321
return future;
344322
}
345323

@@ -353,19 +331,20 @@ public static CompletableFuture<Message> onExit(
353331
}
354332

355333
@SuppressWarnings("unused")
356-
public static class RequestTimeoutFutureMessageAdvice {
334+
public static class RequestTimeoutFutureHeadersBodyAdvice {
357335

358336
@Advice.OnMethodEnter(suppress = Throwable.class)
359337
public static void onEnter(
360338
@Advice.This Connection connection,
361-
@Advice.Argument(value = 0, readOnly = false) Message message,
339+
@Advice.Argument(0) String subject,
340+
@Advice.Argument(value = 1, readOnly = false) Headers headers,
341+
@Advice.Argument(2) byte[] body,
362342
@Advice.Local("otelContext") Context otelContext,
363343
@Advice.Local("otelParentContext") Context otelParentContext,
364344
@Advice.Local("otelScope") Scope otelScope,
365345
@Advice.Local("natsRequest") NatsRequest natsRequest) {
366-
message = NatsMessageWritableHeaders.create(message);
367-
368-
natsRequest = NatsRequest.create(connection, message);
346+
headers = NatsMessageWritableHeaders.create(headers);
347+
natsRequest = NatsRequest.create(connection, subject, null, headers, body);
369348
otelParentContext = Context.current();
370349

371350
if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) {
@@ -400,4 +379,28 @@ public static void onExit(
400379
}
401380
}
402381
}
382+
383+
@SuppressWarnings("unused")
384+
public static class RequestTimeoutFutureMessageAdvice {
385+
386+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
387+
public static CompletableFuture<Message> onEnter(
388+
@Advice.This Connection connection,
389+
@Advice.Argument(value = 0, readOnly = false) Message message,
390+
@Advice.Argument(1) Duration timeout,
391+
@Advice.Local("future") CompletableFuture<Message> future) {
392+
future =
393+
connection.requestWithTimeout(
394+
message.getSubject(), message.getHeaders(), message.getData(), timeout);
395+
return future;
396+
}
397+
398+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
399+
public static CompletableFuture<Message> onExit(
400+
@Advice.Return(readOnly = false) CompletableFuture<Message> messageFuture,
401+
@Advice.Local("future") CompletableFuture<Message> future) {
402+
messageFuture = future;
403+
return messageFuture;
404+
}
405+
}
403406
}

instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@
1616

1717
public final class NatsSingletons {
1818

19-
private static final boolean messagingReceiveInstrumentationEnabled =
20-
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
21-
2219
private static final List<String> capturedHeaders =
2320
ExperimentalConfig.get().getMessagingHeaders();
2421

2522
public static final Instrumenter<NatsRequest, NatsRequest> PRODUCER_INSTRUMENTER =
2623
createProducerInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders);
2724

2825
public static final Instrumenter<NatsRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
29-
createConsumerProcessInstrumenter(
30-
GlobalOpenTelemetry.get(), messagingReceiveInstrumentationEnabled, capturedHeaders);
26+
createConsumerProcessInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders);
3127

3228
private NatsSingletons() {}
3329
}

0 commit comments

Comments
 (0)