Skip to content

Commit df2b1fa

Browse files
committed
cover all publish methods with tests
1 parent 396cada commit df2b1fa

File tree

5 files changed

+433
-145
lines changed

5 files changed

+433
-145
lines changed

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/ConnectionInstrumentation.java

Lines changed: 0 additions & 80 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.PRODUCER_INSTRUMENTER;
10+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
14+
15+
import io.nats.client.Connection;
16+
import io.nats.client.Message;
17+
import io.nats.client.impl.Headers;
18+
import io.opentelemetry.context.Context;
19+
import io.opentelemetry.context.Scope;
20+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
21+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
22+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
23+
import net.bytebuddy.asm.Advice;
24+
import net.bytebuddy.description.type.TypeDescription;
25+
import net.bytebuddy.matcher.ElementMatcher;
26+
27+
public class ConnectionPublishInstrumentation implements TypeInstrumentation {
28+
29+
@Override
30+
public ElementMatcher<TypeDescription> typeMatcher() {
31+
return implementsInterface(named("io.nats.client.Connection"));
32+
}
33+
34+
@Override
35+
public void transform(TypeTransformer transformer) {
36+
transformer.applyAdviceToMethod(
37+
isPublic()
38+
.and(named("publish"))
39+
.and(takesArguments(2))
40+
.and(takesArgument(0, String.class))
41+
.and(takesArgument(1, byte[].class)),
42+
ConnectionPublishInstrumentation.class.getName() + "$PublishBodyAdvice");
43+
transformer.applyAdviceToMethod(
44+
isPublic()
45+
.and(named("publish"))
46+
.and(takesArguments(3))
47+
.and(takesArgument(0, String.class))
48+
.and(takesArgument(1, named("io.nats.client.impl.Headers")))
49+
.and(takesArgument(2, byte[].class)),
50+
ConnectionPublishInstrumentation.class.getName() + "$PublishBodyHeadersAdvice");
51+
transformer.applyAdviceToMethod(
52+
isPublic()
53+
.and(named("publish"))
54+
.and(takesArguments(3))
55+
.and(takesArgument(0, String.class))
56+
.and(takesArgument(1, String.class))
57+
.and(takesArgument(2, byte[].class)),
58+
ConnectionPublishInstrumentation.class.getName() + "$PublishBodyReplyToAdvice");
59+
transformer.applyAdviceToMethod(
60+
isPublic()
61+
.and(named("publish"))
62+
.and(takesArguments(4))
63+
.and(takesArgument(0, String.class))
64+
.and(takesArgument(1, String.class))
65+
.and(takesArgument(2, named("io.nats.client.impl.Headers")))
66+
.and(takesArgument(3, byte[].class)),
67+
ConnectionPublishInstrumentation.class.getName() + "$PublishBodyReplyToHeadersAdvice");
68+
transformer.applyAdviceToMethod(
69+
isPublic()
70+
.and(named("publish"))
71+
.and(takesArguments(1))
72+
.and(takesArgument(0, named("io.nats.client.Message"))),
73+
ConnectionPublishInstrumentation.class.getName() + "$PublishMessageAdvice");
74+
}
75+
76+
@SuppressWarnings("unused")
77+
public static class PublishBodyAdvice {
78+
79+
@Advice.OnMethodEnter(suppress = Throwable.class)
80+
public static void onEnter(
81+
@Advice.This Connection connection,
82+
@Advice.Argument(0) String subject,
83+
@Advice.Argument(1) byte[] body,
84+
@Advice.Local("otelContext") Context otelContext,
85+
@Advice.Local("otelScope") Scope otelScope,
86+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
87+
Context parentContext = Context.current();
88+
natsRequest = NatsRequest.create(connection, subject, body);
89+
90+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
91+
return;
92+
}
93+
94+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
95+
otelScope = otelContext.makeCurrent();
96+
}
97+
98+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
99+
public static void onExit(
100+
@Advice.Thrown Throwable throwable,
101+
@Advice.Argument(0) String subject,
102+
@Advice.Argument(1) byte[] body,
103+
@Advice.Local("otelContext") Context otelContext,
104+
@Advice.Local("otelScope") Scope otelScope,
105+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
106+
if (otelScope == null) {
107+
return;
108+
}
109+
110+
otelScope.close();
111+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
112+
}
113+
}
114+
115+
@SuppressWarnings("unused")
116+
public static class PublishBodyHeadersAdvice {
117+
118+
@Advice.OnMethodEnter(suppress = Throwable.class)
119+
public static void onEnter(
120+
@Advice.This Connection connection,
121+
@Advice.Argument(0) String subject,
122+
@Advice.Argument(1) Headers headers,
123+
@Advice.Argument(2) byte[] body,
124+
@Advice.Local("otelContext") Context otelContext,
125+
@Advice.Local("otelScope") Scope otelScope,
126+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
127+
Context parentContext = Context.current();
128+
natsRequest = NatsRequest.create(connection, subject, headers, body);
129+
130+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
131+
return;
132+
}
133+
134+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
135+
otelScope = otelContext.makeCurrent();
136+
}
137+
138+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
139+
public static void onExit(
140+
@Advice.Thrown Throwable throwable,
141+
@Advice.Argument(0) String subject,
142+
@Advice.Argument(1) Headers headers,
143+
@Advice.Argument(2) byte[] body,
144+
@Advice.Local("otelContext") Context otelContext,
145+
@Advice.Local("otelScope") Scope otelScope,
146+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
147+
if (otelScope == null) {
148+
return;
149+
}
150+
151+
otelScope.close();
152+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
153+
}
154+
}
155+
156+
@SuppressWarnings("unused")
157+
public static class PublishBodyReplyToAdvice {
158+
159+
@Advice.OnMethodEnter(suppress = Throwable.class)
160+
public static void onEnter(
161+
@Advice.This Connection connection,
162+
@Advice.Argument(0) String subject,
163+
@Advice.Argument(1) String replyTo,
164+
@Advice.Argument(2) byte[] body,
165+
@Advice.Local("otelContext") Context otelContext,
166+
@Advice.Local("otelScope") Scope otelScope,
167+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
168+
Context parentContext = Context.current();
169+
natsRequest = NatsRequest.create(connection, subject, body);
170+
171+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
172+
return;
173+
}
174+
175+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
176+
otelScope = otelContext.makeCurrent();
177+
}
178+
179+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
180+
public static void onExit(
181+
@Advice.Thrown Throwable throwable,
182+
@Advice.Argument(0) String subject,
183+
@Advice.Argument(1) String replyTo,
184+
@Advice.Argument(2) byte[] body,
185+
@Advice.Local("otelContext") Context otelContext,
186+
@Advice.Local("otelScope") Scope otelScope,
187+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
188+
if (otelScope == null) {
189+
return;
190+
}
191+
192+
otelScope.close();
193+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
194+
}
195+
}
196+
197+
@SuppressWarnings("unused")
198+
public static class PublishBodyReplyToHeadersAdvice {
199+
200+
@Advice.OnMethodEnter(suppress = Throwable.class)
201+
public static void onEnter(
202+
@Advice.This Connection connection,
203+
@Advice.Argument(0) String subject,
204+
@Advice.Argument(1) String replyTo,
205+
@Advice.Argument(2) Headers headers,
206+
@Advice.Argument(3) byte[] body,
207+
@Advice.Local("otelContext") Context otelContext,
208+
@Advice.Local("otelScope") Scope otelScope,
209+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
210+
Context parentContext = Context.current();
211+
natsRequest = NatsRequest.create(connection, subject, headers, body);
212+
213+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
214+
return;
215+
}
216+
217+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
218+
otelScope = otelContext.makeCurrent();
219+
}
220+
221+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
222+
public static void onExit(
223+
@Advice.Thrown Throwable throwable,
224+
@Advice.Argument(0) String subject,
225+
@Advice.Argument(1) String replyTo,
226+
@Advice.Argument(2) Headers headers,
227+
@Advice.Argument(3) byte[] body,
228+
@Advice.Local("otelContext") Context otelContext,
229+
@Advice.Local("otelScope") Scope otelScope,
230+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
231+
if (otelScope == null) {
232+
return;
233+
}
234+
235+
otelScope.close();
236+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
237+
}
238+
}
239+
240+
@SuppressWarnings("unused")
241+
public static class PublishMessageAdvice {
242+
243+
@Advice.OnMethodEnter(suppress = Throwable.class)
244+
public static void onEnter(
245+
@Advice.This Connection connection,
246+
@Advice.Argument(0) Message message,
247+
@Advice.Local("otelContext") Context otelContext,
248+
@Advice.Local("otelScope") Scope otelScope,
249+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
250+
Context parentContext = Context.current();
251+
natsRequest = NatsRequest.create(connection, message);
252+
253+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
254+
return;
255+
}
256+
257+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
258+
otelScope = otelContext.makeCurrent();
259+
}
260+
261+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
262+
public static void onExit(
263+
@Advice.Thrown Throwable throwable,
264+
@Advice.This Connection connection,
265+
@Advice.Argument(0) Message message,
266+
@Advice.Local("otelContext") Context otelContext,
267+
@Advice.Local("otelScope") Scope otelScope,
268+
@Advice.Local("natsRequest") NatsRequest natsRequest) {
269+
if (otelScope == null) {
270+
return;
271+
}
272+
273+
otelScope.close();
274+
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
275+
}
276+
}
277+
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsInstrumentationModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@ public NatsInstrumentationModule() {
2222

2323
@Override
2424
public List<TypeInstrumentation> typeInstrumentations() {
25-
return Collections.singletonList(new ConnectionInstrumentation());
25+
return Collections.singletonList(new ConnectionPublishInstrumentation());
2626
}
2727
}

0 commit comments

Comments
 (0)