Skip to content

Commit 01a48a8

Browse files
committed
<publish/request>(args) to <publish/request>(Message)
use Docker for library testing
1 parent fefeebd commit 01a48a8

28 files changed

+416
-1686
lines changed

instrumentation/nats/nats-2.17/javaagent/README.md

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,3 @@
22

33
Provides OpenTelemetry auto-instrumentation for [NATS 2.17](https://github.com/nats-io/nats.java).
44

5-
### Trace propagation
6-
7-
It's recommended to provide `Message` with a writable `Header` structure
8-
to allow propagation between publishers and subscribers. Without headers,
9-
the tracing context will not be propagated in the headers.
10-
11-
```java
12-
import io.nats.client.impl.Headers;
13-
import io.nats.client.impl.NatsMessage;
14-
15-
// don't
16-
Message msg = NatsMessage.builder().subject("sub").build();
17-
18-
// do
19-
Message msg = NatsMessage.builder().subject("sub").headers(new Headers()).build();
20-
```

instrumentation/nats/nats-2.17/javaagent/build.gradle.kts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ dependencies {
2525
val collectMetadata = findProperty("collectMetadata")?.toString() ?: "false"
2626

2727
tasks {
28+
withType<Test>().configureEach {
29+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
30+
}
31+
2832
val testExperimental by registering(Test::class) {
2933
filter {
3034
includeTestsMatching("NatsInstrumentationExperimentalTest")
@@ -36,7 +40,6 @@ tasks {
3640
}
3741

3842
test {
39-
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
4043
filter {
4144
excludeTestsMatching("NatsInstrumentationExperimentalTest")
4245
}

instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionPublishInstrumentation.java

Lines changed: 26 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.nats.client.impl.Headers;
1818
import io.opentelemetry.context.Context;
1919
import io.opentelemetry.context.Scope;
20+
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsMessageWritableHeaders;
2021
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest;
2122
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
2223
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
@@ -75,153 +76,53 @@ public void transform(TypeTransformer transformer) {
7576

7677
@SuppressWarnings("unused")
7778
public static class PublishBodyAdvice {
78-
79-
@Advice.OnMethodEnter(suppress = Throwable.class)
80-
public static void onEnter(
79+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
80+
public static boolean onEnter(
8181
@Advice.This Connection connection,
8282
@Advice.Argument(0) String subject,
83-
@Advice.Argument(1) byte[] body,
84-
@Advice.Local("otelContext") Context otelContext,
85-
@Advice.Local("otelScope") Scope otelScope,
86-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
87-
Context parentContext = Context.current();
88-
natsRequest = NatsRequest.create(connection, null, subject, null, body);
89-
90-
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
91-
return;
92-
}
93-
94-
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
95-
otelScope = otelContext.makeCurrent();
96-
}
97-
98-
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
99-
public static void onExit(
100-
@Advice.Thrown Throwable throwable,
101-
@Advice.Local("otelContext") Context otelContext,
102-
@Advice.Local("otelScope") Scope otelScope,
103-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
104-
if (otelScope == null) {
105-
return;
106-
}
107-
108-
otelScope.close();
109-
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
83+
@Advice.Argument(1) byte[] body) {
84+
connection.publish( NatsMessageWritableHeaders.create(subject, body));
85+
return true;
11086
}
11187
}
11288

11389
@SuppressWarnings("unused")
11490
public static class PublishHeadersBodyAdvice {
115-
116-
@Advice.OnMethodEnter(suppress = Throwable.class)
117-
public static void onEnter(
91+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
92+
public static boolean onEnter(
11893
@Advice.This Connection connection,
11994
@Advice.Argument(0) String subject,
120-
@Advice.Argument(1) Headers headers,
121-
@Advice.Argument(2) byte[] body,
122-
@Advice.Local("otelContext") Context otelContext,
123-
@Advice.Local("otelScope") Scope otelScope,
124-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
125-
Context parentContext = Context.current();
126-
natsRequest = NatsRequest.create(connection, null, subject, headers, body);
127-
128-
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
129-
return;
130-
}
131-
132-
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
133-
otelScope = otelContext.makeCurrent();
134-
}
135-
136-
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
137-
public static void onExit(
138-
@Advice.Thrown Throwable throwable,
139-
@Advice.Local("otelContext") Context otelContext,
140-
@Advice.Local("otelScope") Scope otelScope,
141-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
142-
if (otelScope == null) {
143-
return;
144-
}
145-
146-
otelScope.close();
147-
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
95+
@Advice.Argument(value = 1, readOnly = false) Headers headers,
96+
@Advice.Argument(2) byte[] body) {
97+
connection.publish( NatsMessageWritableHeaders.create(subject, headers, body));
98+
return true;
14899
}
149100
}
150101

151102
@SuppressWarnings("unused")
152103
public static class PublishReplyToBodyAdvice {
153-
154-
@Advice.OnMethodEnter(suppress = Throwable.class)
155-
public static void onEnter(
104+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
105+
public static boolean onEnter(
156106
@Advice.This Connection connection,
157107
@Advice.Argument(0) String subject,
158108
@Advice.Argument(1) String replyTo,
159-
@Advice.Argument(2) byte[] body,
160-
@Advice.Local("otelContext") Context otelContext,
161-
@Advice.Local("otelScope") Scope otelScope,
162-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
163-
Context parentContext = Context.current();
164-
natsRequest = NatsRequest.create(connection, replyTo, subject, null, body);
165-
166-
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
167-
return;
168-
}
169-
170-
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
171-
otelScope = otelContext.makeCurrent();
172-
}
173-
174-
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
175-
public static void onExit(
176-
@Advice.Thrown Throwable throwable,
177-
@Advice.Local("otelContext") Context otelContext,
178-
@Advice.Local("otelScope") Scope otelScope,
179-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
180-
if (otelScope == null) {
181-
return;
182-
}
183-
184-
otelScope.close();
185-
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
109+
@Advice.Argument(2) byte[] body) {
110+
connection.publish(NatsMessageWritableHeaders.create(subject, replyTo, body));
111+
return true;
186112
}
187113
}
188114

189115
@SuppressWarnings("unused")
190116
public static class PublishReplyToHeadersBodyAdvice {
191-
192-
@Advice.OnMethodEnter(suppress = Throwable.class)
193-
public static void onEnter(
117+
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
118+
public static boolean onEnter(
194119
@Advice.This Connection connection,
195120
@Advice.Argument(0) String subject,
196121
@Advice.Argument(1) String replyTo,
197-
@Advice.Argument(2) Headers headers,
198-
@Advice.Argument(3) byte[] body,
199-
@Advice.Local("otelContext") Context otelContext,
200-
@Advice.Local("otelScope") Scope otelScope,
201-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
202-
Context parentContext = Context.current();
203-
natsRequest = NatsRequest.create(connection, replyTo, subject, headers, body);
204-
205-
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
206-
return;
207-
}
208-
209-
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest);
210-
otelScope = otelContext.makeCurrent();
211-
}
212-
213-
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
214-
public static void onExit(
215-
@Advice.Thrown Throwable throwable,
216-
@Advice.Local("otelContext") Context otelContext,
217-
@Advice.Local("otelScope") Scope otelScope,
218-
@Advice.Local("natsRequest") NatsRequest natsRequest) {
219-
if (otelScope == null) {
220-
return;
221-
}
222-
223-
otelScope.close();
224-
PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable);
122+
@Advice.Argument(value = 2, readOnly = false) Headers headers,
123+
@Advice.Argument(3) byte[] body) {
124+
connection.publish(NatsMessageWritableHeaders.create(subject, replyTo, headers, body));
125+
return true;
225126
}
226127
}
227128

@@ -231,10 +132,12 @@ public static class PublishMessageAdvice {
231132
@Advice.OnMethodEnter(suppress = Throwable.class)
232133
public static void onEnter(
233134
@Advice.This Connection connection,
234-
@Advice.Argument(0) Message message,
135+
@Advice.Argument(value = 0, readOnly = false) Message message,
235136
@Advice.Local("otelContext") Context otelContext,
236137
@Advice.Local("otelScope") Scope otelScope,
237138
@Advice.Local("natsRequest") NatsRequest natsRequest) {
139+
message = NatsMessageWritableHeaders.create(message);
140+
238141
Context parentContext = Context.current();
239142
natsRequest = NatsRequest.create(connection, message);
240143

0 commit comments

Comments
 (0)