Skip to content

Commit 0ce44fb

Browse files
committed
Instrument NATS Subscription.nextMessage
1 parent df2b1fa commit 0ce44fb

File tree

22 files changed

+1133
-65
lines changed

22 files changed

+1133
-65
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
10+
import static net.bytebuddy.matcher.ElementMatchers.named;
11+
import static net.bytebuddy.matcher.ElementMatchers.returns;
12+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
13+
14+
import io.nats.client.Connection;
15+
import io.nats.client.Subscription;
16+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
17+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
18+
import io.opentelemetry.javaagent.instrumentation.nats.v2_21.internal.NatsData;
19+
import net.bytebuddy.asm.Advice;
20+
import net.bytebuddy.description.type.TypeDescription;
21+
import net.bytebuddy.matcher.ElementMatcher;
22+
23+
public class ConnectionSubscribeInstrumentation implements TypeInstrumentation {
24+
25+
@Override
26+
public ElementMatcher<TypeDescription> typeMatcher() {
27+
return implementsInterface(named("io.nats.client.Connection"));
28+
}
29+
30+
@Override
31+
public void transform(TypeTransformer transformer) {
32+
transformer.applyAdviceToMethod(
33+
isPublic()
34+
.and(named("subscribe"))
35+
.and(takesArgument(0, String.class))
36+
.and(returns(named("io.nats.client.Subscription"))),
37+
ConnectionSubscribeInstrumentation.class.getName() + "$SubscribeAdvice");
38+
}
39+
40+
@SuppressWarnings("unused")
41+
public static class SubscribeAdvice {
42+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
43+
public static void onExit(
44+
@Advice.This Connection connection, @Advice.Return Subscription subscription) {
45+
NatsData.addSubscription(subscription, connection);
46+
}
47+
}
48+
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsInstrumentationModule.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import com.google.auto.service.AutoService;
99
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1010
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
11-
import java.util.Collections;
11+
import java.util.Arrays;
1212
import java.util.List;
1313

1414
@AutoService(InstrumentationModule.class)
@@ -22,6 +22,9 @@ public NatsInstrumentationModule() {
2222

2323
@Override
2424
public List<TypeInstrumentation> typeInstrumentations() {
25-
return Collections.singletonList(new ConnectionPublishInstrumentation());
25+
return Arrays.asList(
26+
new ConnectionSubscribeInstrumentation(),
27+
new ConnectionPublishInstrumentation(),
28+
new SubscriptionInstrumentation());
2629
}
2730
}

instrumentation/nats/nats-2.21/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsSingletons.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
77

8+
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createConsumerInstrumenter;
89
import static io.opentelemetry.instrumentation.nats.v2_21.internal.NatsInstrumenterFactory.createProducerInstrumenter;
910

1011
import io.opentelemetry.api.GlobalOpenTelemetry;
@@ -16,5 +17,8 @@ public final class NatsSingletons {
1617
public static final Instrumenter<NatsRequest, Void> PRODUCER_INSTRUMENTER =
1718
createProducerInstrumenter(GlobalOpenTelemetry.get());
1819

20+
public static final Instrumenter<NatsRequest, Void> CONSUMER_INSTRUMENTER =
21+
createConsumerInstrumenter(GlobalOpenTelemetry.get());
22+
1923
private NatsSingletons() {}
2024
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
9+
import static io.opentelemetry.javaagent.instrumentation.nats.v2_21.NatsSingletons.CONSUMER_INSTRUMENTER;
10+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.returns;
13+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
14+
15+
import io.nats.client.Connection;
16+
import io.nats.client.Message;
17+
import io.nats.client.Subscription;
18+
import io.opentelemetry.context.Context;
19+
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
20+
import io.opentelemetry.instrumentation.api.internal.Timer;
21+
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
22+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
23+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
24+
import io.opentelemetry.javaagent.instrumentation.nats.v2_21.internal.NatsData;
25+
import java.util.concurrent.TimeoutException;
26+
import javax.annotation.Nullable;
27+
import net.bytebuddy.asm.Advice;
28+
import net.bytebuddy.description.type.TypeDescription;
29+
import net.bytebuddy.matcher.ElementMatcher;
30+
31+
public class SubscriptionInstrumentation implements TypeInstrumentation {
32+
33+
@Override
34+
public ElementMatcher<TypeDescription> typeMatcher() {
35+
return implementsInterface(named("io.nats.client.Subscription"));
36+
}
37+
38+
@Override
39+
public void transform(TypeTransformer transformer) {
40+
transformer.applyAdviceToMethod(
41+
isPublic()
42+
.and(named("nextMessage"))
43+
.and(takesArguments(1))
44+
.and(returns(named("io.nats.client.Message"))),
45+
SubscriptionInstrumentation.class.getName() + "$NextMessageAdvice");
46+
transformer.applyAdviceToMethod(
47+
isPublic().and(named("unsubscribe")),
48+
SubscriptionInstrumentation.class.getName() + "$UnsubscribeAdvice");
49+
}
50+
51+
@SuppressWarnings("unused")
52+
public static class NextMessageAdvice {
53+
54+
@Advice.OnMethodEnter(suppress = Throwable.class)
55+
public static Timer onEnter() {
56+
return Timer.start();
57+
}
58+
59+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
60+
public static void onExit(
61+
@Advice.Thrown Throwable throwable,
62+
@Advice.This Subscription subscription,
63+
@Advice.Enter Timer timer,
64+
@Advice.Return @Nullable Message message) {
65+
Context parentContext = Context.current();
66+
TimeoutException timeout = null;
67+
68+
Connection connection = NatsData.getConnection(subscription);
69+
70+
// connection should always be non-null at this stage
71+
if (connection == null) {
72+
return;
73+
}
74+
75+
NatsRequest natsRequest = NatsRequest.create(connection, subscription.getSubject());
76+
if (message == null) {
77+
timeout = new TimeoutException("Timed out waiting for message");
78+
} else {
79+
natsRequest = NatsRequest.create(connection, message);
80+
}
81+
82+
if (!CONSUMER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) {
83+
return;
84+
}
85+
86+
InstrumenterUtil.startAndEnd(
87+
CONSUMER_INSTRUMENTER,
88+
parentContext,
89+
natsRequest,
90+
null,
91+
timeout,
92+
timer.startTime(),
93+
timer.now());
94+
}
95+
}
96+
97+
@SuppressWarnings("unused")
98+
public static class UnsubscribeAdvice {
99+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
100+
public static void onExit(@Advice.This Subscription subscription) {
101+
NatsData.removeSubscription(subscription);
102+
}
103+
}
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.nats.v2_21.internal;
7+
8+
import io.nats.client.Connection;
9+
import io.nats.client.Subscription;
10+
import io.opentelemetry.instrumentation.api.util.VirtualField;
11+
12+
/**
13+
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
14+
* any time.", or "This class is internal and experimental. Its APIs are unstable and can change at
15+
* any time. Its APIs (or a version of them) may be promoted to the public stable API in the future,
16+
* but no guarantees are made.
17+
*/
18+
public final class NatsData {
19+
20+
private static final VirtualField<Subscription, Connection> subscriptionConnection =
21+
VirtualField.find(Subscription.class, Connection.class);
22+
23+
public static void addSubscription(Subscription subscription, Connection connection) {
24+
subscriptionConnection.set(subscription, connection);
25+
}
26+
27+
public static void removeSubscription(Subscription subscription) {
28+
subscriptionConnection.set(subscription, null);
29+
}
30+
31+
public static Connection getConnection(Subscription subscription) {
32+
return subscriptionConnection.get(subscription);
33+
}
34+
35+
private NatsData() {}
36+
}

instrumentation/nats/nats-2.21/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_21/NatsInstrumentationPublishTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ static void afterAll() {
6363
@BeforeEach
6464
void beforeEach() throws IOException, InterruptedException {
6565
connection = Nats.connect(natsUrl);
66-
subscription = connection.subscribe("*");
66+
subscription = connection.subscribe("sub");
6767
clientId = connection.getServerInfo().getClientId();
6868
}
6969

0 commit comments

Comments
 (0)