Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.nats.v2_17;

import static io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetryBuilder.DEFAULT_TEMPORARY_PREFIXES;
import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createConsumerProcessInstrumenter;
import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createProducerInstrumenter;

Expand All @@ -20,10 +21,12 @@ public final class NatsSingletons {
ExperimentalConfig.get().getMessagingHeaders();

public static final Instrumenter<NatsRequest, NatsRequest> PRODUCER_INSTRUMENTER =
createProducerInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders);
createProducerInstrumenter(
GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PREFIXES);

public static final Instrumenter<NatsRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
createConsumerProcessInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders);
createConsumerProcessInstrumenter(
GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PREFIXES);

private NatsSingletons() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,24 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** A builder of {@link NatsTelemetry}. */
public final class NatsTelemetryBuilder {

private final OpenTelemetry openTelemetry;

private List<String> capturedHeaders = emptyList();
private List<String> temporaryPrefixes = DEFAULT_TEMPORARY_PREFIXES;

/**
* _INBOX. is the prefix used in the NATS Java client for request/reply. Usually one-off
* subscription is used for each request. _R_. is the prefix used in NodeJS environments for
* request/reply. There is only one shared subscription per connection for all requests.
*/
public static final List<String> DEFAULT_TEMPORARY_PREFIXES = Arrays.asList("_INBOX.", "_R_.");

NatsTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
Expand All @@ -35,10 +45,23 @@ public NatsTelemetryBuilder setCapturedHeaders(Collection<String> capturedHeader
return this;
}

/**
* Configures the prefixes used for temporary subjects.
*
* @param temporaryPrefixes A list of prefixes.
*/
@CanIgnoreReturnValue
public NatsTelemetryBuilder setTemporaryPrefixes(Collection<String> temporaryPrefixes) {
this.temporaryPrefixes = new ArrayList<>(temporaryPrefixes);
return this;
}

/** Returns a new {@link NatsTelemetry} with the settings of this {@link NatsTelemetryBuilder}. */
public NatsTelemetry build() {
return new NatsTelemetry(
NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry, capturedHeaders),
NatsInstrumenterFactory.createConsumerProcessInstrumenter(openTelemetry, capturedHeaders));
NatsInstrumenterFactory.createProducerInstrumenter(
openTelemetry, capturedHeaders, temporaryPrefixes),
NatsInstrumenterFactory.createConsumerProcessInstrumenter(
openTelemetry, capturedHeaders, temporaryPrefixes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,35 @@ public final class NatsInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.nats-2.17";

public static Instrumenter<NatsRequest, NatsRequest> createProducerInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) {
OpenTelemetry openTelemetry, List<String> capturedHeaders, List<String> temporaryPrefixes) {
return Instrumenter.<NatsRequest, NatsRequest>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH))
new NatsRequestMessagingAttributesGetter(temporaryPrefixes),
MessageOperation.PUBLISH))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH)
new NatsRequestMessagingAttributesGetter(temporaryPrefixes),
MessageOperation.PUBLISH)
.setCapturedHeaders(capturedHeaders)
.build())
.buildProducerInstrumenter(NatsRequestTextMapSetter.INSTANCE);
}

public static Instrumenter<NatsRequest, Void> createConsumerProcessInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) {
OpenTelemetry openTelemetry, List<String> capturedHeaders, List<String> temporaryPrefixes) {
InstrumenterBuilder<NatsRequest, Void> builder =
Instrumenter.<NatsRequest, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS))
new NatsRequestMessagingAttributesGetter(temporaryPrefixes),
MessageOperation.PROCESS))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS)
new NatsRequestMessagingAttributesGetter(temporaryPrefixes),
MessageOperation.PROCESS)
.setCapturedHeaders(capturedHeaders)
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import java.util.List;
import javax.annotation.Nullable;

enum NatsRequestMessagingAttributesGetter
class NatsRequestMessagingAttributesGetter
implements MessagingAttributesGetter<NatsRequest, Object> {
INSTANCE;
private final List<String> temporaryPrefixes;

public NatsRequestMessagingAttributesGetter(List<String> temporaryPrefixes) {
this.temporaryPrefixes = temporaryPrefixes;
}

@Override
public String getSystem(NatsRequest request) {
Expand All @@ -29,15 +33,12 @@ public String getDestination(NatsRequest request) {
@Nullable
@Override
public String getDestinationTemplate(NatsRequest request) {
if (isTemporaryDestination(request)) {
return request.getInboxPrefix();
}
return null;
return getTemporaryPrefix(request);
}

@Override
public boolean isTemporaryDestination(NatsRequest request) {
return request.getSubject().startsWith(request.getInboxPrefix());
return getTemporaryPrefix(request) != null;
}

@Override
Expand Down Expand Up @@ -88,4 +89,21 @@ public List<String> getMessageHeader(NatsRequest request, String name) {
List<String> result = headers.get(name);
return result == null ? Collections.emptyList() : result;
}

/**
* @return the temporary prefix used for this request, or null
*/
private String getTemporaryPrefix(NatsRequest request) {
if (request.getSubject().startsWith(request.getInboxPrefix())) {
return request.getInboxPrefix();
}

for (String prefix : temporaryPrefixes) {
if (request.getSubject().startsWith(prefix)) {
return prefix;
}
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.assertTraceparentHeader;
import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_TEMPORARY;

import io.nats.client.Subscription;
import io.nats.client.impl.Headers;
Expand Down Expand Up @@ -101,6 +103,17 @@ void testPublishMessageWithHeaders() throws InterruptedException {
assertTraceparentHeader(subscription);
}

@Test
void testPublishMessageTemporarySubject() throws InterruptedException {
NatsMessage message = NatsMessage.builder().subject("_R_.qQzIWn.0JuCnu").data("x").build();

// when
testing().runWithSpan("parent", () -> connection.publish(message));

// then
assertTemporaryPublishSpan();
}

private void assertPublishSpan() {
testing()
.waitAndAssertTraces(
Expand All @@ -114,4 +127,22 @@ private void assertPublishSpan() {
.hasAttributesSatisfyingExactly(
messagingAttributes("publish", "sub", clientId))));
}

private void assertTemporaryPublishSpan() {
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent(),
span ->
span.hasName("(temporary) publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
messagingAttributes(
"publish",
"(temporary)",
clientId,
equalTo(MESSAGING_DESTINATION_TEMPORARY, true)))));
}
}
Loading