From 5f843ac9c8aab0e552c2b073422ecbfde02c4a23 Mon Sep 17 00:00:00 2001 From: Alix Date: Fri, 26 Dec 2025 19:19:51 +0100 Subject: [PATCH 1/5] Allow custom temporary NATS subjects detection --- .../nats/v2_17/NatsSingletons.java | 7 ++-- .../nats/v2_17/NatsTelemetryBuilder.java | 27 ++++++++++++++-- .../internal/NatsInstrumenterFactory.java | 16 ++++++---- .../NatsRequestMessagingAttributesGetter.java | 32 +++++++++++++++---- .../nats/v2_17/AbstractNatsPublishTest.java | 31 ++++++++++++++++++ 5 files changed, 96 insertions(+), 17 deletions(-) diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java index 13e95063e94c..42f2c52f4bec 100644 --- a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.nats.v2_17; +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetryBuilder.DEFAULT_TEMPORARY_PREFIXES; import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createConsumerProcessInstrumenter; import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createProducerInstrumenter; @@ -20,10 +21,12 @@ public final class NatsSingletons { ExperimentalConfig.get().getMessagingHeaders(); public static final Instrumenter PRODUCER_INSTRUMENTER = - createProducerInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders); + createProducerInstrumenter( + GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PREFIXES); public static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER = - createConsumerProcessInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders); + createConsumerProcessInstrumenter( + GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PREFIXES); private NatsSingletons() {} } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java index c125d371e64e..1252e70e8d3d 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java @@ -11,6 +11,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -18,7 +19,16 @@ public final class NatsTelemetryBuilder { private final OpenTelemetry openTelemetry; + private List capturedHeaders = emptyList(); + private List temporaryPrefixes = DEFAULT_TEMPORARY_PREFIXES; + + /** + * _INBOX. is the prefix used in the NATS Java client for request/reply. Usually one-off + * subscription is used for each request. _R_. is the prefix used in NodeJS environments for + * request/reply. There is only one shared subscription per connection for all requests. + */ + public static final List DEFAULT_TEMPORARY_PREFIXES = Arrays.asList("_INBOX.", "_R_."); NatsTelemetryBuilder(OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; @@ -35,10 +45,23 @@ public NatsTelemetryBuilder setCapturedHeaders(Collection capturedHeader return this; } + /** + * Configures the prefixes used for temporary subjects. + * + * @param temporaryPrefixes A list of prefixes. + */ + @CanIgnoreReturnValue + public NatsTelemetryBuilder setTemporaryPrefixes(Collection temporaryPrefixes) { + this.temporaryPrefixes = new ArrayList<>(temporaryPrefixes); + return this; + } + /** Returns a new {@link NatsTelemetry} with the settings of this {@link NatsTelemetryBuilder}. */ public NatsTelemetry build() { return new NatsTelemetry( - NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry, capturedHeaders), - NatsInstrumenterFactory.createConsumerProcessInstrumenter(openTelemetry, capturedHeaders)); + NatsInstrumenterFactory.createProducerInstrumenter( + openTelemetry, capturedHeaders, temporaryPrefixes), + NatsInstrumenterFactory.createConsumerProcessInstrumenter( + openTelemetry, capturedHeaders, temporaryPrefixes)); } } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java index 303335627e01..3c29ce35dee7 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java @@ -21,31 +21,35 @@ public final class NatsInstrumenterFactory { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.nats-2.17"; public static Instrumenter createProducerInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders) { + OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPrefixes) { return Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH)) + new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + MessageOperation.PUBLISH)) .addAttributesExtractor( MessagingAttributesExtractor.builder( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH) + new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + MessageOperation.PUBLISH) .setCapturedHeaders(capturedHeaders) .build()) .buildProducerInstrumenter(NatsRequestTextMapSetter.INSTANCE); } public static Instrumenter createConsumerProcessInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders) { + OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPrefixes) { InstrumenterBuilder builder = Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS)) + new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + MessageOperation.PROCESS)) .addAttributesExtractor( MessagingAttributesExtractor.builder( - NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS) + new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + MessageOperation.PROCESS) .setCapturedHeaders(capturedHeaders) .build()); diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java index c1daa68711f4..04205c723708 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java @@ -11,9 +11,13 @@ import java.util.List; import javax.annotation.Nullable; -enum NatsRequestMessagingAttributesGetter +class NatsRequestMessagingAttributesGetter implements MessagingAttributesGetter { - INSTANCE; + private final List temporaryPrefixes; + + public NatsRequestMessagingAttributesGetter(List temporaryPrefixes) { + this.temporaryPrefixes = temporaryPrefixes; + } @Override public String getSystem(NatsRequest request) { @@ -29,15 +33,12 @@ public String getDestination(NatsRequest request) { @Nullable @Override public String getDestinationTemplate(NatsRequest request) { - if (isTemporaryDestination(request)) { - return request.getInboxPrefix(); - } - return null; + return getTemporaryPrefix(request); } @Override public boolean isTemporaryDestination(NatsRequest request) { - return request.getSubject().startsWith(request.getInboxPrefix()); + return getTemporaryPrefix(request) != null; } @Override @@ -88,4 +89,21 @@ public List getMessageHeader(NatsRequest request, String name) { List result = headers.get(name); return result == null ? Collections.emptyList() : result; } + + /** + * @return the temporary prefix used for this request, or null + */ + private String getTemporaryPrefix(NatsRequest request) { + if (request.getSubject().startsWith(request.getInboxPrefix())) { + return request.getInboxPrefix(); + } + + for (String prefix : temporaryPrefixes) { + if (request.getSubject().startsWith(prefix)) { + return prefix; + } + } + + return null; + } } diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java index 9b89d7bae129..3d5d83d1b2a6 100644 --- a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java @@ -7,6 +7,8 @@ import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.assertTraceparentHeader; import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_TEMPORARY; import io.nats.client.Subscription; import io.nats.client.impl.Headers; @@ -101,6 +103,17 @@ void testPublishMessageWithHeaders() throws InterruptedException { assertTraceparentHeader(subscription); } + @Test + void testPublishMessageTemporarySubject() throws InterruptedException { + NatsMessage message = NatsMessage.builder().subject("_R_.qQzIWn.0JuCnu").data("x").build(); + + // when + testing().runWithSpan("parent", () -> connection.publish(message)); + + // then + assertTemporaryPublishSpan(); + } + private void assertPublishSpan() { testing() .waitAndAssertTraces( @@ -114,4 +127,22 @@ private void assertPublishSpan() { .hasAttributesSatisfyingExactly( messagingAttributes("publish", "sub", clientId)))); } + + private void assertTemporaryPublishSpan() { + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("(temporary) publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + messagingAttributes( + "publish", + "(temporary)", + clientId, + equalTo(MESSAGING_DESTINATION_TEMPORARY, true))))); + } } From bf1f4f9b7ef7d8e892300db08e989c8310a9cab8 Mon Sep 17 00:00:00 2001 From: Alix Date: Fri, 26 Dec 2025 22:25:47 +0100 Subject: [PATCH 2/5] fix Muzzle --- .../nats/v2_17/NatsInstrumentationModule.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java index 3c48286120d2..c1da2d23345a 100644 --- a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java @@ -21,6 +21,11 @@ public NatsInstrumentationModule() { super("nats", "nats-2.17"); } + @Override + public boolean isHelperClass(String className) { + return className.startsWith("io.nats.client.impl.OpenTelemetryDispatcherFactory"); + } + @Override public List typeInstrumentations() { return asList( From c661b22e1513af2bce4f2564738b4a16c3a96529 Mon Sep 17 00:00:00 2001 From: Alix Date: Mon, 29 Dec 2025 12:37:00 +0100 Subject: [PATCH 3/5] use pattern instead of prefix --- .../nats/v2_17/NatsSingletons.java | 6 ++-- .../nats/v2_17/NatsTelemetryBuilder.java | 28 +++++++++++++------ .../internal/NatsInstrumenterFactory.java | 13 +++++---- .../NatsRequestMessagingAttributesGetter.java | 25 +++++++++-------- .../v2_17/internal/NatsSubjectPattern.java | 22 +++++++++++++++ 5 files changed, 66 insertions(+), 28 deletions(-) create mode 100644 instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsSubjectPattern.java diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java index 42f2c52f4bec..d0614fedd3c3 100644 --- a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.nats.v2_17; -import static io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetryBuilder.DEFAULT_TEMPORARY_PREFIXES; +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetryBuilder.DEFAULT_TEMPORARY_PATTERNS; import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createConsumerProcessInstrumenter; import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createProducerInstrumenter; @@ -22,11 +22,11 @@ public final class NatsSingletons { public static final Instrumenter PRODUCER_INSTRUMENTER = createProducerInstrumenter( - GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PREFIXES); + GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PATTERNS); public static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER = createConsumerProcessInstrumenter( - GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PREFIXES); + GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PATTERNS); private NatsSingletons() {} } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java index 1252e70e8d3d..7371be39b284 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java @@ -10,10 +10,12 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsSubjectPattern; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.regex.Pattern; /** A builder of {@link NatsTelemetry}. */ public final class NatsTelemetryBuilder { @@ -21,14 +23,15 @@ public final class NatsTelemetryBuilder { private final OpenTelemetry openTelemetry; private List capturedHeaders = emptyList(); - private List temporaryPrefixes = DEFAULT_TEMPORARY_PREFIXES; + private List temporaryPatterns = DEFAULT_TEMPORARY_PATTERNS; /** * _INBOX. is the prefix used in the NATS Java client for request/reply. Usually one-off * subscription is used for each request. _R_. is the prefix used in NodeJS environments for * request/reply. There is only one shared subscription per connection for all requests. */ - public static final List DEFAULT_TEMPORARY_PREFIXES = Arrays.asList("_INBOX.", "_R_."); + public static final List DEFAULT_TEMPORARY_PATTERNS = + Arrays.asList(NatsSubjectPattern.compile("_INBOX.*"), NatsSubjectPattern.compile("_R_.*")); NatsTelemetryBuilder(OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; @@ -46,13 +49,22 @@ public NatsTelemetryBuilder setCapturedHeaders(Collection capturedHeader } /** - * Configures the prefixes used for temporary subjects. + * Configures the patterns used for temporary subjects. * - * @param temporaryPrefixes A list of prefixes. + * @param temporaryPatterns A list of patterns. */ @CanIgnoreReturnValue - public NatsTelemetryBuilder setTemporaryPrefixes(Collection temporaryPrefixes) { - this.temporaryPrefixes = new ArrayList<>(temporaryPrefixes); + public NatsTelemetryBuilder setTemporaryPatterns(Collection temporaryPatterns) { + this.temporaryPatterns = new ArrayList<>(temporaryPatterns); + return this; + } + + @CanIgnoreReturnValue + public NatsTelemetryBuilder setTemporarySubjects(Collection temporarySubjects) { + this.temporaryPatterns = new ArrayList<>(); + for (String subject : temporarySubjects) { + this.temporaryPatterns.add(NatsSubjectPattern.compile(subject)); + } return this; } @@ -60,8 +72,8 @@ public NatsTelemetryBuilder setTemporaryPrefixes(Collection temporaryPre public NatsTelemetry build() { return new NatsTelemetry( NatsInstrumenterFactory.createProducerInstrumenter( - openTelemetry, capturedHeaders, temporaryPrefixes), + openTelemetry, capturedHeaders, temporaryPatterns), NatsInstrumenterFactory.createConsumerProcessInstrumenter( - openTelemetry, capturedHeaders, temporaryPrefixes)); + openTelemetry, capturedHeaders, temporaryPatterns)); } } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java index 3c29ce35dee7..298801680677 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import java.util.List; +import java.util.regex.Pattern; /** * This class is internal and is hence not for public use. Its APIs are unstable and can change at @@ -21,16 +22,16 @@ public final class NatsInstrumenterFactory { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.nats-2.17"; public static Instrumenter createProducerInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPrefixes) { + OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPatterns) { return Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create( - new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + new NatsRequestMessagingAttributesGetter(temporaryPatterns), MessageOperation.PUBLISH)) .addAttributesExtractor( MessagingAttributesExtractor.builder( - new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + new NatsRequestMessagingAttributesGetter(temporaryPatterns), MessageOperation.PUBLISH) .setCapturedHeaders(capturedHeaders) .build()) @@ -38,17 +39,17 @@ public static Instrumenter createProducerInstrumenter( } public static Instrumenter createConsumerProcessInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPrefixes) { + OpenTelemetry openTelemetry, List capturedHeaders, List temporaryPatterns) { InstrumenterBuilder builder = Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create( - new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + new NatsRequestMessagingAttributesGetter(temporaryPatterns), MessageOperation.PROCESS)) .addAttributesExtractor( MessagingAttributesExtractor.builder( - new NatsRequestMessagingAttributesGetter(temporaryPrefixes), + new NatsRequestMessagingAttributesGetter(temporaryPatterns), MessageOperation.PROCESS) .setCapturedHeaders(capturedHeaders) .build()); diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java index 04205c723708..c84833bf6f5b 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java @@ -9,14 +9,16 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import java.util.Collections; import java.util.List; +import java.util.regex.Pattern; import javax.annotation.Nullable; class NatsRequestMessagingAttributesGetter implements MessagingAttributesGetter { - private final List temporaryPrefixes; - public NatsRequestMessagingAttributesGetter(List temporaryPrefixes) { - this.temporaryPrefixes = temporaryPrefixes; + private final List temporaryPatterns; + + public NatsRequestMessagingAttributesGetter(List temporaryPatterns) { + this.temporaryPatterns = temporaryPatterns; } @Override @@ -33,12 +35,13 @@ public String getDestination(NatsRequest request) { @Nullable @Override public String getDestinationTemplate(NatsRequest request) { - return getTemporaryPrefix(request); + Pattern pattern = getTemporaryPattern(request); + return pattern == null ? null : pattern.pattern(); } @Override public boolean isTemporaryDestination(NatsRequest request) { - return getTemporaryPrefix(request) != null; + return getTemporaryPattern(request) != null; } @Override @@ -91,16 +94,16 @@ public List getMessageHeader(NatsRequest request, String name) { } /** - * @return the temporary prefix used for this request, or null + * @return the temporary pattern used for this request, or null */ - private String getTemporaryPrefix(NatsRequest request) { + private Pattern getTemporaryPattern(NatsRequest request) { if (request.getSubject().startsWith(request.getInboxPrefix())) { - return request.getInboxPrefix(); + return NatsSubjectPattern.compile(request.getInboxPrefix() + "*"); } - for (String prefix : temporaryPrefixes) { - if (request.getSubject().startsWith(prefix)) { - return prefix; + for (Pattern pattern : temporaryPatterns) { + if (pattern.matcher(request.getSubject()).matches()) { + return pattern; } } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsSubjectPattern.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsSubjectPattern.java new file mode 100644 index 000000000000..f7339b21943b --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsSubjectPattern.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import java.util.regex.Pattern; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. Exposed for {@link io.nats.client.impl.OpenTelemetryDispatcherFactory}. + */ +public class NatsSubjectPattern { + + private NatsSubjectPattern() {} + + public static Pattern compile(String subject) { + return Pattern.compile( + "^" + subject.replace(".", "\\.").replace(">", "*").replace("*", ".*") + "$"); + } +} From 7945c04dbcf2e41ad2bb7186d4e4968f5df473d9 Mon Sep 17 00:00:00 2001 From: Alix Date: Mon, 29 Dec 2025 20:58:12 +0100 Subject: [PATCH 4/5] remove wrong comment --- .../instrumentation/nats/v2_17/NatsTelemetryBuilder.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java index 7371be39b284..1770333ce751 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java @@ -25,11 +25,6 @@ public final class NatsTelemetryBuilder { private List capturedHeaders = emptyList(); private List temporaryPatterns = DEFAULT_TEMPORARY_PATTERNS; - /** - * _INBOX. is the prefix used in the NATS Java client for request/reply. Usually one-off - * subscription is used for each request. _R_. is the prefix used in NodeJS environments for - * request/reply. There is only one shared subscription per connection for all requests. - */ public static final List DEFAULT_TEMPORARY_PATTERNS = Arrays.asList(NatsSubjectPattern.compile("_INBOX.*"), NatsSubjectPattern.compile("_R_.*")); From 281c5a3eae36c60d6981a3ce74786f1297430686 Mon Sep 17 00:00:00 2001 From: Alix Date: Tue, 30 Dec 2025 08:24:04 +0100 Subject: [PATCH 5/5] add comment on setTemporarySubjects --- .../instrumentation/nats/v2_17/NatsTelemetryBuilder.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java index 1770333ce751..9a1a62cf03e2 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java @@ -44,7 +44,7 @@ public NatsTelemetryBuilder setCapturedHeaders(Collection capturedHeader } /** - * Configures the patterns used for temporary subjects. + * Configures the patterns used for temporary subjects. Eg: `^_INBOX\\..+$` * * @param temporaryPatterns A list of patterns. */ @@ -54,6 +54,11 @@ public NatsTelemetryBuilder setTemporaryPatterns(Collection temporaryPa return this; } + /** + * Configures the subjects used for temporary subjects. Eg: `_INBOX.*` + * + * @param temporarySubjects A list of subjects. + */ @CanIgnoreReturnValue public NatsTelemetryBuilder setTemporarySubjects(Collection temporarySubjects) { this.temporaryPatterns = new ArrayList<>();