Skip to content

Commit 065007d

Browse files
committed
Instrument NATS library 'request' method
1 parent 0ce44fb commit 065007d

File tree

12 files changed

+538
-114
lines changed

12 files changed

+538
-114
lines changed

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/NatsTelemetry.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,19 @@ public static NatsTelemetryBuilder builder(OpenTelemetry openTelemetry) {
2222

2323
private final Instrumenter<NatsRequest, Void> producerInstrumenter;
2424
private final Instrumenter<NatsRequest, Void> consumerInstrumenter;
25+
private final Instrumenter<NatsRequest, NatsRequest> clientInstrumenter;
2526

2627
public NatsTelemetry(
2728
Instrumenter<NatsRequest, Void> producerInstrumenter,
28-
Instrumenter<NatsRequest, Void> consumerInstrumenter) {
29+
Instrumenter<NatsRequest, Void> consumerInstrumenter,
30+
Instrumenter<NatsRequest, NatsRequest> clientInstrumenter) {
2931
this.producerInstrumenter = producerInstrumenter;
3032
this.consumerInstrumenter = consumerInstrumenter;
33+
this.clientInstrumenter = clientInstrumenter;
3134
}
3235

3336
public OpenTelemetryConnection wrap(Connection connection) {
3437
return new OpenTelemetryConnection(
35-
connection, this.producerInstrumenter, this.consumerInstrumenter);
38+
connection, this.producerInstrumenter, this.consumerInstrumenter, this.clientInstrumenter);
3639
}
3740
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/NatsTelemetryBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public final class NatsTelemetryBuilder {
1919
public NatsTelemetry build() {
2020
return new NatsTelemetry(
2121
NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry),
22-
NatsInstrumenterFactory.createConsumerInstrumenter(openTelemetry));
22+
NatsInstrumenterFactory.createConsumerInstrumenter(openTelemetry),
23+
NatsInstrumenterFactory.createClientInstrumenter(openTelemetry));
2324
}
2425
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/OpenTelemetryConnection.java

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,31 @@
3333
import io.opentelemetry.context.Scope;
3434
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
3535
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
36+
import io.opentelemetry.instrumentation.nats.v2_21.internal.ThrowingSupplier;
3637
import java.io.IOException;
3738
import java.net.InetAddress;
3839
import java.time.Duration;
3940
import java.util.Collection;
4041
import java.util.concurrent.CompletableFuture;
4142
import java.util.concurrent.TimeoutException;
43+
import java.util.function.Supplier;
4244

4345
public class OpenTelemetryConnection implements Connection {
4446

4547
private final Connection delegate;
4648
private final Instrumenter<NatsRequest, Void> producerInstrumenter;
4749
private final Instrumenter<NatsRequest, Void> consumerInstrumenter;
50+
private final Instrumenter<NatsRequest, NatsRequest> clientInstrumenter;
4851

4952
public OpenTelemetryConnection(
5053
Connection connection,
5154
Instrumenter<NatsRequest, Void> producerInstrumenter,
52-
Instrumenter<NatsRequest, Void> consumerInstrumenter) {
55+
Instrumenter<NatsRequest, Void> consumerInstrumenter,
56+
Instrumenter<NatsRequest, NatsRequest> clientInstrumenter) {
5357
this.delegate = connection;
5458
this.producerInstrumenter = producerInstrumenter;
5559
this.consumerInstrumenter = consumerInstrumenter;
60+
this.clientInstrumenter = clientInstrumenter;
5661
}
5762

5863
@Override
@@ -87,51 +92,62 @@ public void publish(Message message) {
8792

8893
@Override
8994
public CompletableFuture<Message> request(String subject, byte[] body) {
90-
return delegate.request(subject, body);
95+
return wrapRequest(
96+
NatsRequest.create(this, subject, body), () -> delegate.request(subject, body));
9197
}
9298

9399
@Override
94100
public Message request(String subject, byte[] body, Duration timeout)
95101
throws InterruptedException {
96-
return delegate.request(subject, body, timeout);
102+
return wrapRequest(
103+
NatsRequest.create(this, subject, body), () -> delegate.request(subject, body, timeout));
97104
}
98105

99106
@Override
100107
public CompletableFuture<Message> request(String subject, Headers headers, byte[] body) {
101-
return delegate.request(subject, headers, body);
108+
return wrapRequest(
109+
NatsRequest.create(this, subject, headers, body),
110+
() -> delegate.request(subject, headers, body));
102111
}
103112

104113
@Override
105114
public Message request(String subject, Headers headers, byte[] body, Duration timeout)
106115
throws InterruptedException {
107-
return delegate.request(subject, headers, body, timeout);
116+
return wrapRequest(
117+
NatsRequest.create(this, subject, headers, body),
118+
() -> delegate.request(subject, headers, body, timeout));
108119
}
109120

110121
@Override
111122
public CompletableFuture<Message> request(Message message) {
112-
return delegate.request(message);
123+
return wrapRequest(NatsRequest.create(this, message), () -> delegate.request(message));
113124
}
114125

115126
@Override
116127
public Message request(Message message, Duration timeout) throws InterruptedException {
117-
return delegate.request(message, timeout);
128+
return wrapRequest(NatsRequest.create(this, message), () -> delegate.request(message, timeout));
118129
}
119130

120131
@Override
121132
public CompletableFuture<Message> requestWithTimeout(
122133
String subject, byte[] body, Duration timeout) {
123-
return delegate.requestWithTimeout(subject, body, timeout);
134+
return wrapRequest(
135+
NatsRequest.create(this, subject, body),
136+
() -> delegate.requestWithTimeout(subject, body, timeout));
124137
}
125138

126139
@Override
127140
public CompletableFuture<Message> requestWithTimeout(
128141
String subject, Headers headers, byte[] body, Duration timeout) {
129-
return delegate.requestWithTimeout(subject, headers, body, timeout);
142+
return wrapRequest(
143+
NatsRequest.create(this, subject, headers, body),
144+
() -> delegate.requestWithTimeout(subject, headers, body, timeout));
130145
}
131146

132147
@Override
133148
public CompletableFuture<Message> requestWithTimeout(Message message, Duration timeout) {
134-
return delegate.requestWithTimeout(message, timeout);
149+
return wrapRequest(
150+
NatsRequest.create(this, message), () -> delegate.requestWithTimeout(message, timeout));
135151
}
136152

137153
@Override
@@ -367,4 +383,59 @@ private void wrapPublish(NatsRequest natsRequest, Runnable publish) {
367383
producerInstrumenter.end(context, natsRequest, null, null);
368384
}
369385
}
386+
387+
private Message wrapRequest(
388+
NatsRequest natsRequest, ThrowingSupplier<Message, InterruptedException> request)
389+
throws InterruptedException {
390+
Context parentContext = Context.current();
391+
392+
if (!Span.fromContext(parentContext).getSpanContext().isValid()
393+
|| !clientInstrumenter.shouldStart(parentContext, natsRequest)) {
394+
return request.call();
395+
}
396+
397+
Context context = clientInstrumenter.start(parentContext, natsRequest);
398+
TimeoutException timeout = null;
399+
NatsRequest response = null;
400+
401+
try (Scope ignored = context.makeCurrent()) {
402+
Message message = request.call();
403+
404+
if (message == null) {
405+
timeout = new TimeoutException("Timed out waiting for message");
406+
} else {
407+
response = NatsRequest.create(this, message);
408+
}
409+
410+
return message;
411+
} finally {
412+
clientInstrumenter.end(context, natsRequest, response, timeout);
413+
}
414+
}
415+
416+
private CompletableFuture<Message> wrapRequest(
417+
NatsRequest natsRequest, Supplier<CompletableFuture<Message>> request) {
418+
Context parentContext = Context.current();
419+
420+
if (!Span.fromContext(parentContext).getSpanContext().isValid()
421+
|| !clientInstrumenter.shouldStart(parentContext, natsRequest)) {
422+
return request.get();
423+
}
424+
425+
Context context = clientInstrumenter.start(parentContext, natsRequest);
426+
427+
try (Scope ignored = context.makeCurrent()) {
428+
return request
429+
.get()
430+
.whenComplete(
431+
(message, exception) -> {
432+
if (message != null) {
433+
NatsRequest response = NatsRequest.create(this, message);
434+
clientInstrumenter.end(context, natsRequest, response, exception);
435+
} else {
436+
clientInstrumenter.end(context, natsRequest, null, exception);
437+
}
438+
});
439+
}
440+
}
370441
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/OpenTelemetrySubscription.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
1616
import io.opentelemetry.instrumentation.api.internal.Timer;
1717
import io.opentelemetry.instrumentation.nats.v2_21.internal.NatsRequest;
18-
import io.opentelemetry.instrumentation.nats.v2_21.internal.ThrowingSupplier;
18+
import io.opentelemetry.instrumentation.nats.v2_21.internal.ThrowingSupplier2;
1919
import java.time.Duration;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.TimeoutException;
@@ -124,7 +124,7 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedExce
124124
}
125125

126126
private Message wrapNextMessage(
127-
ThrowingSupplier<Message, InterruptedException, IllegalStateException> nextMessage)
127+
ThrowingSupplier2<Message, InterruptedException, IllegalStateException> nextMessage)
128128
throws InterruptedException {
129129
Timer timer = Timer.start();
130130
Message message = nextMessage.call();

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/internal/NatsInstrumenterFactory.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,23 @@ public final class NatsInstrumenterFactory {
2525

2626
public static final SpanNameExtractor<NatsRequest> PRODUCER_SPAN_NAME_EXTRACTOR =
2727
MessagingSpanNameExtractor.create(
28-
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH);
28+
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.PUBLISH);
2929

3030
public static final AttributesExtractor<NatsRequest, Void> PRODUCER_ATTRIBUTES_EXTRACTOR =
3131
MessagingAttributesExtractor.create(
32-
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH);
32+
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.PUBLISH);
3333

3434
public static final SpanNameExtractor<NatsRequest> CONSUMER_SPAN_NAME_EXTRACTOR =
3535
MessagingSpanNameExtractor.create(
36-
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.RECEIVE);
36+
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.RECEIVE);
3737

3838
public static final AttributesExtractor<NatsRequest, Void> CONSUMER_ATTRIBUTES_EXTRACTOR =
3939
MessagingAttributesExtractor.create(
40-
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.RECEIVE);
40+
NatsRequestMessagingAttributesGetter.VOID_INSTANCE, MessageOperation.RECEIVE);
41+
42+
public static final AttributesExtractor<NatsRequest, NatsRequest> CLIENT_ATTRIBUTES_EXTRACTOR =
43+
MessagingAttributesExtractor.create(
44+
NatsRequestMessagingAttributesGetter.NATS_REQUEST_INSTANCE, MessageOperation.PUBLISH);
4145

4246
public static Instrumenter<NatsRequest, Void> createProducerInstrumenter(
4347
OpenTelemetry openTelemetry) {
@@ -59,5 +63,17 @@ public static Instrumenter<NatsRequest, Void> createConsumerInstrumenter(
5963
.buildConsumerInstrumenter(NatsRequestTextMapGetter.INSTANCE);
6064
}
6165

66+
public static Instrumenter<NatsRequest, NatsRequest> createClientInstrumenter(
67+
OpenTelemetry openTelemetry) {
68+
return Instrumenter.<NatsRequest, NatsRequest>builder(
69+
openTelemetry, INSTRUMENTATION_NAME, PRODUCER_SPAN_NAME_EXTRACTOR)
70+
.addAttributesExtractor(CLIENT_ATTRIBUTES_EXTRACTOR)
71+
.addSpanLinksExtractor(
72+
new PropagatorBasedSpanLinksExtractor<>(
73+
openTelemetry.getPropagators().getTextMapPropagator(),
74+
NatsRequestTextMapGetter.INSTANCE))
75+
.buildClientInstrumenter(NatsRequestTextMapSetter.INSTANCE);
76+
}
77+
6278
private NatsInstrumenterFactory() {}
6379
}

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/internal/NatsRequest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ public static NatsRequest create(Connection connection, String subject, byte[] d
2828
return create(connection, subject, null, data);
2929
}
3030

31-
public static NatsRequest create(Message message) {
32-
return create(message.getConnection(), message);
33-
}
34-
3531
public static NatsRequest create(Connection connection, Message message) {
3632
return create(
3733
message.getConnection() == null ? connection : message.getConnection(),

instrumentation/nats/nats-2.21/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_21/internal/NatsRequestMessagingAttributesGetter.java

Lines changed: 6 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -5,82 +5,15 @@
55

66
package io.opentelemetry.instrumentation.nats.v2_21.internal;
77

8-
import io.nats.client.impl.Headers;
98
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
10-
import java.util.Collections;
11-
import java.util.List;
12-
import javax.annotation.Nullable;
139

14-
enum NatsRequestMessagingAttributesGetter implements MessagingAttributesGetter<NatsRequest, Void> {
15-
INSTANCE;
10+
class NatsRequestMessagingAttributesGetter {
1611

17-
@Nullable
18-
@Override
19-
public String getSystem(NatsRequest request) {
20-
return "nats";
21-
}
12+
static final MessagingAttributesGetter<NatsRequest, Void> VOID_INSTANCE =
13+
NatsRequestMessagingAttributesGetterFactory.create();
2214

23-
@Nullable
24-
@Override
25-
public String getDestination(NatsRequest request) {
26-
return request.getSubject();
27-
}
15+
static final MessagingAttributesGetter<NatsRequest, NatsRequest> NATS_REQUEST_INSTANCE =
16+
NatsRequestMessagingAttributesGetterFactory.create();
2817

29-
@Nullable
30-
@Override
31-
public String getDestinationTemplate(NatsRequest request) {
32-
return null;
33-
}
34-
35-
@Override
36-
public boolean isTemporaryDestination(NatsRequest request) {
37-
return false;
38-
}
39-
40-
@Override
41-
public boolean isAnonymousDestination(NatsRequest request) {
42-
return false;
43-
}
44-
45-
@Nullable
46-
@Override
47-
public String getConversationId(NatsRequest request) {
48-
return null;
49-
}
50-
51-
@Nullable
52-
@Override
53-
public Long getMessageBodySize(NatsRequest request) {
54-
return request.getDataSize();
55-
}
56-
57-
@Nullable
58-
@Override
59-
public Long getMessageEnvelopeSize(NatsRequest request) {
60-
return null;
61-
}
62-
63-
@Nullable
64-
@Override
65-
public String getMessageId(NatsRequest request, @Nullable Void unused) {
66-
return null;
67-
}
68-
69-
@Nullable
70-
@Override
71-
public String getClientId(NatsRequest request) {
72-
return String.valueOf(request.getClientId());
73-
}
74-
75-
@Nullable
76-
@Override
77-
public Long getBatchMessageCount(NatsRequest request, @Nullable Void unused) {
78-
return null;
79-
}
80-
81-
@Override
82-
public List<String> getMessageHeader(NatsRequest request, String name) {
83-
Headers headers = request.getHeaders();
84-
return headers == null ? Collections.emptyList() : headers.get(name);
85-
}
18+
private NatsRequestMessagingAttributesGetter() {}
8619
}

0 commit comments

Comments
 (0)