Skip to content

Commit a2d15b4

Browse files
committed
review fixes
1 parent 84ee013 commit a2d15b4

File tree

12 files changed

+86
-46
lines changed

12 files changed

+86
-46
lines changed

docs/supported-libraries.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ These are the supported libraries and frameworks:
105105
| [Micrometer](https://micrometer.io/) | 1.5+ (disabled by default) | [opentelemetry-micrometer-1.5](../instrumentation/micrometer/micrometer-1.5/library) | none |
106106
| [MongoDB Driver](https://mongodb.github.io/mongo-java-driver/) | 3.1+ | [opentelemetry-mongo-3.1](../instrumentation/mongo/mongo-3.1/library) | [Database Client Spans], [Database Client Metrics] [6] |
107107
| [MyBatis](https://mybatis.org/mybatis-3/) | 3.2+ | N/A | none |
108-
| [NATS](https://github.com/nats-io/nats.java) | 2.17.7+ | [nats-2.17](../instrumentation/nats/nats-2.17/library) | [Messaging Spans] |
108+
| [NATS](https://github.com/nats-io/nats.java) | 2.17.2+ | [nats-2.17](../instrumentation/nats/nats-2.17/library) | [Messaging Spans] |
109109
| [Netty HTTP codec [5]](https://github.com/netty/netty) | 3.8+ | [opentelemetry-netty-4.1](../instrumentation/netty/netty-4.1/library) | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] |
110110
| [OpenAI Java SDK](https://github.com/openai/openai-java) | 1.1+ | [openai-java-1.1](../instrumentation/openai/openai-java-1.1/library) | [GenAI Client Spans], [GenAI Client Metrics] |
111111
| [OpenSearch Rest Client](https://github.com/opensearch-project/opensearch-java) | 1.0+ | | [Database Client Spans], [Database Client Metrics] [6] |

instrumentation/nats/nats-2.17/javaagent/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ muzzle {
1616
}
1717

1818
dependencies {
19-
library("io.nats:jnats:2.17.7")
19+
library("io.nats:jnats:2.17.2")
2020

2121
implementation(project(":instrumentation:nats:nats-2.17:library"))
2222
testImplementation(project(":instrumentation:nats:nats-2.17:testing"))

instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionPublishInstrumentation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public static boolean onEnter(
8181
@Advice.This Connection connection,
8282
@Advice.Argument(0) String subject,
8383
@Advice.Argument(1) byte[] body) {
84+
// call the instrumented publish method
8485
connection.publish(subject, null, null, body);
8586
return true;
8687
}
@@ -94,6 +95,7 @@ public static boolean onEnter(
9495
@Advice.Argument(0) String subject,
9596
@Advice.Argument(1) Headers headers,
9697
@Advice.Argument(2) byte[] body) {
98+
// call the instrumented publish method
9799
connection.publish(subject, null, headers, body);
98100
return true;
99101
}
@@ -107,6 +109,7 @@ public static boolean onEnter(
107109
@Advice.Argument(0) String subject,
108110
@Advice.Argument(1) String replyTo,
109111
@Advice.Argument(2) byte[] body) {
112+
// call the instrumented publish method
110113
connection.publish(subject, replyTo, null, body);
111114
return true;
112115
}
@@ -158,6 +161,11 @@ public static class PublishMessageAdvice {
158161
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
159162
public static boolean onEnter(
160163
@Advice.This Connection connection, @Advice.Argument(0) Message message) {
164+
if (message == null) {
165+
return false;
166+
}
167+
168+
// call the instrumented publish method
161169
connection.publish(
162170
message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData());
163171
return true;

instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionRequestInstrumentation.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public static Message onEnter(
129129
@Advice.Argument(2) Duration timeout,
130130
@Advice.Local("message") Message message)
131131
throws InterruptedException {
132+
// call the instrumented request method
132133
message = connection.request(subject, null, body, timeout);
133134
return message;
134135
}
@@ -151,7 +152,6 @@ public static void onEnter(
151152
@Advice.Argument(0) String subject,
152153
@Advice.Argument(value = 1, readOnly = false) Headers headers,
153154
@Advice.Argument(2) byte[] body,
154-
@Advice.Argument(3) Duration timeout,
155155
@Advice.Local("otelContext") Context otelContext,
156156
@Advice.Local("otelScope") Scope otelScope,
157157
@Advice.Local("natsRequest") NatsRequest natsRequest) {
@@ -199,6 +199,11 @@ public static Message onEnter(
199199
@Advice.Argument(1) Duration timeout,
200200
@Advice.Local("response") Message response)
201201
throws InterruptedException {
202+
if (request == null) {
203+
return null;
204+
}
205+
206+
// call the instrumented request method
202207
response =
203208
connection.request(
204209
request.getSubject(), request.getHeaders(), request.getData(), timeout);
@@ -223,6 +228,7 @@ public static CompletableFuture<Message> onEnter(
223228
@Advice.Argument(0) String subject,
224229
@Advice.Argument(1) byte[] body,
225230
@Advice.Local("future") CompletableFuture<Message> future) {
231+
// call the instrumented request method
226232
future = connection.request(subject, null, body);
227233
return future;
228234
}
@@ -294,6 +300,11 @@ public static CompletableFuture<Message> onEnter(
294300
@Advice.This Connection connection,
295301
@Advice.Argument(0) Message message,
296302
@Advice.Local("future") CompletableFuture<Message> future) {
303+
if (message == null) {
304+
return null;
305+
}
306+
307+
// call the instrumented request method
297308
future = connection.request(message.getSubject(), message.getHeaders(), message.getData());
298309
return future;
299310
}
@@ -317,6 +328,7 @@ public static CompletableFuture<Message> onEnter(
317328
@Advice.Argument(1) byte[] body,
318329
@Advice.Argument(2) Duration timeout,
319330
@Advice.Local("future") CompletableFuture<Message> future) {
331+
// call the instrumented requestWithTimeout method
320332
future = connection.requestWithTimeout(subject, null, body, timeout);
321333
return future;
322334
}
@@ -389,6 +401,11 @@ public static CompletableFuture<Message> onEnter(
389401
@Advice.Argument(value = 0, readOnly = false) Message message,
390402
@Advice.Argument(1) Duration timeout,
391403
@Advice.Local("future") CompletableFuture<Message> future) {
404+
if (message == null) {
405+
return null;
406+
}
407+
408+
// call the instrumented requestWithTimeout method
392409
future =
393410
connection.requestWithTimeout(
394411
message.getSubject(), message.getHeaders(), message.getData(), timeout);

instrumentation/nats/nats-2.17/library/README.md

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,15 @@ implementation("io.opentelemetry.instrumentation:opentelemetry-nats-2.17:OPENTEL
3131

3232
The instrumentation library provides the class `NatsTelemetry` that has a builder
3333
method and allows the creation of an instance of the `Connection` to provide
34-
OpenTelemetry-based spans and context propagation:
34+
OpenTelemetry-based instrumentation:
3535

3636
```java
3737
import io.nats.client.Connection;
3838
import io.nats.client.Nats;
3939
import io.nats.client.Options;
4040
import io.opentelemetry.api.OpenTelemetry;
4141
import io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetry;
42+
import java.io.IOException;
4243

4344
public class OpenTelemetryNatsConnection {
4445

@@ -50,19 +51,13 @@ public class OpenTelemetryNatsConnection {
5051
this.telemetry = NatsTelemetry.create(openTelemetry);
5152
}
5253

53-
// creates a new connection with opentelemetry instrumentation.
54-
// This will *not* instrument the connection's main inbox
55-
// if you're using the default NatsConnection implementation
56-
public Connection wrap(Connection connection) {
57-
return telemetry.wrap(connection);
54+
public Connection newConnection() throws IOException, InterruptedException {
55+
return newConnection(Options.builder());
5856
}
5957

60-
// prefer wrapping the Options.Builder to get the full instrumentation
61-
// when using the default NatsConnection implementation
62-
public Connection create(Options.Builder builder) throws IOException, InterruptedException {
63-
Options options = telemetry.configureDispatcher(builder).build();
64-
Connection connection = Nats.connect(options);
65-
return wrap(connection);
58+
public Connection newConnection(Options.Builder options) throws IOException, InterruptedException {
59+
return telemetry.newConnection(options.build(), Nats::connect);
6660
}
61+
6762
}
6863
```

instrumentation/nats/nats-2.17/library/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
dependencies {
6-
library("io.nats:jnats:2.17.7")
6+
library("io.nats:jnats:2.17.2")
77

88
compileOnly("com.google.auto.value:auto-value-annotations")
99
annotationProcessor("com.google.auto.value:auto-value")

instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetry.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,23 @@ public NatsTelemetry(
3434
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
3535
}
3636

37-
Connection wrap(Connection connection) {
37+
/**
38+
* Returns a {@link Connection} with telemetry instrumentation.
39+
*
40+
* <p>This does *not* monitor the main inbox of the default dispatcher. {@link
41+
* #configure(Options.Builder)} {@link #newConnection(Options.Builder, ConnectionFactory)} {@link
42+
* #newConnection(Options, ConnectionFactory)}
43+
*/
44+
public Connection wrap(Connection connection) {
3845
return OpenTelemetryConnection.wrap(
3946
connection, producerInstrumenter, consumerProcessInstrumenter);
4047
}
4148

42-
Options.Builder configure(Options.Builder options) {
49+
/**
50+
* Returns a {@link Options.Builder} with the main inbox from the default dispatcher monitored
51+
* with telemetry instrumentation.
52+
*/
53+
public Options.Builder configure(Options.Builder options) {
4354
DispatcherFactory factory = options.build().getDispatcherFactory();
4455

4556
if (factory == null) {
@@ -50,13 +61,13 @@ Options.Builder configure(Options.Builder options) {
5061
new OpenTelemetryDispatcherFactory(factory, consumerProcessInstrumenter));
5162
}
5263

53-
/** Returns a {@link Connection} with messaging spans instrumentation. */
64+
/** Returns a {@link Connection} with telemetry instrumentation. */
5465
public Connection newConnection(Options options, ConnectionFactory<Options> connectionFactory)
5566
throws IOException, InterruptedException {
5667
return wrap(connectionFactory.create(configure(new Options.Builder(options)).build()));
5768
}
5869

59-
/** Returns a {@link Connection} with messaging spans instrumentation. */
70+
/** Returns a {@link Connection} with telemetry instrumentation. */
6071
public Connection newConnection(
6172
Options.Builder builder, ConnectionFactory<Options.Builder> connectionFactory)
6273
throws IOException, InterruptedException {

instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryConnection.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ private void publish(Method method, Object[] args) throws Throwable {
124124
replyTo = (String) args[1];
125125
headers = (Headers) args[2];
126126
body = (byte[]) args[3];
127-
} else if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == Message.class) {
127+
} else if (method.getParameterCount() == 1
128+
&& method.getParameterTypes()[0] == Message.class
129+
&& args[0] != null) {
128130
subject = ((Message) args[0]).getSubject();
129131
replyTo = ((Message) args[0]).getReplyTo();
130132
headers = ((Message) args[0]).getHeaders();
@@ -145,17 +147,22 @@ private void publish(Method method, Object[] args) throws Throwable {
145147
}
146148

147149
Context context = producerInstrumenter.start(parentContext, natsRequest);
150+
Throwable throwable = null;
148151
try (Scope ignored = context.makeCurrent()) {
149152
delegate.publish(subject, replyTo, headers, body);
153+
} catch (Throwable t) {
154+
throwable = t;
155+
throw t;
150156
} finally {
151-
producerInstrumenter.end(context, natsRequest, null, null);
157+
producerInstrumenter.end(context, natsRequest, null, throwable);
152158
}
153159
}
154160

155161
// Message request(String subject, byte[] body, Duration timeout) throws InterruptedException;
156162
// Message request(String subject, Headers headers, byte[] body, Duration timeout) throws
157163
// InterruptedException;
158164
// Message request(Message message, Duration timeout) throws InterruptedException;
165+
@SuppressWarnings("InterruptedExceptionSwallowed")
159166
private Message request(Method method, Object[] args) throws Throwable {
160167
String subject = null;
161168
Headers headers = null;
@@ -176,7 +183,9 @@ private Message request(Method method, Object[] args) throws Throwable {
176183
headers = (Headers) args[1];
177184
body = (byte[]) args[2];
178185
timeout = (Duration) args[3];
179-
} else if (method.getParameterCount() == 2 && method.getParameterTypes()[0] == Message.class) {
186+
} else if (method.getParameterCount() == 2
187+
&& method.getParameterTypes()[0] == Message.class
188+
&& args[0] != null) {
180189
subject = ((Message) args[0]).getSubject();
181190
headers = ((Message) args[0]).getHeaders();
182191
body = ((Message) args[0]).getData();
@@ -199,8 +208,8 @@ private Message request(Method method, Object[] args) throws Throwable {
199208

200209
Context context = producerInstrumenter.start(parentContext, natsRequest);
201210
NatsRequest response = null;
202-
203211
Throwable throwable = null;
212+
204213
try (Scope ignored = context.makeCurrent()) {
205214
Message result = delegate.request(subject, headers, body, timeout);
206215

@@ -209,7 +218,7 @@ private Message request(Method method, Object[] args) throws Throwable {
209218
}
210219

211220
return result;
212-
} catch (InterruptedException t) {
221+
} catch (Throwable t) {
213222
throwable = t;
214223
throw t;
215224
} finally {
@@ -258,12 +267,14 @@ private CompletableFuture<Message> requestAsync(Method method, Object[] args) th
258267
body = (byte[]) args[2];
259268
timeout = (Duration) args[3];
260269
} else if ((method.getParameterCount() == 1)
261-
&& method.getParameterTypes()[0] == Message.class) {
270+
&& method.getParameterTypes()[0] == Message.class
271+
&& args[0] != null) {
262272
subject = ((Message) args[0]).getSubject();
263273
headers = ((Message) args[0]).getHeaders();
264274
body = ((Message) args[0]).getData();
265275
} else if ((method.getParameterCount() == 2)
266-
&& method.getParameterTypes()[0] == Message.class) {
276+
&& method.getParameterTypes()[0] == Message.class
277+
&& args[0] != null) {
267278
subject = ((Message) args[0]).getSubject();
268279
headers = ((Message) args[0]).getHeaders();
269280
body = ((Message) args[0]).getData();
@@ -286,10 +297,15 @@ private CompletableFuture<Message> requestAsync(Method method, Object[] args) th
286297
Context context = producerInstrumenter.start(parentContext, notNullNatsRequest);
287298

288299
CompletableFuture<Message> future;
289-
if (timeout != null) {
290-
future = delegate.requestWithTimeout(subject, headers, body, timeout);
291-
} else {
292-
future = delegate.request(subject, headers, body);
300+
try {
301+
if (timeout != null) {
302+
future = delegate.requestWithTimeout(subject, headers, body, timeout);
303+
} else {
304+
future = delegate.request(subject, headers, body);
305+
}
306+
} catch (Throwable t) {
307+
producerInstrumenter.end(context, notNullNatsRequest, null, t);
308+
throw t;
293309
}
294310

295311
return future.whenComplete(

instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryDispatcher.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
4848
}
4949
}
5050

51-
private static Object invokeProxyMethod(Method method, Object target, Object[] args)
52-
throws Throwable {
51+
private static Object invokeMethod(Method method, Object target, Object[] args) throws Throwable {
5352
try {
5453
return method.invoke(target, args);
5554
} catch (InvocationTargetException exception) {
@@ -69,7 +68,7 @@ private Subscription subscribe(Method method, Object[] args) throws Throwable {
6968
new OpenTelemetryMessageHandler((MessageHandler) args[2], consumerProcessInstrumenter);
7069
}
7170

72-
return (Subscription) invokeProxyMethod(method, delegate, args);
71+
return (Subscription) invokeMethod(method, delegate, args);
7372
}
7473

7574
Dispatcher getDelegate() {

instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryMessageHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ public void onMessage(Message message) throws InterruptedException {
3838
}
3939

4040
Context processContext = consumerProcessInstrumenter.start(parentContext, natsRequest);
41-
InterruptedException exception = null;
41+
Throwable error = null;
4242

4343
try (Scope ignored = processContext.makeCurrent()) {
4444
delegate.onMessage(message);
45-
} catch (InterruptedException e) {
46-
exception = e;
47-
throw e;
45+
} catch (Throwable t) {
46+
error = t;
47+
throw t;
4848
} finally {
49-
consumerProcessInstrumenter.end(processContext, natsRequest, null, exception);
49+
consumerProcessInstrumenter.end(processContext, natsRequest, null, error);
5050
}
5151
}
5252
}

0 commit comments

Comments
 (0)