diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java index 3c48286120d2..c1da2d23345a 100644 --- a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java @@ -21,6 +21,11 @@ public NatsInstrumentationModule() { super("nats", "nats-2.17"); } + @Override + public boolean isHelperClass(String className) { + return className.startsWith("io.nats.client.impl.OpenTelemetryDispatcherFactory"); + } + @Override public List typeInstrumentations() { return asList( diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java index 13e95063e94c..d0614fedd3c3 100644 --- a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.nats.v2_17; +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetryBuilder.DEFAULT_TEMPORARY_PATTERNS; import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createConsumerProcessInstrumenter; import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createProducerInstrumenter; @@ -20,10 +21,12 @@ public final class NatsSingletons { ExperimentalConfig.get().getMessagingHeaders(); public static final Instrumenter PRODUCER_INSTRUMENTER = - createProducerInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders); + createProducerInstrumenter( + GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PATTERNS); public static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER = - createConsumerProcessInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders); + createConsumerProcessInstrumenter( + GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PATTERNS); private NatsSingletons() {} } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java index c125d371e64e..9a1a62cf03e2 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java @@ -10,15 +10,23 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsSubjectPattern; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.regex.Pattern; /** A builder of {@link NatsTelemetry}. */ public final class NatsTelemetryBuilder { private final OpenTelemetry openTelemetry; + private List capturedHeaders = emptyList(); + private List temporaryPatterns = DEFAULT_TEMPORARY_PATTERNS; + + public static final List DEFAULT_TEMPORARY_PATTERNS = + Arrays.asList(NatsSubjectPattern.compile("_INBOX.*"), NatsSubjectPattern.compile("_R_.*")); NatsTelemetryBuilder(OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; @@ -35,10 +43,37 @@ public NatsTelemetryBuilder setCapturedHeaders(Collection capturedHeader return this; } + /** + * Configures the patterns used for temporary subjects. Eg: `^_INBOX\\..+$` + * + * @param temporaryPatterns A list of patterns. + */ + @CanIgnoreReturnValue + public NatsTelemetryBuilder setTemporaryPatterns(Collection temporaryPatterns) { + this.temporaryPatterns = new ArrayList<>(temporaryPatterns); + return this; + } + + /** + * Configures the subjects used for temporary subjects. Eg: `_INBOX.*` + * + * @param temporarySubjects A list of subjects. + */ + @CanIgnoreReturnValue + public NatsTelemetryBuilder setTemporarySubjects(Collection temporarySubjects) { + this.temporaryPatterns = new ArrayList<>(); + for (String subject : temporarySubjects) { + this.temporaryPatterns.add(NatsSubjectPattern.compile(subject)); + } + return this; + } + /** Returns a new {@link NatsTelemetry} with the settings of this {@link NatsTelemetryBuilder}. */ public NatsTelemetry build() { return new NatsTelemetry( - NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry, capturedHeaders), - NatsInstrumenterFactory.createConsumerProcessInstrumenter(openTelemetry, capturedHeaders)); + NatsInstrumenterFactory.createProducerInstrumenter( + openTelemetry, capturedHeaders, temporaryPatterns), + NatsInstrumenterFactory.createConsumerProcessInstrumenter( + openTelemetry, capturedHeaders, temporaryPatterns)); } } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java index 303335627e01..298801680677 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import java.util.List; +import java.util.regex.Pattern; /** * This class is internal and is hence not for public use. Its APIs are unstable and can change at @@ -21,31 +22,35 @@ public final class NatsInstrumenterFactory { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.nats-2.17"; public static Instrumenter createProducerInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders) { + OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPatterns) { return Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH)) + new NatsRequestMessagingAttributesGetter(temporaryPatterns), + MessageOperation.PUBLISH)) .addAttributesExtractor( MessagingAttributesExtractor.builder( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH) + new NatsRequestMessagingAttributesGetter(temporaryPatterns), + MessageOperation.PUBLISH) .setCapturedHeaders(capturedHeaders) .build()) .buildProducerInstrumenter(NatsRequestTextMapSetter.INSTANCE); } public static Instrumenter createConsumerProcessInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders) { + OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPatterns) { InstrumenterBuilder builder = Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS)) + new NatsRequestMessagingAttributesGetter(temporaryPatterns), + MessageOperation.PROCESS)) .addAttributesExtractor( MessagingAttributesExtractor.builder( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS) + new NatsRequestMessagingAttributesGetter(temporaryPatterns), + MessageOperation.PROCESS) .setCapturedHeaders(capturedHeaders) .build()); diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java index c1daa68711f4..c84833bf6f5b 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java @@ -9,11 +9,17 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import java.util.Collections; import java.util.List; +import java.util.regex.Pattern; import javax.annotation.Nullable; -enum NatsRequestMessagingAttributesGetter +class NatsRequestMessagingAttributesGetter implements MessagingAttributesGetter { - INSTANCE; + + private final List temporaryPatterns; + + public NatsRequestMessagingAttributesGetter(List temporaryPatterns) { + this.temporaryPatterns = temporaryPatterns; + } @Override public String getSystem(NatsRequest request) { @@ -29,15 +35,13 @@ public String getDestination(NatsRequest request) { @Nullable @Override public String getDestinationTemplate(NatsRequest request) { - if (isTemporaryDestination(request)) { - return request.getInboxPrefix(); - } - return null; + Pattern pattern = getTemporaryPattern(request); + return pattern == null ? null : pattern.pattern(); } @Override public boolean isTemporaryDestination(NatsRequest request) { - return request.getSubject().startsWith(request.getInboxPrefix()); + return getTemporaryPattern(request) != null; } @Override @@ -88,4 +92,21 @@ public List getMessageHeader(NatsRequest request, String name) { List result = headers.get(name); return result == null ? Collections.emptyList() : result; } + + /** + * @return the temporary pattern used for this request, or null + */ + private Pattern getTemporaryPattern(NatsRequest request) { + if (request.getSubject().startsWith(request.getInboxPrefix())) { + return NatsSubjectPattern.compile(request.getInboxPrefix() + "*"); + } + + for (Pattern pattern : temporaryPatterns) { + if (pattern.matcher(request.getSubject()).matches()) { + return pattern; + } + } + + return null; + } } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsSubjectPattern.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsSubjectPattern.java new file mode 100644 index 000000000000..f7339b21943b --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsSubjectPattern.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import java.util.regex.Pattern; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. Exposed for {@link io.nats.client.impl.OpenTelemetryDispatcherFactory}. + */ +public class NatsSubjectPattern { + + private NatsSubjectPattern() {} + + public static Pattern compile(String subject) { + return Pattern.compile( + "^" + subject.replace(".", "\\.").replace(">", "*").replace("*", ".*") + "$"); + } +} diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java index 9b89d7bae129..3d5d83d1b2a6 100644 --- a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java @@ -7,6 +7,8 @@ import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.assertTraceparentHeader; import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_TEMPORARY; import io.nats.client.Subscription; import io.nats.client.impl.Headers; @@ -101,6 +103,17 @@ void testPublishMessageWithHeaders() throws InterruptedException { assertTraceparentHeader(subscription); } + @Test + void testPublishMessageTemporarySubject() throws InterruptedException { + NatsMessage message = NatsMessage.builder().subject("_R_.qQzIWn.0JuCnu").data("x").build(); + + // when + testing().runWithSpan("parent", () -> connection.publish(message)); + + // then + assertTemporaryPublishSpan(); + } + private void assertPublishSpan() { testing() .waitAndAssertTraces( @@ -114,4 +127,22 @@ private void assertPublishSpan() { .hasAttributesSatisfyingExactly( messagingAttributes("publish", "sub", clientId)))); } + + private void assertTemporaryPublishSpan() { + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("(temporary) publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + messagingAttributes( + "publish", + "(temporary)", + clientId, + equalTo(MESSAGING_DESTINATION_TEMPORARY, true))))); + } }