Skip to content

Commit 4accfc9

Browse files
committed
Bootstrap NATS instrumentation with Connection.publish(Message) tracing
1 parent 8af76e8 commit 4accfc9

File tree

20 files changed

+1402
-0
lines changed

20 files changed

+1402
-0
lines changed

.fossa.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,12 @@ targets:
691691
- type: gradle
692692
path: ./
693693
target: ':instrumentation:mongo:mongo-async-3.3:javaagent'
694+
- type: gradle
695+
path: ./
696+
target: ':instrumentation:nats:nats-2.21:javaagent'
697+
- type: gradle
698+
path: ./
699+
target: ':instrumentation:nats:nats-2.21:library'
694700
- type: gradle
695701
path: ./
696702
target: ':instrumentation:netty:netty-3.8:javaagent'

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ out/
4545
######################
4646
.vscode
4747
**/bin/
48+
.metals
4849

4950
# Others #
5051
##########
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("io.nats")
8+
module.set("jnats")
9+
versions.set("[2.21.0,)")
10+
assertInverse.set(true)
11+
}
12+
}
13+
14+
dependencies {
15+
library("io.nats:jnats:2.21.0")
16+
17+
implementation(project(":instrumentation:nats:nats-2.21:library"))
18+
}
19+
20+
tasks {
21+
test {
22+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.PRODUCER_INSTRUMENTER;
10+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
14+
15+
import io.nats.client.Connection;
16+
import io.nats.client.Message;
17+
import io.opentelemetry.context.Context;
18+
import io.opentelemetry.context.Scope;
19+
import io.opentelemetry.instrumentation.nats.v2_21.OpenTelemetryMessage;
20+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
21+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
22+
import net.bytebuddy.asm.Advice;
23+
import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument;
24+
import net.bytebuddy.description.type.TypeDescription;
25+
import net.bytebuddy.matcher.ElementMatcher;
26+
27+
public class ConnectionInstrumentation implements TypeInstrumentation {
28+
29+
@Override
30+
public ElementMatcher<TypeDescription> typeMatcher() {
31+
return implementsInterface(named("io.nats.client.Connection"));
32+
}
33+
34+
@Override
35+
public void transform(TypeTransformer transformer) {
36+
transformer.applyAdviceToMethod(
37+
isPublic()
38+
.and(named("publish"))
39+
.and(takesArguments(1))
40+
.and(takesArgument(0, named("io.nats.client.Message"))),
41+
ConnectionInstrumentation.class.getName() + "$PublishAdvice");
42+
}
43+
44+
@SuppressWarnings("unused")
45+
public static class PublishAdvice {
46+
47+
@Advice.OnMethodEnter(suppress = Throwable.class)
48+
@Advice.AssignReturned.ToArguments(@ToArgument(0))
49+
public static Message onEnter(
50+
@Advice.This Connection connection,
51+
@Advice.Argument(0) Message message,
52+
@Advice.Local("otelContext") Context otelContext,
53+
@Advice.Local("otelScope") Scope otelScope) {
54+
Context parentContext = Context.current();
55+
56+
if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, message)) {
57+
return message;
58+
}
59+
60+
OpenTelemetryMessage otelMessage = new OpenTelemetryMessage(connection, message);
61+
62+
otelContext = PRODUCER_INSTRUMENTER.start(parentContext, otelMessage);
63+
otelScope = otelContext.makeCurrent();
64+
65+
return otelMessage;
66+
}
67+
68+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
69+
public static void onExit(
70+
@Advice.Thrown Throwable throwable,
71+
@Advice.This Connection connection,
72+
@Advice.Argument(0) Message message,
73+
@Advice.Local("otelContext") Context otelContext,
74+
@Advice.Local("otelScope") Scope otelScope) {
75+
if (otelScope == null) {
76+
return;
77+
}
78+
79+
otelScope.close();
80+
PRODUCER_INSTRUMENTER.end(otelContext, message, null, throwable);
81+
}
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import com.google.auto.service.AutoService;
9+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
10+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
11+
import java.util.Collections;
12+
import java.util.List;
13+
14+
@AutoService(InstrumentationModule.class)
15+
public class NatsInstrumentationModule extends InstrumentationModule {
16+
17+
public NatsInstrumentationModule() {
18+
super("nats", "nats-2.21");
19+
}
20+
21+
// TODO classLoaderMatcher
22+
23+
@Override
24+
public List<TypeInstrumentation> typeInstrumentations() {
25+
return Collections.singletonList(new ConnectionInstrumentation());
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createProducerInstrumenter;
9+
10+
import io.nats.client.Message;
11+
import io.opentelemetry.api.GlobalOpenTelemetry;
12+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
13+
14+
public final class NatsSingletons {
15+
16+
public static final Instrumenter<Message, Void> PRODUCER_INSTRUMENTER =
17+
createProducerInstrumenter(GlobalOpenTelemetry.get());
18+
19+
private NatsSingletons() {}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
9+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
10+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
11+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
12+
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
13+
import static org.assertj.core.api.Assertions.assertThat;
14+
15+
import io.nats.client.Connection;
16+
import io.nats.client.Message;
17+
import io.nats.client.Nats;
18+
import io.nats.client.Subscription;
19+
import io.nats.client.impl.NatsMessage;
20+
import io.opentelemetry.api.common.AttributeKey;
21+
import io.opentelemetry.api.trace.SpanKind;
22+
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
23+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
24+
import java.io.IOException;
25+
import java.time.Duration;
26+
import java.util.LinkedList;
27+
import org.junit.jupiter.api.AfterAll;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.extension.RegisterExtension;
31+
import org.testcontainers.containers.GenericContainer;
32+
import org.testcontainers.utility.DockerImageName;
33+
34+
@SuppressWarnings("deprecation") // using deprecated semconv
35+
class NatsInstrumentationTest {
36+
37+
@RegisterExtension
38+
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
39+
40+
static final DockerImageName natsImage = DockerImageName.parse("nats:2.11.2-alpine3.21");
41+
42+
static final GenericContainer<?> natsContainer =
43+
new GenericContainer<>(natsImage).withExposedPorts(4222);
44+
45+
static String natsUrl;
46+
static Connection connection;
47+
static Subscription subscription;
48+
49+
static LinkedList<Message> publishedMessages = new LinkedList<>();
50+
51+
@BeforeAll
52+
static void beforeAll() throws IOException, InterruptedException {
53+
natsContainer.start();
54+
natsUrl = "nats://" + natsContainer.getHost() + ":" + natsContainer.getMappedPort(4222);
55+
connection = Nats.connect(natsUrl);
56+
subscription = connection.subscribe("*");
57+
}
58+
59+
@AfterAll
60+
static void afterAll() throws InterruptedException {
61+
subscription.drain(Duration.ofSeconds(10));
62+
connection.close();
63+
natsContainer.close();
64+
}
65+
66+
@Test
67+
void testPublishMessage() throws InterruptedException {
68+
// given
69+
NatsMessage message = NatsMessage.builder().subject("sub").data("x").build();
70+
int clientId = connection.getServerInfo().getClientId();
71+
72+
// when
73+
testing.runWithSpan("testPublishMessage", () -> connection.publish(message));
74+
75+
// then
76+
testing.waitAndAssertTraces(
77+
trace ->
78+
trace.hasSpansSatisfyingExactly(
79+
span -> span.hasName("testPublishMessage").hasNoParent(),
80+
span ->
81+
span.hasName("sub publish")
82+
.hasKind(SpanKind.PRODUCER)
83+
.hasParent(trace.getSpan(0))
84+
.hasAttributesSatisfyingExactly(
85+
equalTo(MESSAGING_OPERATION, "publish"),
86+
equalTo(MESSAGING_SYSTEM, "nats"),
87+
equalTo(MESSAGING_DESTINATION_NAME, "sub"),
88+
equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1),
89+
equalTo(
90+
AttributeKey.stringKey("messaging.client_id"),
91+
String.valueOf(clientId)))));
92+
93+
// and
94+
Message published = subscription.nextMessage(Duration.ofSeconds(1));
95+
assertThat(published.getHeaders().get("traceparent")).isNotEmpty();
96+
}
97+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
plugins {
2+
id("otel.library-instrumentation")
3+
}
4+
5+
dependencies {
6+
library("io.nats:jnats:2.21.0")
7+
8+
testImplementation(project(":instrumentation:nats:nats-2.21:testing"))
9+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.nats.v2_21;
7+
8+
import io.nats.client.Connection;
9+
import io.nats.client.Message;
10+
import io.opentelemetry.api.OpenTelemetry;
11+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
12+
13+
public final class NatsTelemetry {
14+
15+
public static NatsTelemetry create(OpenTelemetry openTelemetry) {
16+
return new NatsTelemetryBuilder(openTelemetry).build();
17+
}
18+
19+
public static NatsTelemetryBuilder builder(OpenTelemetry openTelemetry) {
20+
return new NatsTelemetryBuilder(openTelemetry);
21+
}
22+
23+
private final Instrumenter<Message, Void> producerInstrumenter;
24+
25+
public NatsTelemetry(Instrumenter<Message, Void> producerInstrumenter) {
26+
this.producerInstrumenter = producerInstrumenter;
27+
}
28+
29+
public OpenTelemetryConnection wrap(Connection connection) {
30+
return new OpenTelemetryConnection(connection, this.producerInstrumenter);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.nats.v2_21;
7+
8+
import io.opentelemetry.api.OpenTelemetry;
9+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory;
10+
11+
public final class NatsTelemetryBuilder {
12+
13+
private final OpenTelemetry openTelemetry;
14+
15+
NatsTelemetryBuilder(OpenTelemetry openTelemetry) {
16+
this.openTelemetry = openTelemetry;
17+
}
18+
19+
public NatsTelemetry build() {
20+
return new NatsTelemetry(NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry));
21+
}
22+
}

0 commit comments

Comments
 (0)