Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ public NatsInstrumentationModule() {
super("nats", "nats-2.17");
}

@Override
public boolean isHelperClass(String className) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do I understand correctly that this isn't really needed for the javaagent instrumentation to function since this class is only used by the library instrumentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but Muzzle complained because the package of the OpenTelemetryDispatcherFactory is set to io.nats.client.impl. to access package-private classes.

[otel.javaagent 2025-12-26 18:28:06:156 +0000] [Test worker] 
WARN io.opentelemetry.javaagent.tooling.instrumentation.MuzzleMatcher - 
Instrumentation skipped, mismatched references were found: nats 
[class io.opentelemetry.javaagent.instrumentation.nats.v2_17.NatsInstrumentationModule] on sun.misc.Launcher$AppClassLoader@73d16e93


[otel.javaagent 2025-12-26 18:28:06:176 +0000] [Test worker] 
WARN io.opentelemetry.javaagent.tooling.instrumentation.MuzzleMatcher - 
-- io.opentelemetry.javaagent.shaded.instrumentation.nats.v2_17.NatsTelemetry:66 
Missing method io.nats.client.impl.OpenTelemetryDispatcherFactory#<init>(
  Lio/nats/client/impl/DispatcherFactory;
  Lio/opentelemetry/javaagent/shaded/instrumentation/api/instrumenter/Instrumenter;
)V

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just curios why it wasn't needed previously. Can't spot what in the current PR requires adding it, but otherwise I'm fine with adding it.

return className.startsWith("io.nats.client.impl.OpenTelemetryDispatcherFactory");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.nats.v2_17;

import static io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetryBuilder.DEFAULT_TEMPORARY_PATTERNS;
import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createConsumerProcessInstrumenter;
import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createProducerInstrumenter;

Expand All @@ -20,10 +21,12 @@ public final class NatsSingletons {
ExperimentalConfig.get().getMessagingHeaders();

public static final Instrumenter<NatsRequest, NatsRequest> PRODUCER_INSTRUMENTER =
createProducerInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders);
createProducerInstrumenter(
GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PATTERNS);

public static final Instrumenter<NatsRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
createConsumerProcessInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders);
createConsumerProcessInstrumenter(
GlobalOpenTelemetry.get(), capturedHeaders, DEFAULT_TEMPORARY_PATTERNS);

private NatsSingletons() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory;
import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsSubjectPattern;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;

/** A builder of {@link NatsTelemetry}. */
public final class NatsTelemetryBuilder {

private final OpenTelemetry openTelemetry;

private List<String> capturedHeaders = emptyList();
private List<Pattern> temporaryPatterns = DEFAULT_TEMPORARY_PATTERNS;

public static final List<Pattern> DEFAULT_TEMPORARY_PATTERNS =
Arrays.asList(NatsSubjectPattern.compile("_INBOX.*"), NatsSubjectPattern.compile("_R_.*"));

NatsTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
Expand All @@ -35,10 +43,37 @@ public NatsTelemetryBuilder setCapturedHeaders(Collection<String> capturedHeader
return this;
}

/**
* Configures the patterns used for temporary subjects. Eg: `^_INBOX\\..+$`
*
* @param temporaryPatterns A list of patterns.
*/
@CanIgnoreReturnValue
public NatsTelemetryBuilder setTemporaryPatterns(Collection<Pattern> temporaryPatterns) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it would be better to let users specify prefixes for temporary subjects? Or do you believe there are use cases where the more powerful matching capability of regex is needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know all usages but it might not always be prefixes. I guess one could use some.*.temporary.subject.> as the doc allows usage of both * and >. We can start with prefixes if it's simpler.

this.temporaryPatterns = new ArrayList<>(temporaryPatterns);
return this;
}

/**
* Configures the subjects used for temporary subjects. Eg: `_INBOX.*`
*
* @param temporarySubjects A list of subjects.
*/
@CanIgnoreReturnValue
public NatsTelemetryBuilder setTemporarySubjects(Collection<String> temporarySubjects) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imo in its current for too much effort is needed to understand what this method does and how it differs from setTemporaryPatterns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted a nats-friendly API where you specify the temp subjects the same way you'd do them in NATS. But This can be removed of course

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be best to document stuff like that. You should not expect people reading the code really know what kind of wildcards nats uses and to me it feels very unlikely that anybody looking at this method can figure out that it accepts that kind of wildcards.

this.temporaryPatterns = new ArrayList<>();
for (String subject : temporarySubjects) {
this.temporaryPatterns.add(NatsSubjectPattern.compile(subject));
}
return this;
}

/** Returns a new {@link NatsTelemetry} with the settings of this {@link NatsTelemetryBuilder}. */
public NatsTelemetry build() {
return new NatsTelemetry(
NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry, capturedHeaders),
NatsInstrumenterFactory.createConsumerProcessInstrumenter(openTelemetry, capturedHeaders));
NatsInstrumenterFactory.createProducerInstrumenter(
openTelemetry, capturedHeaders, temporaryPatterns),
NatsInstrumenterFactory.createConsumerProcessInstrumenter(
openTelemetry, capturedHeaders, temporaryPatterns));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import java.util.List;
import java.util.regex.Pattern;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
Expand All @@ -21,31 +22,35 @@ public final class NatsInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.nats-2.17";

public static Instrumenter<NatsRequest, NatsRequest> createProducerInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) {
OpenTelemetry openTelemetry, List<String> capturedHeaders, List<Pattern> temporaryPatterns) {
return Instrumenter.<NatsRequest, NatsRequest>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH))
new NatsRequestMessagingAttributesGetter(temporaryPatterns),
MessageOperation.PUBLISH))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH)
new NatsRequestMessagingAttributesGetter(temporaryPatterns),
MessageOperation.PUBLISH)
.setCapturedHeaders(capturedHeaders)
.build())
.buildProducerInstrumenter(NatsRequestTextMapSetter.INSTANCE);
}

public static Instrumenter<NatsRequest, Void> createConsumerProcessInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) {
OpenTelemetry openTelemetry, List<String> capturedHeaders, List<Pattern> temporaryPatterns) {
InstrumenterBuilder<NatsRequest, Void> builder =
Instrumenter.<NatsRequest, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS))
new NatsRequestMessagingAttributesGetter(temporaryPatterns),
MessageOperation.PROCESS))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(
NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS)
new NatsRequestMessagingAttributesGetter(temporaryPatterns),
MessageOperation.PROCESS)
.setCapturedHeaders(capturedHeaders)
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

enum NatsRequestMessagingAttributesGetter
class NatsRequestMessagingAttributesGetter
implements MessagingAttributesGetter<NatsRequest, Object> {
INSTANCE;

private final List<Pattern> temporaryPatterns;

public NatsRequestMessagingAttributesGetter(List<Pattern> temporaryPatterns) {
this.temporaryPatterns = temporaryPatterns;
}

@Override
public String getSystem(NatsRequest request) {
Expand All @@ -29,15 +35,13 @@ public String getDestination(NatsRequest request) {
@Nullable
@Override
public String getDestinationTemplate(NatsRequest request) {
if (isTemporaryDestination(request)) {
return request.getInboxPrefix();
}
return null;
Pattern pattern = getTemporaryPattern(request);
return pattern == null ? null : pattern.pattern();
}

@Override
public boolean isTemporaryDestination(NatsRequest request) {
return request.getSubject().startsWith(request.getInboxPrefix());
return getTemporaryPattern(request) != null;
}

@Override
Expand Down Expand Up @@ -88,4 +92,21 @@ public List<String> getMessageHeader(NatsRequest request, String name) {
List<String> result = headers.get(name);
return result == null ? Collections.emptyList() : result;
}

/**
* @return the temporary pattern used for this request, or null
*/
private Pattern getTemporaryPattern(NatsRequest request) {
if (request.getSubject().startsWith(request.getInboxPrefix())) {
return NatsSubjectPattern.compile(request.getInboxPrefix() + "*");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feels inefficient

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perfomance wise?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I suspect you could avoid compiling the regular expression

}

for (Pattern pattern : temporaryPatterns) {
if (pattern.matcher(request.getSubject()).matches()) {
return pattern;
}
}

return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.nats.v2_17.internal;

import java.util.regex.Pattern;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time. Exposed for {@link io.nats.client.impl.OpenTelemetryDispatcherFactory}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand Exposed for {@link io.nats.client.impl.OpenTelemetryDispatcherFactory}.. This class isn't used by OpenTelemetryDispatcherFactory as far as I can tell.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad copy/paste 😬

*/
public class NatsSubjectPattern {

private NatsSubjectPattern() {}

public static Pattern compile(String subject) {
return Pattern.compile(
"^" + subject.replace(".", "\\.").replace(">", "*").replace("*", ".*") + "$");
Copy link
Contributor

@laurit laurit Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

none of the tests fail when replace(">", "*") is removed

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.assertTraceparentHeader;
import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_TEMPORARY;

import io.nats.client.Subscription;
import io.nats.client.impl.Headers;
Expand Down Expand Up @@ -101,6 +103,17 @@ void testPublishMessageWithHeaders() throws InterruptedException {
assertTraceparentHeader(subscription);
}

@Test
void testPublishMessageTemporarySubject() throws InterruptedException {
NatsMessage message = NatsMessage.builder().subject("_R_.qQzIWn.0JuCnu").data("x").build();

// when
testing().runWithSpan("parent", () -> connection.publish(message));

// then
assertTemporaryPublishSpan();
}

private void assertPublishSpan() {
testing()
.waitAndAssertTraces(
Expand All @@ -114,4 +127,22 @@ private void assertPublishSpan() {
.hasAttributesSatisfyingExactly(
messagingAttributes("publish", "sub", clientId))));
}

private void assertTemporaryPublishSpan() {
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent(),
span ->
span.hasName("(temporary) publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
messagingAttributes(
"publish",
"(temporary)",
clientId,
equalTo(MESSAGING_DESTINATION_TEMPORARY, true)))));
}
}
Loading