Skip to content

Commit a20de0f

Browse files
committed
Instrument NATS library 'dispatcher/subscribe' methods
1 parent 97b6cac commit a20de0f

File tree

15 files changed

+752
-75
lines changed

15 files changed

+752
-75
lines changed

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/ConnectionRequestInstrumentation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ public void transform(TypeTransformer transformer) {
107107
.and(takesArgument(2, byte[].class))
108108
.and(takesArgument(3, Duration.class))
109109
.and(returns(named("java.util.concurrent.CompletableFuture"))),
110-
ConnectionRequestInstrumentation.class.getName() + "$RequestFutureTimeoutBodyHeadersAdvice");
110+
ConnectionRequestInstrumentation.class.getName()
111+
+ "$RequestFutureTimeoutBodyHeadersAdvice");
111112
transformer.applyAdviceToMethod(
112113
isPublic()
113114
.and(named("requestWithTimeout"))

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsSingletons.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
77

88
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createClientInstrumenter;
9-
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createConsumerInstrumenter;
9+
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createConsumerReceiveInstrumenter;
1010
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createProducerInstrumenter;
1111

1212
import io.opentelemetry.api.GlobalOpenTelemetry;
@@ -19,7 +19,7 @@ public final class NatsSingletons {
1919
createProducerInstrumenter(GlobalOpenTelemetry.get());
2020

2121
public static final Instrumenter<NatsRequest, Void> CONSUMER_INSTRUMENTER =
22-
createConsumerInstrumenter(GlobalOpenTelemetry.get());
22+
createConsumerReceiveInstrumenter(GlobalOpenTelemetry.get());
2323

2424
public static final Instrumenter<NatsRequest, NatsRequest> CLIENT_INSTRUMENTER =
2525
createClientInstrumenter(GlobalOpenTelemetry.get());

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/internal/MessageConsumer.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,12 @@ public void accept(Message message, Throwable throwable) {
4343
}
4444
}
4545

46-
47-
/*messageFuture = messageFuture.whenComplete((message, exception) -> {
48-
if (message != null) {
49-
NatsRequest response = NatsRequest.create(connection, message);
50-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, response, throwable);
51-
} else {
52-
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, exception);
53-
}
54-
});*/
46+
/*messageFuture = messageFuture.whenComplete((message, exception) -> {
47+
if (message != null) {
48+
NatsRequest response = NatsRequest.create(connection, message);
49+
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, response, throwable);
50+
} else {
51+
CLIENT_INSTRUMENTER.end(otelContext, natsRequest, null, exception);
52+
}
53+
});*/
5554
}

instrumentation/nats/nats-2.21/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsInstrumentationRequestTest.java

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ void afterEach() throws InterruptedException, TimeoutException {
7878
connection.close();
7979
}
8080

81-
8281
@Test
8382
void testRequestTimeout() throws InterruptedException {
8483
// when
@@ -107,7 +106,9 @@ void testRequestFutureTimeoutBodyWithHeaders() throws InterruptedException {
107106
// when
108107
testing.runWithSpan(
109108
"parent",
110-
() -> connection.requestWithTimeout("sub", new Headers(), new byte[] {0}, Duration.ofSeconds(1)));
109+
() ->
110+
connection.requestWithTimeout(
111+
"sub", new Headers(), new byte[] {0}, Duration.ofSeconds(1)));
111112

112113
// then
113114
assertCancellationPublishSpan();
@@ -120,7 +121,8 @@ void testRequestFutureTimeoutMessageNoHeaders() throws InterruptedException {
120121
NatsMessage message = NatsMessage.builder().subject("sub").data("x").build();
121122

122123
// when
123-
testing.runWithSpan( "parent", () -> connection.requestWithTimeout(message, Duration.ofSeconds(1)));
124+
testing.runWithSpan(
125+
"parent", () -> connection.requestWithTimeout(message, Duration.ofSeconds(1)));
124126

125127
// then
126128
assertCancellationPublishSpan();
@@ -130,10 +132,12 @@ void testRequestFutureTimeoutMessageNoHeaders() throws InterruptedException {
130132
@Test
131133
void testRequestFutureTimeoutMessageWithHeaders() throws InterruptedException {
132134
// given
133-
NatsMessage message = NatsMessage.builder().subject("sub").headers(new Headers()).data("x").build();
135+
NatsMessage message =
136+
NatsMessage.builder().subject("sub").headers(new Headers()).data("x").build();
134137

135138
// when
136-
testing.runWithSpan( "parent", () -> connection.requestWithTimeout(message, Duration.ofSeconds(1)));
139+
testing.runWithSpan(
140+
"parent", () -> connection.requestWithTimeout(message, Duration.ofSeconds(1)));
137141

138142
// then
139143
assertCancellationPublishSpan();
@@ -143,12 +147,14 @@ void testRequestFutureTimeoutMessageWithHeaders() throws InterruptedException {
143147
@Test
144148
void testRequestBodyNoHeaders() throws InterruptedException {
145149
// given
146-
Dispatcher dispatcher = connection.createDispatcher(
147-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
150+
Dispatcher dispatcher =
151+
connection
152+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
153+
.subscribe("sub");
148154

149155
// when
150-
testing.runWithSpan("parent",
151-
() -> connection.request("sub", new byte[] {0}, Duration.ofSeconds(1)));
156+
testing.runWithSpan(
157+
"parent", () -> connection.request("sub", new byte[] {0}, Duration.ofSeconds(1)));
152158
connection.closeDispatcher(dispatcher);
153159

154160
// then
@@ -159,11 +165,14 @@ void testRequestBodyNoHeaders() throws InterruptedException {
159165
@Test
160166
void testRequestFutureBodyNoHeaders() throws InterruptedException {
161167
// given
162-
Dispatcher dispatcher = connection.createDispatcher(
163-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
168+
Dispatcher dispatcher =
169+
connection
170+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
171+
.subscribe("sub");
164172

165173
// when
166-
testing.runWithSpan("parent", () -> connection.request("sub", new byte[] {0}))
174+
testing
175+
.runWithSpan("parent", () -> connection.request("sub", new byte[] {0}))
167176
.whenComplete((m, e) -> connection.closeDispatcher(dispatcher));
168177

169178
// then
@@ -174,11 +183,14 @@ void testRequestFutureBodyNoHeaders() throws InterruptedException {
174183
@Test
175184
void testRequestBodyWithHeaders() throws InterruptedException {
176185
// given
177-
Dispatcher dispatcher = connection.createDispatcher(
178-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
186+
Dispatcher dispatcher =
187+
connection
188+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
189+
.subscribe("sub");
179190

180191
// when
181-
testing.runWithSpan("parent",
192+
testing.runWithSpan(
193+
"parent",
182194
() -> connection.request("sub", new Headers(), new byte[] {0}, Duration.ofSeconds(1)));
183195
connection.closeDispatcher(dispatcher);
184196

@@ -190,11 +202,14 @@ void testRequestBodyWithHeaders() throws InterruptedException {
190202
@Test
191203
void testRequestFutureBodyWithHeaders() throws InterruptedException {
192204
// given
193-
Dispatcher dispatcher = connection.createDispatcher(
194-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
205+
Dispatcher dispatcher =
206+
connection
207+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
208+
.subscribe("sub");
195209

196210
// when
197-
testing.runWithSpan("parent", () -> connection.request("sub", new Headers(), new byte[] {0}))
211+
testing
212+
.runWithSpan("parent", () -> connection.request("sub", new Headers(), new byte[] {0}))
198213
.whenComplete((m, e) -> connection.closeDispatcher(dispatcher));
199214

200215
// then
@@ -205,8 +220,10 @@ void testRequestFutureBodyWithHeaders() throws InterruptedException {
205220
@Test
206221
void testRequestMessageNoHeaders() throws InterruptedException {
207222
// given
208-
Dispatcher dispatcher = connection.createDispatcher(
209-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
223+
Dispatcher dispatcher =
224+
connection
225+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
226+
.subscribe("sub");
210227
NatsMessage message = NatsMessage.builder().subject("sub").data("x").build();
211228

212229
// when
@@ -221,12 +238,15 @@ void testRequestMessageNoHeaders() throws InterruptedException {
221238
@Test
222239
void testRequestFutureMessageNoHeaders() throws InterruptedException {
223240
// given
224-
Dispatcher dispatcher = connection.createDispatcher(
225-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
241+
Dispatcher dispatcher =
242+
connection
243+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
244+
.subscribe("sub");
226245
NatsMessage message = NatsMessage.builder().subject("sub").data("x").build();
227246

228247
// when
229-
testing.runWithSpan("parent", () -> connection.request(message))
248+
testing
249+
.runWithSpan("parent", () -> connection.request(message))
230250
.whenComplete((m, e) -> connection.closeDispatcher(dispatcher));
231251

232252
// then
@@ -237,8 +257,10 @@ void testRequestFutureMessageNoHeaders() throws InterruptedException {
237257
@Test
238258
void testRequestMessageWithHeaders() throws InterruptedException {
239259
// given
240-
Dispatcher dispatcher = connection.createDispatcher(
241-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
260+
Dispatcher dispatcher =
261+
connection
262+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
263+
.subscribe("sub");
242264
NatsMessage message =
243265
NatsMessage.builder().subject("sub").headers(new Headers()).data("x").build();
244266

@@ -254,13 +276,16 @@ void testRequestMessageWithHeaders() throws InterruptedException {
254276
@Test
255277
void testRequestFutureMessageWithHeaders() throws InterruptedException {
256278
// given
257-
Dispatcher dispatcher = connection.createDispatcher(
258-
m -> connection.publish(m.getReplyTo(), m.getData())).subscribe("sub");
279+
Dispatcher dispatcher =
280+
connection
281+
.createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData()))
282+
.subscribe("sub");
259283
NatsMessage message =
260284
NatsMessage.builder().subject("sub").headers(new Headers()).data("x").build();
261285

262286
// when
263-
testing.runWithSpan("parent", () -> connection.request(message))
287+
testing
288+
.runWithSpan("parent", () -> connection.request(message))
264289
.whenComplete((m, e) -> connection.closeDispatcher(dispatcher));
265290

266291
// then
@@ -318,7 +343,9 @@ private static void assertCancellationPublishSpan() {
318343
span.hasName("sub publish")
319344
.hasKind(SpanKind.CLIENT)
320345
.hasParent(trace.getSpan(0))
321-
.hasException(new CancellationException("Future cancelled, response not registered in time, check connection status."))
346+
.hasException(
347+
new CancellationException(
348+
"Future cancelled, response not registered in time, check connection status."))
322349
.hasAttributesSatisfyingExactly(
323350
equalTo(MESSAGING_OPERATION, "publish"),
324351
equalTo(MESSAGING_SYSTEM, "nats"),

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/NatsTelemetry.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,27 @@ public static NatsTelemetryBuilder builder(OpenTelemetry openTelemetry) {
2121
}
2222

2323
private final Instrumenter<NatsRequest, Void> producerInstrumenter;
24-
private final Instrumenter<NatsRequest, Void> consumerInstrumenter;
24+
private final Instrumenter<NatsRequest, Void> consumerReceiveInstrumenter;
25+
private final Instrumenter<NatsRequest, Void> consumerProcessInstrumenter;
2526
private final Instrumenter<NatsRequest, NatsRequest> clientInstrumenter;
2627

2728
public NatsTelemetry(
2829
Instrumenter<NatsRequest, Void> producerInstrumenter,
29-
Instrumenter<NatsRequest, Void> consumerInstrumenter,
30+
Instrumenter<NatsRequest, Void> consumerReceiveInstrumenter,
31+
Instrumenter<NatsRequest, Void> consumerProcessInstrumenter,
3032
Instrumenter<NatsRequest, NatsRequest> clientInstrumenter) {
3133
this.producerInstrumenter = producerInstrumenter;
32-
this.consumerInstrumenter = consumerInstrumenter;
34+
this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
35+
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
3336
this.clientInstrumenter = clientInstrumenter;
3437
}
3538

3639
public OpenTelemetryConnection wrap(Connection connection) {
3740
return new OpenTelemetryConnection(
38-
connection, this.producerInstrumenter, this.consumerInstrumenter, this.clientInstrumenter);
41+
connection,
42+
producerInstrumenter,
43+
consumerReceiveInstrumenter,
44+
consumerProcessInstrumenter,
45+
clientInstrumenter);
3946
}
4047
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/NatsTelemetryBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ public final class NatsTelemetryBuilder {
1919
public NatsTelemetry build() {
2020
return new NatsTelemetry(
2121
NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry),
22-
NatsInstrumenterFactory.createConsumerInstrumenter(openTelemetry),
22+
NatsInstrumenterFactory.createConsumerReceiveInstrumenter(openTelemetry),
23+
NatsInstrumenterFactory.createConsumerProcessInstrumenter(openTelemetry),
2324
NatsInstrumenterFactory.createClientInstrumenter(openTelemetry));
2425
}
2526
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/OpenTelemetryConnection.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,20 @@ public class OpenTelemetryConnection implements Connection {
4646

4747
private final Connection delegate;
4848
private final Instrumenter<NatsRequest, Void> producerInstrumenter;
49-
private final Instrumenter<NatsRequest, Void> consumerInstrumenter;
49+
private final Instrumenter<NatsRequest, Void> consumerReceiveInstrumenter;
50+
private final Instrumenter<NatsRequest, Void> consumerProcessInstrumenter;
5051
private final Instrumenter<NatsRequest, NatsRequest> clientInstrumenter;
5152

5253
public OpenTelemetryConnection(
5354
Connection connection,
5455
Instrumenter<NatsRequest, Void> producerInstrumenter,
55-
Instrumenter<NatsRequest, Void> consumerInstrumenter,
56+
Instrumenter<NatsRequest, Void> consumerReceiveInstrumenter,
57+
Instrumenter<NatsRequest, Void> consumerProcessInstrumenter,
5658
Instrumenter<NatsRequest, NatsRequest> clientInstrumenter) {
5759
this.delegate = connection;
5860
this.producerInstrumenter = producerInstrumenter;
59-
this.consumerInstrumenter = consumerInstrumenter;
61+
this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
62+
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
6063
this.clientInstrumenter = clientInstrumenter;
6164
}
6265

@@ -153,23 +156,34 @@ public CompletableFuture<Message> requestWithTimeout(Message message, Duration t
153156
@Override
154157
public Subscription subscribe(String subject) {
155158
return new OpenTelemetrySubscription(
156-
this, delegate.subscribe(subject), this.consumerInstrumenter);
159+
this, delegate.subscribe(subject), this.consumerReceiveInstrumenter);
157160
}
158161

159162
@Override
160163
public Subscription subscribe(String subject, String queueName) {
161164
return new OpenTelemetrySubscription(
162-
this, delegate.subscribe(subject, queueName), this.consumerInstrumenter);
165+
this, delegate.subscribe(subject, queueName), this.consumerReceiveInstrumenter);
163166
}
164167

165168
@Override
166169
public Dispatcher createDispatcher(MessageHandler messageHandler) {
167-
return delegate.createDispatcher(messageHandler);
170+
OpenTelemetryMessageHandler otelHandler =
171+
new OpenTelemetryMessageHandler(
172+
this, messageHandler, consumerReceiveInstrumenter, consumerProcessInstrumenter);
173+
return new OpenTelemetryDispatcher(
174+
this,
175+
delegate.createDispatcher(otelHandler),
176+
consumerReceiveInstrumenter,
177+
consumerProcessInstrumenter);
168178
}
169179

170180
@Override
171181
public Dispatcher createDispatcher() {
172-
return delegate.createDispatcher();
182+
return new OpenTelemetryDispatcher(
183+
this,
184+
delegate.createDispatcher(),
185+
consumerReceiveInstrumenter,
186+
consumerProcessInstrumenter);
173187
}
174188

175189
@Override

0 commit comments

Comments
 (0)