Skip to content

Commit 4260eff

Browse files
committed
Instrument NATS agent 'dispatcher' methods
1 parent 1552cd2 commit 4260eff

File tree

8 files changed

+212
-36
lines changed

8 files changed

+212
-36
lines changed

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/ConnectionPublishInstrumentation.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ public static void onEnter(
9898
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
9999
public static void onExit(
100100
@Advice.Thrown Throwable throwable,
101-
@Advice.Argument(0) String subject,
102-
@Advice.Argument(1) byte[] body,
103101
@Advice.Local("otelContext") Context otelContext,
104102
@Advice.Local("otelScope") Scope otelScope,
105103
@Advice.Local("natsRequest") NatsRequest natsRequest) {
@@ -138,9 +136,6 @@ public static void onEnter(
138136
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
139137
public static void onExit(
140138
@Advice.Thrown Throwable throwable,
141-
@Advice.Argument(0) String subject,
142-
@Advice.Argument(1) Headers headers,
143-
@Advice.Argument(2) byte[] body,
144139
@Advice.Local("otelContext") Context otelContext,
145140
@Advice.Local("otelScope") Scope otelScope,
146141
@Advice.Local("natsRequest") NatsRequest natsRequest) {
@@ -179,9 +174,6 @@ public static void onEnter(
179174
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
180175
public static void onExit(
181176
@Advice.Thrown Throwable throwable,
182-
@Advice.Argument(0) String subject,
183-
@Advice.Argument(1) String replyTo,
184-
@Advice.Argument(2) byte[] body,
185177
@Advice.Local("otelContext") Context otelContext,
186178
@Advice.Local("otelScope") Scope otelScope,
187179
@Advice.Local("natsRequest") NatsRequest natsRequest) {
@@ -221,10 +213,6 @@ public static void onEnter(
221213
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
222214
public static void onExit(
223215
@Advice.Thrown Throwable throwable,
224-
@Advice.Argument(0) String subject,
225-
@Advice.Argument(1) String replyTo,
226-
@Advice.Argument(2) Headers headers,
227-
@Advice.Argument(3) byte[] body,
228216
@Advice.Local("otelContext") Context otelContext,
229217
@Advice.Local("otelScope") Scope otelScope,
230218
@Advice.Local("natsRequest") NatsRequest natsRequest) {
@@ -261,8 +249,6 @@ public static void onEnter(
261249
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
262250
public static void onExit(
263251
@Advice.Thrown Throwable throwable,
264-
@Advice.This Connection connection,
265-
@Advice.Argument(0) Message message,
266252
@Advice.Local("otelContext") Context otelContext,
267253
@Advice.Local("otelScope") Scope otelScope,
268254
@Advice.Local("natsRequest") NatsRequest natsRequest) {

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/ConnectionSubscribeInstrumentation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public void transform(TypeTransformer transformer) {
4141
public static class SubscribeAdvice {
4242
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
4343
public static void onExit(
44-
@Advice.This Connection connection, @Advice.Return Subscription subscription) {
44+
@Advice.This Connection connection,
45+
@Advice.Return Subscription subscription
46+
) {
4547
NatsData.addSubscription(subscription, connection);
4648
}
4749
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.CONSUMER_PROCESS_INSTRUMENTER;
10+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.CONSUMER_RECEIVE_INSTRUMENTER;
11+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
12+
import static net.bytebuddy.matcher.ElementMatchers.named;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
14+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
15+
16+
import io.nats.client.Message;
17+
import io.opentelemetry.context.Context;
18+
import io.opentelemetry.context.Scope;
19+
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
20+
import io.opentelemetry.instrumentation.api.internal.Timer;
21+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
22+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
23+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
24+
import net.bytebuddy.asm.Advice;
25+
import net.bytebuddy.description.type.TypeDescription;
26+
import net.bytebuddy.matcher.ElementMatcher;
27+
28+
public class MessageHandlerInstrumentation implements TypeInstrumentation {
29+
30+
@Override
31+
public ElementMatcher<TypeDescription> typeMatcher() {
32+
return implementsInterface(named("io.nats.client.MessageHandler"));
33+
}
34+
35+
@Override
36+
public void transform(TypeTransformer transformer) {
37+
transformer.applyAdviceToMethod(
38+
isPublic()
39+
.and(named("onMessage"))
40+
.and(takesArguments(1))
41+
.and(takesArgument(0, named("io.nats.client.Message"))),
42+
MessageHandlerInstrumentation.class.getName() + "$OnMessageAdvice");
43+
}
44+
45+
@SuppressWarnings("unused")
46+
public static class OnMessageAdvice {
47+
48+
@Advice.OnMethodEnter(suppress = Throwable.class)
49+
public static void onEnter(
50+
@Advice.Argument(0) Message message,
51+
@Advice.Local("otelContext") Context otelContext,
52+
@Advice.Local("otelScope") Scope otelScope,
53+
@Advice.Local("natsRequest") NatsRequest natsRequest
54+
) {
55+
Timer timer = Timer.start();
56+
57+
Context parentContext = Context.current();
58+
natsRequest = NatsRequest.create(message.getConnection(), message);
59+
60+
if (!CONSUMER_RECEIVE_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
61+
return;
62+
}
63+
64+
Context receiveContext = InstrumenterUtil.startAndEnd(CONSUMER_RECEIVE_INSTRUMENTER, parentContext, natsRequest, null,
65+
null, timer.startTime(), timer.now());
66+
67+
if (!CONSUMER_PROCESS_INSTRUMENTER.shouldStart(receiveContext, natsRequest)) {
68+
return;
69+
}
70+
71+
otelContext = CONSUMER_PROCESS_INSTRUMENTER.start(receiveContext, natsRequest);
72+
otelScope = otelContext.makeCurrent();
73+
}
74+
75+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
76+
public static void onExit(
77+
@Advice.Thrown Throwable throwable,
78+
@Advice.Local("otelContext") Context otelContext,
79+
@Advice.Local("otelScope") Scope otelScope,
80+
@Advice.Local("natsRequest") NatsRequest natsRequest
81+
) {
82+
if (otelScope == null) {
83+
return;
84+
}
85+
86+
otelScope.close();
87+
CONSUMER_PROCESS_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
88+
}
89+
}
90+
91+
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsInstrumentationModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
2626
new ConnectionSubscribeInstrumentation(),
2727
new ConnectionPublishInstrumentation(),
2828
new ConnectionRequestInstrumentation(),
29-
new SubscriptionInstrumentation());
29+
new SubscriptionInstrumentation(),
30+
new MessageHandlerInstrumentation());
3031
}
3132
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsSingletons.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
77

88
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createClientInstrumenter;
9+
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createConsumerProcessInstrumenter;
910
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createConsumerReceiveInstrumenter;
1011
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createProducerInstrumenter;
1112

@@ -18,9 +19,12 @@ public final class NatsSingletons {
1819
public static final Instrumenter<NatsRequest, Void> PRODUCER_INSTRUMENTER =
1920
createProducerInstrumenter(GlobalOpenTelemetry.get());
2021

21-
public static final Instrumenter<NatsRequest, Void> CONSUMER_INSTRUMENTER =
22+
public static final Instrumenter<NatsRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER =
2223
createConsumerReceiveInstrumenter(GlobalOpenTelemetry.get());
2324

25+
public static final Instrumenter<NatsRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
26+
createConsumerProcessInstrumenter(GlobalOpenTelemetry.get());
27+
2428
public static final Instrumenter<NatsRequest, NatsRequest> CLIENT_INSTRUMENTER =
2529
createClientInstrumenter(GlobalOpenTelemetry.get());
2630

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/SubscriptionInstrumentation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
77

88
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9-
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.CONSUMER_INSTRUMENTER;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.CONSUMER_RECEIVE_INSTRUMENTER;
1010
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1111
import static net.bytebuddy.matcher.ElementMatchers.named;
1212
import static net.bytebuddy.matcher.ElementMatchers.returns;
@@ -81,12 +81,12 @@ public static void onExit(
8181
natsRequest = NatsRequest.create(connection, message);
8282
}
8383

84-
if (!CONSUMER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
84+
if (!CONSUMER_RECEIVE_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
8585
return;
8686
}
8787

8888
InstrumenterUtil.startAndEnd(
89-
CONSUMER_INSTRUMENTER,
89+
CONSUMER_RECEIVE_INSTRUMENTER,
9090
parentContext,
9191
natsRequest,
9292
null,

instrumentation/nats/nats-2.21/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsInstrumentationRequestTest.java

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
77

88
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
9+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
910
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
1011
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
1112
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
@@ -27,6 +28,7 @@
2728
import java.time.Duration;
2829
import java.util.concurrent.CancellationException;
2930
import java.util.concurrent.TimeoutException;
31+
import org.assertj.core.api.Condition;
3032
import org.junit.jupiter.api.AfterAll;
3133
import org.junit.jupiter.api.AfterEach;
3234
import org.junit.jupiter.api.BeforeAll;
@@ -122,7 +124,7 @@ void testRequestHeadersBody() throws InterruptedException {
122124
connection.closeDispatcher(dispatcher);
123125

124126
// then
125-
assertPublishSpan();
127+
assertPublishSpanSameTrace();
126128
assertTraceparentHeader();
127129
}
128130

@@ -159,7 +161,7 @@ void testRequestMessageHeaders() throws InterruptedException {
159161
connection.closeDispatcher(dispatcher);
160162

161163
// then
162-
assertPublishSpan();
164+
assertPublishSpanSameTrace();
163165
assertTraceparentHeader();
164166
}
165167

@@ -195,7 +197,7 @@ void testRequestFutureHeadersBody() throws InterruptedException {
195197
.whenComplete((m, e) -> connection.closeDispatcher(dispatcher));
196198

197199
// then
198-
assertPublishSpan();
200+
assertPublishSpanSameTrace();
199201
assertTraceparentHeader();
200202
}
201203

@@ -234,7 +236,7 @@ void testRequestFutureMessageHeaders() throws InterruptedException {
234236
.whenComplete((m, e) -> connection.closeDispatcher(dispatcher));
235237

236238
// then
237-
assertPublishSpan();
239+
assertPublishSpanSameTrace();
238240
assertTraceparentHeader();
239241
}
240242

@@ -309,9 +311,106 @@ private static void assertPublishSpan() {
309311
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
310312
equalTo(
311313
AttributeKey.stringKey("messaging.client_id"),
312-
String.valueOf(clientId)))),
313-
// dispatcher publish
314-
trace -> trace.hasSpansSatisfyingExactly(span -> span.hasKind(SpanKind.PRODUCER)));
314+
String.valueOf(clientId))),
315+
span -> span
316+
.has(new Condition<>(
317+
data -> data.getName().startsWith("_INBOX.") && data.getName()
318+
.endsWith(" receive"), "Name condition"))
319+
.hasKind(SpanKind.CONSUMER)
320+
.hasParent(trace.getSpan(1))
321+
.hasAttributesSatisfyingExactly(
322+
equalTo(MESSAGING_OPERATION, "receive"),
323+
equalTo(MESSAGING_SYSTEM, "nats"),
324+
satisfies(MESSAGING_DESTINATION_NAME, name -> name.startsWith("_INBOX.")),
325+
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
326+
equalTo(
327+
AttributeKey.stringKey("messaging.client_id"),
328+
String.valueOf(clientId))),
329+
span -> span
330+
.has(new Condition<>(
331+
data -> data.getName().startsWith("_INBOX.") && data.getName()
332+
.endsWith(" process"), "Name condition"))
333+
.hasKind(SpanKind.INTERNAL)
334+
.hasParent(trace.getSpan(2))
335+
.hasAttributesSatisfyingExactly(
336+
equalTo(MESSAGING_OPERATION, "process"),
337+
equalTo(MESSAGING_SYSTEM, "nats"),
338+
satisfies(MESSAGING_DESTINATION_NAME, name -> name.startsWith("_INBOX.")),
339+
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
340+
equalTo(
341+
AttributeKey.stringKey("messaging.client_id"),
342+
String.valueOf(clientId)))),
343+
// dispatcher receive, process, publish, not retesting all properties
344+
trace -> trace.hasSpansSatisfyingExactly(
345+
span -> span.hasName("sub receive").hasKind(SpanKind.CONSUMER).hasNoParent(),
346+
span -> span.hasName("sub process").hasKind(SpanKind.INTERNAL)
347+
.hasParent(trace.getSpan(0)),
348+
span -> span
349+
.has(new Condition<>(
350+
data -> data.getName().startsWith("_INBOX.") && data.getName()
351+
.endsWith(" publish"), "Name condition"))
352+
.hasKind(SpanKind.PRODUCER).hasParent(trace.getSpan(1))
353+
));
354+
}
355+
356+
private static void assertPublishSpanSameTrace() {
357+
testing.waitAndAssertTraces(
358+
trace ->
359+
trace.hasSpansSatisfyingExactly(
360+
span -> span.hasName("parent").hasNoParent(),
361+
span ->
362+
span.hasName("sub publish")
363+
.hasKind(SpanKind.CLIENT)
364+
.hasParent(trace.getSpan(0))
365+
.hasAttributesSatisfyingExactly(
366+
equalTo(MESSAGING_OPERATION, "publish"),
367+
equalTo(MESSAGING_SYSTEM, "nats"),
368+
equalTo(MESSAGING_DESTINATION_NAME, "sub"),
369+
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
370+
equalTo(
371+
AttributeKey.stringKey("messaging.client_id"),
372+
String.valueOf(clientId))),
373+
374+
// dispatcher receive, process, publish, not retesting all properties
375+
span -> span.hasName("sub receive").hasKind(SpanKind.CONSUMER)
376+
.hasParent(trace.getSpan(1)),
377+
span -> span.hasName("sub process").hasKind(SpanKind.INTERNAL)
378+
.hasParent(trace.getSpan(2)),
379+
span -> span
380+
.has(new Condition<>(
381+
data -> data.getName().startsWith("_INBOX.") && data.getName()
382+
.endsWith(" publish"), "Name condition"))
383+
.hasKind(SpanKind.PRODUCER).hasParent(trace.getSpan(3)),
384+
// end dispatcher
385+
386+
span -> span
387+
.has(new Condition<>(
388+
data -> data.getName().startsWith("_INBOX.") && data.getName()
389+
.endsWith(" receive"), "Name condition"))
390+
.hasKind(SpanKind.CONSUMER)
391+
.hasParent(trace.getSpan(1))
392+
.hasAttributesSatisfyingExactly(
393+
equalTo(MESSAGING_OPERATION, "receive"),
394+
equalTo(MESSAGING_SYSTEM, "nats"),
395+
satisfies(MESSAGING_DESTINATION_NAME, name -> name.startsWith("_INBOX.")),
396+
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
397+
equalTo(
398+
AttributeKey.stringKey("messaging.client_id"),
399+
String.valueOf(clientId))),
400+
span -> span
401+
.has(new Condition<>(
402+
data -> data.getName().startsWith("_INBOX.") && data.getName()
403+
.endsWith(" process"), "Name condition"))
404+
.hasKind(SpanKind.INTERNAL)
405+
.hasParent(trace.getSpan(5))
406+
.hasAttributesSatisfyingExactly(
407+
equalTo(MESSAGING_OPERATION, "process"),
408+
equalTo(MESSAGING_SYSTEM, "nats"),
409+
satisfies(MESSAGING_DESTINATION_NAME, name -> name.startsWith("_INBOX.")),
410+
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
411+
equalTo(
412+
AttributeKey.stringKey("messaging.client_id"),
413+
String.valueOf(clientId)))));
315414
}
316415

317416
private static void assertTimeoutPublishSpan() {

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/OpenTelemetryMessageHandler.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,8 @@ public void onMessage(Message message) throws InterruptedException {
4545
return;
4646
}
4747

48-
Context receiveContext =
49-
InstrumenterUtil.startAndEnd(
50-
consumerReceiveInstrumenter,
51-
parentContext,
52-
natsRequest,
53-
null,
54-
null,
55-
timer.startTime(),
56-
timer.now());
48+
Context receiveContext = InstrumenterUtil.startAndEnd(consumerReceiveInstrumenter,
49+
parentContext, natsRequest, null, null, timer.startTime(), timer.now());
5750

5851
if (!consumerProcessInstrumenter.shouldStart(receiveContext, natsRequest)) {
5952
delegate.onMessage(message);

0 commit comments

Comments
 (0)