diff --git a/.fossa.yml b/.fossa.yml index 37fb0c02624a..5b1f524cc4f9 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -709,6 +709,12 @@ targets: - type: gradle path: ./ target: ':instrumentation:mongo:mongo-async-3.3:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:nats:nats-2.17:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:nats:nats-2.17:library' - type: gradle path: ./ target: ':instrumentation:netty:netty-3.8:javaagent' diff --git a/.github/scripts/check-package-names.sh b/.github/scripts/check-package-names.sh index ce53c483fb44..6db819de2342 100755 --- a/.github/scripts/check-package-names.sh +++ b/.github/scripts/check-package-names.sh @@ -29,6 +29,9 @@ for dir in $(find instrumentation -name "*.java" | grep library/src/main/java | if [[ "$dir" == "instrumentation/lettuce/lettuce-5.1/library/src/main/java/io/lettuce/core/protocol" ]]; then continue fi + if [[ "$dir" == "instrumentation/nats/nats-2.17/library/src/main/java/io/nats/client/impl" ]]; then + continue + fi # some common modules don't have any base version # - lettuce-common diff --git a/.gitignore b/.gitignore index 308ea9e1651f..9b3f30e67a10 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ out/ ###################### .vscode **/bin/ +.metals # Others # ########## diff --git a/docs/instrumentation-list.yaml b/docs/instrumentation-list.yaml index e98fa479d489..a0610389d60f 100644 --- a/docs/instrumentation-list.yaml +++ b/docs/instrumentation-list.yaml @@ -5274,6 +5274,59 @@ libraries: target_versions: javaagent: - org.mybatis:mybatis:[3.2.0,) + nats: + - name: nats-2.17 + description: This instrumentation provides messaging spans for NATS + disabled_by_default: false + source_path: instrumentation/nats/nats-2.17 + scope: + name: io.opentelemetry.nats-2.17 + target_versions: + javaagent: + - io.nats:jnats:[2.17.7,) + library: + - io.nats:jnats:2.17.7 + configurations: + - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled + description: | + Enables experimental receive telemetry, which will cause consumers to start a new trace, with only a span link connecting it to the producer trace. + type: boolean + default: false + - name: otel.instrumentation.messaging.experimental.capture-headers + description: Allows configuring headers to capture as span attributes. + type: list + default: '' + telemetry: + - when: default + spans: + - span_kind: CONSUMER + attributes: + - name: messaging.client_id + type: STRING + - name: messaging.destination.name + type: STRING + - name: messaging.header.captured_header + type: STRING_ARRAY + - name: messaging.message.body.size + type: LONG + - name: messaging.operation + type: STRING + - name: messaging.system + type: STRING + - span_kind: PRODUCER + attributes: + - name: messaging.client_id + type: STRING + - name: messaging.destination.name + type: STRING + - name: messaging.header.captured_header + type: STRING_ARRAY + - name: messaging.message.body.size + type: LONG + - name: messaging.operation + type: STRING + - name: messaging.system + type: STRING netty: - name: netty-3.8 source_path: instrumentation/netty/netty-3.8 diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md index b9b686070ffd..cf191ead0d82 100644 --- a/docs/supported-libraries.md +++ b/docs/supported-libraries.md @@ -107,6 +107,7 @@ These are the supported libraries and frameworks: | [Micrometer](https://micrometer.io/) | 1.5+ (disabled by default) | [opentelemetry-micrometer-1.5](../instrumentation/micrometer/micrometer-1.5/library) | none | | [MongoDB Driver](https://mongodb.github.io/mongo-java-driver/) | 3.1+ | [opentelemetry-mongo-3.1](../instrumentation/mongo/mongo-3.1/library) | [Database Client Spans], [Database Client Metrics] [6] | | [MyBatis](https://mybatis.org/mybatis-3/) | 3.2+ | N/A | none | +| [NATS Client](https://github.com/nats-io/nats.java) | 2.17.2+ | [nats-2.17](../instrumentation/nats/nats-2.17/library) | [Messaging Spans] | | [Netty HTTP codec [5]](https://github.com/netty/netty) | 3.8+ | [opentelemetry-netty-4.1](../instrumentation/netty/netty-4.1/library) | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] | | [OpenAI Java SDK](https://github.com/openai/openai-java) | 1.1+ | [openai-java-1.1](../instrumentation/openai/openai-java-1.1/library) | [GenAI Client Spans], [GenAI Client Metrics] | | [OpenSearch Rest Client](https://github.com/opensearch-project/opensearch-java) | 1.0+ | | [Database Client Spans], [Database Client Metrics] [6] | diff --git a/instrumentation-docs/instrumentations.sh b/instrumentation-docs/instrumentations.sh index 01c160f97c16..5345d4164e00 100755 --- a/instrumentation-docs/instrumentations.sh +++ b/instrumentation-docs/instrumentations.sh @@ -121,6 +121,8 @@ readonly INSTRUMENTATIONS=( "jetty-httpclient:jetty-httpclient-12.0:javaagent:test" "jetty-httpclient:jetty-httpclient-9.2:javaagent:test" "jodd-http-4.2:javaagent:test" + "nats:nats-2.17:javaagent:test" + "nats:nats-2.17:javaagent:testExperimental" "netty:netty-3.8:javaagent:test" "netty:netty-4.0:javaagent:test" "netty:netty-4.1:javaagent:test" diff --git a/instrumentation/nats/nats-2.17/javaagent/build.gradle.kts b/instrumentation/nats/nats-2.17/javaagent/build.gradle.kts new file mode 100644 index 000000000000..ea0dcf8ad4c5 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/build.gradle.kts @@ -0,0 +1,49 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("io.nats") + module.set("jnats") + versions.set("[2.17.2,)") + + // Could not find io.nats:nats-parent:1.0-SNAPSHOT + skip("0.5.0", "0.5.1") + + assertInverse.set(true) + } +} + +dependencies { + library("io.nats:jnats:2.17.2") + + implementation(project(":instrumentation:nats:nats-2.17:library")) + testImplementation(project(":instrumentation:nats:nats-2.17:testing")) +} + +tasks { + withType().configureEach { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false") + } + + val testExperimental by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + filter { + includeTestsMatching("NatsExperimentalTest") + } + jvmArgs("-Dotel.instrumentation.messaging.experimental.capture-headers=captured-header") + } + + test { + filter { + excludeTestsMatching("NatsExperimentalTest") + } + } + + check { + dependsOn(testExperimental) + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/CompletableFutureWrapper.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/CompletableFutureWrapper.java new file mode 100644 index 000000000000..6fb6b176e301 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/CompletableFutureWrapper.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.concurrent.CompletableFuture; + +public final class CompletableFutureWrapper { + + private CompletableFutureWrapper() {} + + public static CompletableFuture wrap(CompletableFuture future, Context context) { + CompletableFuture result = new CompletableFuture<>(); + future.whenComplete( + (T value, Throwable throwable) -> { + try (Scope ignored = context.makeCurrent()) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(value); + } + } + }); + + return result; + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionPublishInstrumentation.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionPublishInstrumentation.java new file mode 100644 index 000000000000..3dabbac45c2d --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionPublishInstrumentation.java @@ -0,0 +1,174 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.nats.v2_17.NatsSingletons.PRODUCER_INSTRUMENTER; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.impl.Headers; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsMessageWritableHeaders; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class ConnectionPublishInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("io.nats.client.Connection")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isPublic() + .and(named("publish")) + .and(takesArguments(2)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, byte[].class)), + ConnectionPublishInstrumentation.class.getName() + "$PublishBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("publish")) + .and(takesArguments(3)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, named("io.nats.client.impl.Headers"))) + .and(takesArgument(2, byte[].class)), + ConnectionPublishInstrumentation.class.getName() + "$PublishHeadersBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("publish")) + .and(takesArguments(3)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, String.class)) + .and(takesArgument(2, byte[].class)), + ConnectionPublishInstrumentation.class.getName() + "$PublishReplyToBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("publish")) + .and(takesArguments(4)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, String.class)) + .and(takesArgument(2, named("io.nats.client.impl.Headers"))) + .and(takesArgument(3, byte[].class)), + ConnectionPublishInstrumentation.class.getName() + "$PublishReplyToHeadersBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("publish")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.nats.client.Message"))), + ConnectionPublishInstrumentation.class.getName() + "$PublishMessageAdvice"); + } + + @SuppressWarnings("unused") + public static class PublishBodyAdvice { + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static boolean onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(1) byte[] body) { + // call the instrumented publish method + connection.publish(subject, null, null, body); + return true; + } + } + + @SuppressWarnings("unused") + public static class PublishHeadersBodyAdvice { + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static boolean onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(1) Headers headers, + @Advice.Argument(2) byte[] body) { + // call the instrumented publish method + connection.publish(subject, null, headers, body); + return true; + } + } + + @SuppressWarnings("unused") + public static class PublishReplyToBodyAdvice { + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static boolean onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(1) String replyTo, + @Advice.Argument(2) byte[] body) { + // call the instrumented publish method + connection.publish(subject, replyTo, null, body); + return true; + } + } + + @SuppressWarnings("unused") + public static class PublishReplyToHeadersBodyAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(1) String replyTo, + @Advice.Argument(value = 2, readOnly = false) Headers headers, + @Advice.Argument(3) byte[] body, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + headers = NatsMessageWritableHeaders.create(headers); + + Context parentContext = Context.current(); + natsRequest = NatsRequest.create(connection, subject, replyTo, headers, body); + + if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) { + return; + } + + otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest); + otelScope = otelContext.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Thrown Throwable throwable, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + if (otelScope == null) { + return; + } + + otelScope.close(); + PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable); + } + } + + @SuppressWarnings("unused") + public static class PublishMessageAdvice { + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static boolean onEnter( + @Advice.This Connection connection, @Advice.Argument(0) Message message) { + if (message == null) { + return false; + } + + // call the instrumented publish method + connection.publish( + message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData()); + return true; + } + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionRequestInstrumentation.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionRequestInstrumentation.java new file mode 100644 index 000000000000..bf4eaf2e2355 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/ConnectionRequestInstrumentation.java @@ -0,0 +1,405 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.nats.v2_17.NatsSingletons.PRODUCER_INSTRUMENTER; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.impl.Headers; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsMessageWritableHeaders; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class ConnectionRequestInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("io.nats.client.Connection")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isPublic() + .and(named("request")) + .and(takesArguments(3)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, byte[].class)) + .and(takesArgument(2, Duration.class)) + .and(returns(named("io.nats.client.Message"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("request")) + .and(takesArguments(4)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, named("io.nats.client.impl.Headers"))) + .and(takesArgument(2, byte[].class)) + .and(takesArgument(3, Duration.class)) + .and(returns(named("io.nats.client.Message"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestHeadersBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("request")) + .and(takesArguments(2)) + .and(takesArgument(0, named("io.nats.client.Message"))) + .and(takesArgument(1, Duration.class)) + .and(returns(named("io.nats.client.Message"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestMessageAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("request")) + .and(takesArguments(2)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, byte[].class)) + .and(returns(named("java.util.concurrent.CompletableFuture"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestFutureBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("request")) + .and(takesArguments(3)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, named("io.nats.client.impl.Headers"))) + .and(takesArgument(2, byte[].class)) + .and(returns(named("java.util.concurrent.CompletableFuture"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestFutureHeadersBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("request")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.nats.client.Message"))) + .and(returns(named("java.util.concurrent.CompletableFuture"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestFutureMessageAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("requestWithTimeout")) + .and(takesArguments(3)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, byte[].class)) + .and(takesArgument(2, Duration.class)) + .and(returns(named("java.util.concurrent.CompletableFuture"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestTimeoutFutureBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("requestWithTimeout")) + .and(takesArguments(4)) + .and(takesArgument(0, String.class)) + .and(takesArgument(1, named("io.nats.client.impl.Headers"))) + .and(takesArgument(2, byte[].class)) + .and(takesArgument(3, Duration.class)) + .and(returns(named("java.util.concurrent.CompletableFuture"))), + ConnectionRequestInstrumentation.class.getName() + + "$RequestTimeoutFutureHeadersBodyAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("requestWithTimeout")) + .and(takesArguments(2)) + .and(takesArgument(0, named("io.nats.client.Message"))) + .and(takesArgument(1, Duration.class)) + .and(returns(named("java.util.concurrent.CompletableFuture"))), + ConnectionRequestInstrumentation.class.getName() + "$RequestTimeoutFutureMessageAdvice"); + } + + @SuppressWarnings("unused") + public static class RequestBodyAdvice { + + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static Message onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(1) byte[] body, + @Advice.Argument(2) Duration timeout) + throws InterruptedException { + // call the instrumented request method + return connection.request(subject, null, body, timeout); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) Message result, @Advice.Enter Message message) { + result = message; + } + } + + @SuppressWarnings("unused") + public static class RequestHeadersBodyAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(value = 1, readOnly = false) Headers headers, + @Advice.Argument(2) byte[] body, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + headers = NatsMessageWritableHeaders.create(headers); + natsRequest = NatsRequest.create(connection, subject, null, headers, body); + Context parentContext = Context.current(); + + if (!PRODUCER_INSTRUMENTER.shouldStart(parentContext, natsRequest)) { + return; + } + + otelContext = PRODUCER_INSTRUMENTER.start(parentContext, natsRequest); + otelScope = otelContext.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.This Connection connection, + @Advice.Thrown Throwable throwable, + @Advice.Return Message message, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + if (otelScope == null) { + return; + } + + NatsRequest natsResponse = null; + if (message != null) { + natsResponse = NatsRequest.create(connection, message); + } + + otelScope.close(); + PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, natsResponse, throwable); + } + } + + @SuppressWarnings("unused") + public static class RequestMessageAdvice { + + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static Message onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) Message request, + @Advice.Argument(1) Duration timeout) + throws InterruptedException { + if (request == null) { + return null; + } + + // call the instrumented request method + return connection.request( + request.getSubject(), request.getHeaders(), request.getData(), timeout); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) Message result, @Advice.Enter Message response) { + result = response; + } + } + + @SuppressWarnings("unused") + public static class RequestFutureBodyAdvice { + + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static CompletableFuture onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(1) byte[] body) { + // call the instrumented request method + return connection.request(subject, null, body); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) CompletableFuture result, + @Advice.Enter CompletableFuture future) { + result = future; + } + } + + @SuppressWarnings("unused") + public static class RequestFutureHeadersBodyAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(value = 1, readOnly = false) Headers headers, + @Advice.Argument(2) byte[] body, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelParentContext") Context otelParentContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + headers = NatsMessageWritableHeaders.create(headers); + natsRequest = NatsRequest.create(connection, subject, null, headers, body); + otelParentContext = Context.current(); + + if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) { + return; + } + + otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest); + otelScope = otelContext.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.This Connection connection, + @Advice.Thrown Throwable throwable, + @Advice.Return(readOnly = false) CompletableFuture messageFuture, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelParentContext") Context otelParentContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + if (otelScope == null) { + return; + } + + otelScope.close(); + if (throwable != null) { + PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable); + } else { + messageFuture = + messageFuture.whenComplete( + new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest)); + messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext); + } + } + } + + @SuppressWarnings("unused") + public static class RequestFutureMessageAdvice { + + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static CompletableFuture onEnter( + @Advice.This Connection connection, @Advice.Argument(0) Message message) { + // execute original method body to handle null message + if (message == null) { + return null; + } + + // call the instrumented request method + return connection.request(message.getSubject(), message.getHeaders(), message.getData()); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) CompletableFuture result, + @Advice.Enter CompletableFuture future) { + if (future != null) { + result = future; + } + } + } + + @SuppressWarnings("unused") + public static class RequestTimeoutFutureBodyAdvice { + + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static CompletableFuture onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(1) byte[] body, + @Advice.Argument(2) Duration timeout) { + // call the instrumented requestWithTimeout method + return connection.requestWithTimeout(subject, null, body, timeout); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) CompletableFuture result, + @Advice.Enter CompletableFuture future) { + result = future; + } + } + + @SuppressWarnings("unused") + public static class RequestTimeoutFutureHeadersBodyAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This Connection connection, + @Advice.Argument(0) String subject, + @Advice.Argument(value = 1, readOnly = false) Headers headers, + @Advice.Argument(2) byte[] body, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelParentContext") Context otelParentContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + headers = NatsMessageWritableHeaders.create(headers); + natsRequest = NatsRequest.create(connection, subject, null, headers, body); + otelParentContext = Context.current(); + + if (!PRODUCER_INSTRUMENTER.shouldStart(otelParentContext, natsRequest)) { + return; + } + + otelContext = PRODUCER_INSTRUMENTER.start(otelParentContext, natsRequest); + otelScope = otelContext.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.This Connection connection, + @Advice.Thrown Throwable throwable, + @Advice.Return(readOnly = false) CompletableFuture messageFuture, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelParentContext") Context otelParentContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + if (otelScope == null) { + return; + } + + otelScope.close(); + if (throwable != null) { + PRODUCER_INSTRUMENTER.end(otelContext, natsRequest, null, throwable); + } else { + messageFuture = + messageFuture.whenComplete( + new SpanFinisher(PRODUCER_INSTRUMENTER, otelContext, connection, natsRequest)); + messageFuture = CompletableFutureWrapper.wrap(messageFuture, otelParentContext); + } + } + } + + @SuppressWarnings("unused") + public static class RequestTimeoutFutureMessageAdvice { + + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static CompletableFuture onEnter( + @Advice.This Connection connection, + @Advice.Argument(value = 0, readOnly = false) Message message, + @Advice.Argument(1) Duration timeout) { + if (message == null) { + return null; + } + + // call the instrumented requestWithTimeout method + return connection.requestWithTimeout( + message.getSubject(), message.getHeaders(), message.getData(), timeout); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) CompletableFuture result, + @Advice.Enter CompletableFuture future) { + if (future != null) { + result = future; + } + } + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/MessageHandlerInstrumentation.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/MessageHandlerInstrumentation.java new file mode 100644 index 000000000000..d25df179a09f --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/MessageHandlerInstrumentation.java @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.nats.v2_17.NatsSingletons.CONSUMER_PROCESS_INSTRUMENTER; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.nats.client.Message; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class MessageHandlerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("io.nats.client.MessageHandler")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isPublic() + .and(named("onMessage")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.nats.client.Message"))), + MessageHandlerInstrumentation.class.getName() + "$OnMessageAdvice"); + } + + @SuppressWarnings("unused") + public static class OnMessageAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) Message message, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + Context parentContext = Context.current(); + natsRequest = NatsRequest.create(message.getConnection(), message); + + if (!CONSUMER_PROCESS_INSTRUMENTER.shouldStart(parentContext, natsRequest)) { + return; + } + + otelContext = CONSUMER_PROCESS_INSTRUMENTER.start(parentContext, natsRequest); + otelScope = otelContext.makeCurrent(); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Thrown Throwable throwable, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("natsRequest") NatsRequest natsRequest) { + if (otelScope == null) { + return; + } + + otelScope.close(); + CONSUMER_PROCESS_INSTRUMENTER.end(otelContext, natsRequest, null, throwable); + } + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsIgnoredTypesConfigurer.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsIgnoredTypesConfigurer.java new file mode 100644 index 000000000000..9789bca6af21 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsIgnoredTypesConfigurer.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesBuilder; +import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesConfigurer; +import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; + +@AutoService(IgnoredTypesConfigurer.class) +public class NatsIgnoredTypesConfigurer implements IgnoredTypesConfigurer { + + @Override + public void configure(IgnoredTypesBuilder builder, ConfigProperties config) { + builder.ignoreTaskClass("io.nats.client.impl.NatsDispatcher"); + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java new file mode 100644 index 000000000000..3701dacf3b25 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsInstrumentationModule.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class NatsInstrumentationModule extends InstrumentationModule { + + public NatsInstrumentationModule() { + super("nats", "nats-2.17"); + } + + @Override + public List typeInstrumentations() { + return asList( + new ConnectionPublishInstrumentation(), + new ConnectionRequestInstrumentation(), + new MessageHandlerInstrumentation()); + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java new file mode 100644 index 000000000000..13e95063e94c --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsSingletons.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createConsumerProcessInstrumenter; +import static io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory.createProducerInstrumenter; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import java.util.List; + +public final class NatsSingletons { + + private static final List capturedHeaders = + ExperimentalConfig.get().getMessagingHeaders(); + + public static final Instrumenter PRODUCER_INSTRUMENTER = + createProducerInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders); + + public static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER = + createConsumerProcessInstrumenter(GlobalOpenTelemetry.get(), capturedHeaders); + + private NatsSingletons() {} +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/SpanFinisher.java b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/SpanFinisher.java new file mode 100644 index 000000000000..a02b9a618f79 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/SpanFinisher.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import io.nats.client.Connection; +import io.nats.client.Message; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import java.util.function.BiConsumer; + +public final class SpanFinisher implements BiConsumer { + private final Instrumenter instrumenter; + private final Context context; + private final Connection connection; + private final NatsRequest request; + + public SpanFinisher( + Instrumenter instrumenter, + Context context, + Connection connection, + NatsRequest request) { + this.instrumenter = instrumenter; + this.context = context; + this.connection = connection; + this.request = request; + } + + @Override + public void accept(Message message, Throwable throwable) { + if (message != null) { + NatsRequest response = NatsRequest.create(connection, message); + instrumenter.end(context, request, response, throwable); + } else { + instrumenter.end(context, request, null, throwable); + } + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsDispatcherTest.java b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsDispatcherTest.java new file mode 100644 index 000000000000..1260668a9534 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsDispatcherTest.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import io.opentelemetry.instrumentation.nats.v2_17.AbstractNatsDispatcherTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsDispatcherTest extends AbstractNatsDispatcherTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsExperimentalTest.java b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsExperimentalTest.java new file mode 100644 index 000000000000..55c174bb8323 --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsExperimentalTest.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import io.opentelemetry.instrumentation.nats.v2_17.AbstractNatsExperimentalTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsExperimentalTest extends AbstractNatsExperimentalTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsPublishTest.java b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsPublishTest.java new file mode 100644 index 000000000000..940d64d3117f --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsPublishTest.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import io.opentelemetry.instrumentation.nats.v2_17.AbstractNatsPublishTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsPublishTest extends AbstractNatsPublishTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsRequestTest.java b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsRequestTest.java new file mode 100644 index 000000000000..01eb0301e12a --- /dev/null +++ b/instrumentation/nats/nats-2.17/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/nats/v2_17/NatsRequestTest.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.nats.v2_17; + +import io.opentelemetry.instrumentation.nats.v2_17.AbstractNatsRequestTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsRequestTest extends AbstractNatsRequestTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/nats/nats-2.17/library/README.md b/instrumentation/nats/nats-2.17/library/README.md new file mode 100644 index 000000000000..0dfa8e0f6258 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/README.md @@ -0,0 +1,63 @@ +# Library Instrumentation for NATS version 2.17 + +Provides OpenTelemetry instrumentation for [NATS Client](https://github.com/nats-io/nats.java). + +## Quickstart + +### Add these dependencies to your project + +Replace `OPENTELEMETRY_VERSION` with the [latest +release](https://central.sonatype.com/artifact/io.opentelemetry.instrumentation/opentelemetry-nats-2.17). + +For Maven, add to your `pom.xml` dependencies: + +```xml + + + io.opentelemetry.instrumentation + opentelemetry-nats-2.17 + OPENTELEMETRY_VERSION + + +``` + +For Gradle, add to your dependencies: + +```groovy +implementation("io.opentelemetry.instrumentation:opentelemetry-nats-2.17:OPENTELEMETRY_VERSION") +``` + +### Usage + +The instrumentation library provides the class `NatsTelemetry` that has a builder +method and allows the creation of an instance of the `Connection` to provide +OpenTelemetry-based instrumentation: + +```java +import io.nats.client.Connection; +import io.nats.client.Nats; +import io.nats.client.Options; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.nats.v2_17.NatsTelemetry; +import java.io.IOException; + +public class OpenTelemetryNatsConnection { + + private OpenTelemetry openTelemetry; + private NatsTelemetry telemetry; + + public OpenTelemetryNatsConnection(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + this.telemetry = NatsTelemetry.create(openTelemetry); + } + + public Connection newConnection() throws IOException, InterruptedException { + return newConnection(Options.builder()); + } + + public Connection newConnection(Options.Builder options) throws IOException, InterruptedException { + return telemetry.newConnection(options.build(), Nats::connect); + } + +} +``` diff --git a/instrumentation/nats/nats-2.17/library/build.gradle.kts b/instrumentation/nats/nats-2.17/library/build.gradle.kts new file mode 100644 index 000000000000..dbee7d91c511 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/build.gradle.kts @@ -0,0 +1,18 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + library("io.nats:jnats:2.17.2") + + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + + testImplementation(project(":instrumentation:nats:nats-2.17:testing")) +} + +tasks { + withType().configureEach { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/nats/client/impl/OpenTelemetryDispatcherFactory.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/nats/client/impl/OpenTelemetryDispatcherFactory.java new file mode 100644 index 000000000000..6202f9aea801 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/nats/client/impl/OpenTelemetryDispatcherFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.nats.client.impl; + +import io.nats.client.MessageHandler; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import io.opentelemetry.instrumentation.nats.v2_17.internal.OpenTelemetryMessageHandler; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class OpenTelemetryDispatcherFactory extends DispatcherFactory { + + private final DispatcherFactory delegate; + private final Instrumenter consumerProcessInstrumenter; + + public OpenTelemetryDispatcherFactory( + DispatcherFactory delegate, Instrumenter consumerProcessInstrumenter) { + this.delegate = delegate; + this.consumerProcessInstrumenter = consumerProcessInstrumenter; + } + + @Override + NatsDispatcher createDispatcher(NatsConnection natsConnection, MessageHandler messageHandler) { + return delegate.createDispatcher( + natsConnection, + new OpenTelemetryMessageHandler(messageHandler, consumerProcessInstrumenter)); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetry.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetry.java new file mode 100644 index 000000000000..eedd1311bae3 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetry.java @@ -0,0 +1,86 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import io.nats.client.Connection; +import io.nats.client.Options; +import io.nats.client.impl.DispatcherFactory; +import io.nats.client.impl.OpenTelemetryDispatcherFactory; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import java.io.IOException; + +/** Entrypoint for instrumenting NATS clients. */ +public final class NatsTelemetry { + + /** Returns a new {@link NatsTelemetry} configured with the given {@link OpenTelemetry}. */ + public static NatsTelemetry create(OpenTelemetry openTelemetry) { + return new NatsTelemetryBuilder(openTelemetry).build(); + } + + /** Returns a new {@link NatsTelemetryBuilder} configured with the given {@link OpenTelemetry}. */ + public static NatsTelemetryBuilder builder(OpenTelemetry openTelemetry) { + return new NatsTelemetryBuilder(openTelemetry); + } + + private final Instrumenter producerInstrumenter; + private final Instrumenter consumerProcessInstrumenter; + + NatsTelemetry( + Instrumenter producerInstrumenter, + Instrumenter consumerProcessInstrumenter) { + this.producerInstrumenter = producerInstrumenter; + this.consumerProcessInstrumenter = consumerProcessInstrumenter; + } + + /** + * Returns a {@link Connection} with telemetry instrumentation. + * + *

This method should be used together with {@link #configure(Options.Builder)}. Consider using + * {@link #newConnection(Options.Builder, ConnectionFactory)} or {@link #newConnection(Options, + * ConnectionFactory)} instead. + */ + public Connection wrap(Connection connection) { + return OpenTelemetryConnection.wrap( + connection, producerInstrumenter, consumerProcessInstrumenter); + } + + /** + * Returns a {@link Options.Builder} configured with telemetry instrumentation. + * + *

This method should be used together with {@link #wrap(Connection)}. Consider using {@link + * #newConnection(Options.Builder, ConnectionFactory)} or {@link #newConnection(Options, + * ConnectionFactory)} instead. + */ + public Options.Builder configure(Options.Builder options) { + DispatcherFactory factory = options.build().getDispatcherFactory(); + + if (factory == null) { + factory = new DispatcherFactory(); + } + + return options.dispatcherFactory( + new OpenTelemetryDispatcherFactory(factory, consumerProcessInstrumenter)); + } + + /** Returns a {@link Connection} with telemetry instrumentation. */ + public Connection newConnection(Options options, ConnectionFactory connectionFactory) + throws IOException, InterruptedException { + return wrap(connectionFactory.create(configure(new Options.Builder(options)).build())); + } + + /** Returns a {@link Connection} with telemetry instrumentation. */ + public Connection newConnection( + Options.Builder builder, ConnectionFactory connectionFactory) + throws IOException, InterruptedException { + return wrap(connectionFactory.create(configure(builder))); + } + + public interface ConnectionFactory { + Connection create(T options) throws IOException, InterruptedException; + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java new file mode 100644 index 000000000000..c125d371e64e --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTelemetryBuilder.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import static java.util.Collections.emptyList; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsInstrumenterFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** A builder of {@link NatsTelemetry}. */ +public final class NatsTelemetryBuilder { + + private final OpenTelemetry openTelemetry; + private List capturedHeaders = emptyList(); + + NatsTelemetryBuilder(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + /** + * Configures the messaging headers that will be captured as span attributes. + * + * @param capturedHeaders A list of messaging header names. + */ + @CanIgnoreReturnValue + public NatsTelemetryBuilder setCapturedHeaders(Collection capturedHeaders) { + this.capturedHeaders = new ArrayList<>(capturedHeaders); + return this; + } + + /** Returns a new {@link NatsTelemetry} with the settings of this {@link NatsTelemetryBuilder}. */ + public NatsTelemetry build() { + return new NatsTelemetry( + NatsInstrumenterFactory.createProducerInstrumenter(openTelemetry, capturedHeaders), + NatsInstrumenterFactory.createConsumerProcessInstrumenter(openTelemetry, capturedHeaders)); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryConnection.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryConnection.java new file mode 100644 index 000000000000..61fde251dc6d --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryConnection.java @@ -0,0 +1,345 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import io.nats.client.Connection; +import io.nats.client.Dispatcher; +import io.nats.client.Message; +import io.nats.client.MessageHandler; +import io.nats.client.impl.Headers; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsMessageWritableHeaders; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import io.opentelemetry.instrumentation.nats.v2_17.internal.OpenTelemetryMessageHandler; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +final class OpenTelemetryConnection implements InvocationHandler { + + private final Connection delegate; + private final Instrumenter producerInstrumenter; + private final Instrumenter consumerProcessInstrumenter; + + public OpenTelemetryConnection( + Connection connection, + Instrumenter producerInstrumenter, + Instrumenter consumerProcessInstrumenter) { + this.delegate = connection; + this.producerInstrumenter = producerInstrumenter; + this.consumerProcessInstrumenter = consumerProcessInstrumenter; + } + + public static Connection wrap( + Connection connection, + Instrumenter producerInstrumenter, + Instrumenter consumerProcessInstrumenter) { + return (Connection) + Proxy.newProxyInstance( + OpenTelemetryConnection.class.getClassLoader(), + new Class[] {Connection.class}, + new OpenTelemetryConnection( + connection, producerInstrumenter, consumerProcessInstrumenter)); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ("publish".equals(method.getName()) && method.getReturnType().equals(Void.TYPE)) { + publish(method, args); + return null; + } + + if ("request".equals(method.getName()) && method.getReturnType().equals(Message.class)) { + return request(method, args); + } + + if (("request".equals(method.getName()) || "requestWithTimeout".equals(method.getName())) + && method.getReturnType().equals(CompletableFuture.class)) { + return requestAsync(method, args); + } + + if ("createDispatcher".equals(method.getName()) + && method.getReturnType().equals(Dispatcher.class)) { + return createDispatcher(method, args); + } + + if ("closeDispatcher".equals(method.getName())) { + return closeDispatcher(method, args); + } + + return invokeMethod(method, delegate, args); + } + + private static Object invokeMethod(Method method, Object target, Object[] args) throws Throwable { + try { + return method.invoke(target, args); + } catch (InvocationTargetException exception) { + throw exception.getCause(); + } + } + + // void publish(String subject, byte[] body) + // void publish(String subject, Headers headers, byte[] body) + // void publish(String subject, String replyTo, byte[] body) + // void publish(String subject, String replyTo, Headers headers, byte[] body) + // void publish(Message message) + private void publish(Method method, Object[] args) throws Throwable { + String subject = null; + String replyTo = null; + Headers headers = null; + byte[] body = null; + + if (method.getParameterCount() == 2 + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == byte[].class) { + subject = (String) args[0]; + body = (byte[]) args[1]; + } else if (method.getParameterCount() == 3 + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == Headers.class + && method.getParameterTypes()[2] == byte[].class) { + subject = (String) args[0]; + headers = (Headers) args[1]; + body = (byte[]) args[2]; + } else if (method.getParameterCount() == 3 + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == String.class + && method.getParameterTypes()[2] == byte[].class) { + subject = (String) args[0]; + replyTo = (String) args[1]; + body = (byte[]) args[2]; + } else if (method.getParameterCount() == 4 + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == String.class + && method.getParameterTypes()[2] == Headers.class + && method.getParameterTypes()[3] == byte[].class) { + subject = (String) args[0]; + replyTo = (String) args[1]; + headers = (Headers) args[2]; + body = (byte[]) args[3]; + } else if (method.getParameterCount() == 1 + && method.getParameterTypes()[0] == Message.class + && args[0] != null) { + subject = ((Message) args[0]).getSubject(); + replyTo = ((Message) args[0]).getReplyTo(); + headers = ((Message) args[0]).getHeaders(); + body = ((Message) args[0]).getData(); + } + + Context parentContext = Context.current(); + NatsRequest natsRequest = null; + + if (subject != null) { + headers = NatsMessageWritableHeaders.create(headers); + natsRequest = NatsRequest.create(delegate, subject, replyTo, headers, body); + } + + if (natsRequest == null || !producerInstrumenter.shouldStart(parentContext, natsRequest)) { + invokeMethod(method, delegate, args); + return; + } + + Context context = producerInstrumenter.start(parentContext, natsRequest); + Throwable throwable = null; + try (Scope ignored = context.makeCurrent()) { + delegate.publish(subject, replyTo, headers, body); + } catch (Throwable t) { + throwable = t; + throw t; + } finally { + producerInstrumenter.end(context, natsRequest, null, throwable); + } + } + + // Message request(String subject, byte[] body, Duration timeout) throws InterruptedException; + // Message request(String subject, Headers headers, byte[] body, Duration timeout) throws + // InterruptedException; + // Message request(Message message, Duration timeout) throws InterruptedException; + @SuppressWarnings("InterruptedExceptionSwallowed") + private Message request(Method method, Object[] args) throws Throwable { + String subject = null; + Headers headers = null; + byte[] body = null; + Duration timeout = null; + + if (method.getParameterCount() == 3 + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == byte[].class) { + subject = (String) args[0]; + body = (byte[]) args[1]; + timeout = (Duration) args[2]; + } else if (method.getParameterCount() == 4 + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == Headers.class + && method.getParameterTypes()[2] == byte[].class) { + subject = (String) args[0]; + headers = (Headers) args[1]; + body = (byte[]) args[2]; + timeout = (Duration) args[3]; + } else if (method.getParameterCount() == 2 + && method.getParameterTypes()[0] == Message.class + && args[0] != null) { + subject = ((Message) args[0]).getSubject(); + headers = ((Message) args[0]).getHeaders(); + body = ((Message) args[0]).getData(); + timeout = (Duration) args[1]; + } + + Context parentContext = Context.current(); + NatsRequest natsRequest = null; + + if (subject != null) { + headers = NatsMessageWritableHeaders.create(headers); + natsRequest = NatsRequest.create(delegate, subject, null, headers, body); + } + + if (timeout == null + || natsRequest == null + || !producerInstrumenter.shouldStart(parentContext, natsRequest)) { + return (Message) invokeMethod(method, delegate, args); + } + + Context context = producerInstrumenter.start(parentContext, natsRequest); + NatsRequest response = null; + Throwable throwable = null; + + try (Scope ignored = context.makeCurrent()) { + Message result = delegate.request(subject, headers, body, timeout); + + if (result != null) { + response = NatsRequest.create(delegate, result); + } + + return result; + } catch (Throwable t) { + throwable = t; + throw t; + } finally { + producerInstrumenter.end(context, natsRequest, response, throwable); + } + } + + // CompletableFuture request(String subject, byte[] body); + // CompletableFuture requestWithTimeout(String subject, byte[] body, Duration timeout); + // CompletableFuture request(String subject, Headers headers, byte[] body); + // CompletableFuture requestWithTimeout(String subject, Headers headers, byte[] body, + // Duration timeout); + // CompletableFuture request(Message message); + // CompletableFuture requestWithTimeout(Message message, Duration timeout); + @SuppressWarnings("unchecked") + private CompletableFuture requestAsync(Method method, Object[] args) throws Throwable { + String subject = null; + Headers headers = null; + byte[] body = null; + Duration timeout = null; + + if ((method.getParameterCount() == 2) + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == byte[].class) { + subject = (String) args[0]; + body = (byte[]) args[1]; + } else if ((method.getParameterCount() == 3) + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == byte[].class) { + subject = (String) args[0]; + body = (byte[]) args[1]; + timeout = (Duration) args[2]; + } else if ((method.getParameterCount() == 3) + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == Headers.class + && method.getParameterTypes()[2] == byte[].class) { + subject = (String) args[0]; + headers = (Headers) args[1]; + body = (byte[]) args[2]; + } else if ((method.getParameterCount() == 4) + && method.getParameterTypes()[0] == String.class + && method.getParameterTypes()[1] == Headers.class + && method.getParameterTypes()[2] == byte[].class) { + subject = (String) args[0]; + headers = (Headers) args[1]; + body = (byte[]) args[2]; + timeout = (Duration) args[3]; + } else if ((method.getParameterCount() == 1) + && method.getParameterTypes()[0] == Message.class + && args[0] != null) { + subject = ((Message) args[0]).getSubject(); + headers = ((Message) args[0]).getHeaders(); + body = ((Message) args[0]).getData(); + } else if ((method.getParameterCount() == 2) + && method.getParameterTypes()[0] == Message.class + && args[0] != null) { + subject = ((Message) args[0]).getSubject(); + headers = ((Message) args[0]).getHeaders(); + body = ((Message) args[0]).getData(); + timeout = (Duration) args[1]; + } + + Context parentContext = Context.current(); + NatsRequest natsRequest = null; + + if (subject != null) { + headers = NatsMessageWritableHeaders.create(headers); + natsRequest = NatsRequest.create(delegate, subject, null, headers, body); + } + + if (natsRequest == null || !producerInstrumenter.shouldStart(parentContext, natsRequest)) { + return (CompletableFuture) invokeMethod(method, delegate, args); + } + + NatsRequest notNullNatsRequest = natsRequest; + Context context = producerInstrumenter.start(parentContext, notNullNatsRequest); + + CompletableFuture future; + try { + if (timeout != null) { + future = delegate.requestWithTimeout(subject, headers, body, timeout); + } else { + future = delegate.request(subject, headers, body); + } + } catch (Throwable t) { + producerInstrumenter.end(context, notNullNatsRequest, null, t); + throw t; + } + + return future.whenComplete( + (result, exception) -> { + if (result != null) { + NatsRequest response = NatsRequest.create(delegate, result); + producerInstrumenter.end(context, notNullNatsRequest, response, exception); + } else { + producerInstrumenter.end(context, notNullNatsRequest, null, exception); + } + }); + } + + // public Dispatcher createDispatcher() + // public Dispatcher createDispatcher(MessageHandler messageHandler) + private Dispatcher createDispatcher(Method method, Object[] args) throws Throwable { + if (method.getParameterCount() == 1 && method.getParameterTypes()[0] == MessageHandler.class) { + args[0] = + new OpenTelemetryMessageHandler((MessageHandler) args[0], consumerProcessInstrumenter); + } + + Dispatcher wrapped = (Dispatcher) invokeMethod(method, delegate, args); + return OpenTelemetryDispatcher.wrap(wrapped, consumerProcessInstrumenter); + } + + // public void closeDispatcher(Dispatcher dispatcher) + private Object closeDispatcher(Method method, Object[] args) throws Throwable { + if (method.getParameterCount() == 1 + && args[0] instanceof Proxy + && Proxy.getInvocationHandler(args[0]) instanceof OpenTelemetryDispatcher) { + args[0] = ((OpenTelemetryDispatcher) Proxy.getInvocationHandler(args[0])).getDelegate(); + } + + return invokeMethod(method, delegate, args); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryDispatcher.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryDispatcher.java new file mode 100644 index 000000000000..0b3bd5c63f2d --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/OpenTelemetryDispatcher.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import io.nats.client.Dispatcher; +import io.nats.client.MessageHandler; +import io.nats.client.Subscription; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.nats.v2_17.internal.NatsRequest; +import io.opentelemetry.instrumentation.nats.v2_17.internal.OpenTelemetryMessageHandler; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +final class OpenTelemetryDispatcher implements InvocationHandler { + + private final Dispatcher delegate; + private final Instrumenter consumerProcessInstrumenter; + + public OpenTelemetryDispatcher( + Dispatcher delegate, Instrumenter consumerProcessInstrumenter) { + this.delegate = delegate; + this.consumerProcessInstrumenter = consumerProcessInstrumenter; + } + + public static Dispatcher wrap( + Dispatcher delegate, Instrumenter consumerProcessInstrumenter) { + return (Dispatcher) + Proxy.newProxyInstance( + OpenTelemetryDispatcher.class.getClassLoader(), + new Class[] {Dispatcher.class}, + new OpenTelemetryDispatcher(delegate, consumerProcessInstrumenter)); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if ("subscribe".equals(method.getName()) && method.getReturnType().equals(Subscription.class)) { + return subscribe(method, args); + } + + try { + return method.invoke(delegate, args); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + } + + private static Object invokeMethod(Method method, Object target, Object[] args) throws Throwable { + try { + return method.invoke(target, args); + } catch (InvocationTargetException exception) { + throw exception.getCause(); + } + } + + // Subscription subscribe(String subject, MessageHandler handler); + // Subscription subscribe(String subject, String queue, MessageHandler handler); + private Subscription subscribe(Method method, Object[] args) throws Throwable { + if (method.getParameterCount() == 2 && method.getParameterTypes()[1] == MessageHandler.class) { + args[1] = + new OpenTelemetryMessageHandler((MessageHandler) args[1], consumerProcessInstrumenter); + } else if (method.getParameterCount() == 3 + && method.getParameterTypes()[2] == MessageHandler.class) { + args[2] = + new OpenTelemetryMessageHandler((MessageHandler) args[2], consumerProcessInstrumenter); + } + + return (Subscription) invokeMethod(method, delegate, args); + } + + Dispatcher getDelegate() { + return delegate; + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java new file mode 100644 index 000000000000..303335627e01 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java @@ -0,0 +1,56 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import java.util.List; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class NatsInstrumenterFactory { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.nats-2.17"; + + public static Instrumenter createProducerInstrumenter( + OpenTelemetry openTelemetry, List capturedHeaders) { + return Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create( + NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PUBLISH) + .setCapturedHeaders(capturedHeaders) + .build()) + .buildProducerInstrumenter(NatsRequestTextMapSetter.INSTANCE); + } + + public static Instrumenter createConsumerProcessInstrumenter( + OpenTelemetry openTelemetry, List capturedHeaders) { + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create( + NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + NatsRequestMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS) + .setCapturedHeaders(capturedHeaders) + .build()); + + return builder.buildConsumerInstrumenter(NatsRequestTextMapGetter.INSTANCE); + } + + private NatsInstrumenterFactory() {} +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsMessageWritableHeaders.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsMessageWritableHeaders.java new file mode 100644 index 000000000000..ff35e1f866c5 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsMessageWritableHeaders.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import io.nats.client.impl.Headers; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class NatsMessageWritableHeaders { + + public static Headers create(Headers headers) { + if (headers == null || headers.isReadOnly()) { + return new Headers(headers); + } + + return headers; + } + + private NatsMessageWritableHeaders() {} +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequest.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequest.java new file mode 100644 index 000000000000..0648e2490ec4 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequest.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import com.google.auto.value.AutoValue; +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.impl.Headers; +import javax.annotation.Nullable; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +@AutoValue +public abstract class NatsRequest { + + public static NatsRequest create( + Connection connection, String subject, String replyTo, Headers headers, byte[] body) { + return new AutoValue_NatsRequest( + replyTo, + connection.getServerInfo().getClientId(), + subject, + headers, + getDataSize(body), + connection.getOptions().getInboxPrefix()); + } + + public static NatsRequest create(Connection connection, Message message) { + return new AutoValue_NatsRequest( + message.getReplyTo(), + connection.getServerInfo().getClientId(), + message.getSubject(), + message.getHeaders(), + getDataSize(message.getData()), + connection.getOptions().getInboxPrefix()); + } + + @Nullable + public abstract String getReplyTo(); + + public abstract int getClientId(); + + public abstract String getSubject(); + + @Nullable + public abstract Headers getHeaders(); + + public abstract long getDataSize(); + + private static long getDataSize(byte[] data) { + return data == null ? 0 : data.length; + } + + public abstract String getInboxPrefix(); +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java new file mode 100644 index 000000000000..c1daa68711f4 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestMessagingAttributesGetter.java @@ -0,0 +1,91 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import io.nats.client.impl.Headers; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + +enum NatsRequestMessagingAttributesGetter + implements MessagingAttributesGetter { + INSTANCE; + + @Override + public String getSystem(NatsRequest request) { + return "nats"; + } + + @Nullable + @Override + public String getDestination(NatsRequest request) { + return request.getSubject(); + } + + @Nullable + @Override + public String getDestinationTemplate(NatsRequest request) { + if (isTemporaryDestination(request)) { + return request.getInboxPrefix(); + } + return null; + } + + @Override + public boolean isTemporaryDestination(NatsRequest request) { + return request.getSubject().startsWith(request.getInboxPrefix()); + } + + @Override + public boolean isAnonymousDestination(NatsRequest request) { + return false; + } + + @Nullable + @Override + public String getConversationId(NatsRequest request) { + return null; + } + + @Override + public Long getMessageBodySize(NatsRequest request) { + return request.getDataSize(); + } + + @Nullable + @Override + public Long getMessageEnvelopeSize(NatsRequest request) { + return null; + } + + @Nullable + @Override + public String getMessageId(NatsRequest request, @Nullable Object unused) { + return null; + } + + @Override + public String getClientId(NatsRequest request) { + return String.valueOf(request.getClientId()); + } + + @Nullable + @Override + public Long getBatchMessageCount(NatsRequest request, @Nullable Object unused) { + return null; + } + + @Override + public List getMessageHeader(NatsRequest request, String name) { + Headers headers = request.getHeaders(); + if (headers == null) { + return Collections.emptyList(); + } + List result = headers.get(name); + return result == null ? Collections.emptyList() : result; + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestTextMapGetter.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestTextMapGetter.java new file mode 100644 index 000000000000..85b78c9fc65a --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestTextMapGetter.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import io.nats.client.impl.Headers; +import io.opentelemetry.context.propagation.TextMapGetter; +import java.util.Collections; +import javax.annotation.Nullable; + +enum NatsRequestTextMapGetter implements TextMapGetter { + INSTANCE; + + @Override + public Iterable keys(NatsRequest request) { + Headers headers = request.getHeaders(); + + if (headers == null) { + return Collections.emptyList(); + } + + return headers.keySet(); + } + + @Nullable + @Override + public String get(@Nullable NatsRequest request, String key) { + if (request == null || request.getHeaders() == null) { + return null; + } + + return request.getHeaders().getFirst(key); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestTextMapSetter.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestTextMapSetter.java new file mode 100644 index 000000000000..62d44d832899 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsRequestTextMapSetter.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import io.opentelemetry.context.propagation.TextMapSetter; +import javax.annotation.Nullable; + +enum NatsRequestTextMapSetter implements TextMapSetter { + INSTANCE; + + @Override + /* Can not work if getHeaders doesn't return a writable structure. */ + public void set(@Nullable NatsRequest request, String key, String value) { + if (request == null || request.getHeaders() == null || request.getHeaders().isReadOnly()) { + return; + } + + request.getHeaders().put(key, value); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/OpenTelemetryMessageHandler.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/OpenTelemetryMessageHandler.java new file mode 100644 index 000000000000..74732af2ee79 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/OpenTelemetryMessageHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17.internal; + +import io.nats.client.Message; +import io.nats.client.MessageHandler; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. Exposed for {@link io.nats.client.impl.OpenTelemetryDispatcherFactory}. + */ +public final class OpenTelemetryMessageHandler implements MessageHandler { + + private final MessageHandler delegate; + private final Instrumenter consumerProcessInstrumenter; + + public OpenTelemetryMessageHandler( + MessageHandler delegate, Instrumenter consumerProcessInstrumenter) { + this.delegate = delegate; + this.consumerProcessInstrumenter = consumerProcessInstrumenter; + } + + @Override + public void onMessage(Message message) throws InterruptedException { + Context parentContext = Context.current(); + NatsRequest natsRequest = NatsRequest.create(message.getConnection(), message); + + if (!consumerProcessInstrumenter.shouldStart(parentContext, natsRequest)) { + delegate.onMessage(message); + return; + } + + Context processContext = consumerProcessInstrumenter.start(parentContext, natsRequest); + Throwable error = null; + + try (Scope ignored = processContext.makeCurrent()) { + delegate.onMessage(message); + } catch (Throwable t) { + error = t; + throw t; + } finally { + consumerProcessInstrumenter.end(processContext, natsRequest, null, error); + } + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsDispatcherTest.java b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsDispatcherTest.java new file mode 100644 index 000000000000..f4b21df6d3b7 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsDispatcherTest.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsDispatcherTest extends AbstractNatsDispatcherTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @BeforeAll + static void beforeAll() { + connection = NatsTelemetry.create(testing.getOpenTelemetry()).wrap(connection); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsExperimentalTest.java b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsExperimentalTest.java new file mode 100644 index 000000000000..efca6c50bcd2 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsExperimentalTest.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import static java.util.Collections.singletonList; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsExperimentalTest extends AbstractNatsExperimentalTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @BeforeAll + static void beforeAll() { + connection = + NatsTelemetry.builder(testing.getOpenTelemetry()) + .setCapturedHeaders(singletonList("captured-header")) + .build() + .wrap(connection); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsPublishTest.java b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsPublishTest.java new file mode 100644 index 000000000000..13949ced3d30 --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsPublishTest.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsPublishTest extends AbstractNatsPublishTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @BeforeAll + static void beforeAll() { + connection = NatsTelemetry.create(testing.getOpenTelemetry()).wrap(connection); + } +} diff --git a/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsRequestTest.java b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsRequestTest.java new file mode 100644 index 000000000000..aed9638c83ea --- /dev/null +++ b/instrumentation/nats/nats-2.17/library/src/test/java/io/opentelemetry/instrumentation/nats/v2_17/NatsRequestTest.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import io.nats.client.Nats; +import io.nats.client.Options; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.io.IOException; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; + +class NatsRequestTest extends AbstractNatsRequestTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @BeforeAll + static void beforeAll() throws IOException, InterruptedException { + NatsTelemetry telemetry = NatsTelemetry.create(testing.getOpenTelemetry()); + connection = + telemetry.newConnection( + Options.builder().server(connection.getConnectedUrl()).build(), Nats::connect); + } +} diff --git a/instrumentation/nats/nats-2.17/metadata.yaml b/instrumentation/nats/nats-2.17/metadata.yaml new file mode 100644 index 000000000000..1a4d1361c7ef --- /dev/null +++ b/instrumentation/nats/nats-2.17/metadata.yaml @@ -0,0 +1,7 @@ +disabled_by_default: false +description: This instrumentation provides messaging spans for NATS +configurations: + - name: otel.instrumentation.messaging.experimental.capture-headers + description: Allows configuring headers to capture as span attributes. + type: list + default: '' diff --git a/instrumentation/nats/nats-2.17/testing/build.gradle.kts b/instrumentation/nats/nats-2.17/testing/build.gradle.kts new file mode 100644 index 000000000000..18ea3fbf6a5b --- /dev/null +++ b/instrumentation/nats/nats-2.17/testing/build.gradle.kts @@ -0,0 +1,11 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + api(project(":testing-common")) + + compileOnly("io.nats:jnats:2.17.2") + + implementation("org.testcontainers:testcontainers") +} diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsDispatcherTest.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsDispatcherTest.java new file mode 100644 index 000000000000..498d2ff7b234 --- /dev/null +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsDispatcherTest.java @@ -0,0 +1,129 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes; +import static org.assertj.core.api.Assertions.assertThatNoException; + +import io.nats.client.Dispatcher; +import io.nats.client.Subscription; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsMessage; +import io.opentelemetry.api.trace.SpanKind; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("deprecation") // using deprecated semconv +public abstract class AbstractNatsDispatcherTest extends AbstractNatsTest { + + private int clientId; + + @BeforeEach + void beforeEach() { + clientId = connection.getServerInfo().getClientId(); + } + + @Test + void testSubscribeDefaultHandler() { + Dispatcher d1 = connection.createDispatcher(msg -> addChildSpan()).subscribe("sub"); + + publishAndAssertTraceAndSpans(); + + // finally, to make sure we're unwrapping properly the + // OpenTelemetryDispatcher in the library + assertThatNoException().isThrownBy(() -> connection.closeDispatcher(d1)); + } + + @Test + void testSubscribeSubscriptionMessageHandler() { + Dispatcher d1 = connection.createDispatcher(); + Subscription s1 = d1.subscribe("sub", msg -> addChildSpan()); + + publishAndAssertTraceAndSpans(); + + // finally, to make sure we're unwrapping properly the + // OpenTelemetryDispatcher in the library + assertThatNoException() + .isThrownBy( + () -> { + d1.unsubscribe(s1); + connection.closeDispatcher(d1); + }); + } + + @Test + void testSubscribeSubscriptionQueueMessageHandler() { + Dispatcher d1 = connection.createDispatcher(); + Subscription s1 = d1.subscribe("sub", "queue", msg -> addChildSpan()); + + publishAndAssertTraceAndSpans(); + + // finally, to make sure we're unwrapping properly the + // OpenTelemetryDispatcher in the library + assertThatNoException() + .isThrownBy( + () -> { + d1.unsubscribe(s1); + connection.closeDispatcher(d1); + }); + } + + void publishAndAssertTraceAndSpans() { + // when + testing() + .runWithSpan( + "parent", + () -> { + NatsMessage.Builder builder = NatsMessage.builder().subject("sub").data("x"); + connection.publish(builder.build()); + connection.publish(builder.headers(new Headers()).build()); + }); + + // then 1 trace + // - parent + // --- 1 publish + // ----- process (propagation with explicit headers) + // -------- test + // --- 1 publish + // ----- process (propagation with headers override) + // -------- test + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("sub publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("sub process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)), + span -> + span.hasName("sub publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("sub process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(4)) + .hasAttributesSatisfyingExactly( + messagingAttributes("process", "sub", clientId)), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(5)))); + } + + void addChildSpan() { + testing().runWithSpan("child", () -> {}); + } +} diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsExperimentalTest.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsExperimentalTest.java new file mode 100644 index 000000000000..4b0f12904c5c --- /dev/null +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsExperimentalTest.java @@ -0,0 +1,72 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static java.util.Collections.singletonList; + +import io.nats.client.Dispatcher; +import io.nats.client.Message; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsMessage; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("deprecation") // using deprecated semconv +public abstract class AbstractNatsExperimentalTest extends AbstractNatsTest { + + private int clientId; + + @BeforeEach + void beforeEach() { + clientId = connection.getServerInfo().getClientId(); + } + + @Test + void testCapturedHeaders() { + // given + Dispatcher dispatcher = connection.createDispatcher(msg -> {}).subscribe("sub"); + + // when + Headers headers = new Headers(); + headers.put("captured-header", "value"); + testing() + .runWithSpan( + "parent", + () -> { + Message message = + NatsMessage.builder().subject("sub").headers(headers).data("x").build(); + connection.publish(message); + }); + connection.closeDispatcher(dispatcher); + + // then + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("sub publish") + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + messagingAttributes( + "publish", + "sub", + clientId, + equalTo( + AttributeKey.stringArrayKey( + "messaging.header.captured_header"), + singletonList("value")))), + span -> + span.hasName("sub process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)))); + } +} diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java new file mode 100644 index 000000000000..9b89d7bae129 --- /dev/null +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsPublishTest.java @@ -0,0 +1,117 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.assertTraceparentHeader; +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes; + +import io.nats.client.Subscription; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsMessage; +import io.opentelemetry.api.trace.SpanKind; +import java.time.Duration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("deprecation") // using deprecated semconv +public abstract class AbstractNatsPublishTest extends AbstractNatsTest { + + private int clientId; + private Subscription subscription; + + @BeforeEach + void beforeEach() { + clientId = connection.getServerInfo().getClientId(); + subscription = connection.subscribe("sub"); + } + + @AfterEach + void afterEach() throws InterruptedException { + subscription.drain(Duration.ofSeconds(10)); + } + + @Test + void testPublishBody() throws InterruptedException { + // when + testing().runWithSpan("parent", () -> connection.publish("sub", new byte[] {0})); + + // then + assertPublishSpan(); + assertTraceparentHeader(subscription); + } + + @Test + void testPublishHeadersBody() throws InterruptedException { + // when + testing().runWithSpan("parent", () -> connection.publish("sub", new Headers(), new byte[] {0})); + + // then + assertPublishSpan(); + assertTraceparentHeader(subscription); + } + + @Test + void testPublishReplyToBody() throws InterruptedException { + // when + testing().runWithSpan("parent", () -> connection.publish("sub", "rt", new byte[] {0})); + + // then + assertPublishSpan(); + assertTraceparentHeader(subscription); + } + + @Test + void testPublishReplyToHeadersBody() throws InterruptedException { + // when + testing() + .runWithSpan( + "parent", () -> connection.publish("sub", "rt", new Headers(), new byte[] {0})); + + // then + assertPublishSpan(); + assertTraceparentHeader(subscription); + } + + @Test + void testPublishMessage() throws InterruptedException { + NatsMessage message = NatsMessage.builder().subject("sub").data("x").build(); + + // when + testing().runWithSpan("parent", () -> connection.publish(message)); + + // then + assertPublishSpan(); + assertTraceparentHeader(subscription); + } + + @Test + void testPublishMessageWithHeaders() throws InterruptedException { + NatsMessage message = + NatsMessage.builder().subject("sub").data("x").headers(new Headers()).build(); + + // when + testing().runWithSpan("parent", () -> connection.publish(message)); + + // then + assertPublishSpan(); + assertTraceparentHeader(subscription); + } + + private void assertPublishSpan() { + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("sub publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + messagingAttributes("publish", "sub", clientId)))); + } +} diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsRequestTest.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsRequestTest.java new file mode 100644 index 000000000000..716f8e38010a --- /dev/null +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsRequestTest.java @@ -0,0 +1,370 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.assertTraceparentHeader; +import static io.opentelemetry.instrumentation.nats.v2_17.NatsTestHelper.messagingAttributes; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_TEMPORARY; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.nats.client.Dispatcher; +import io.nats.client.Message; +import io.nats.client.Subscription; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsMessage; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("deprecation") // using deprecated semconv +public abstract class AbstractNatsRequestTest extends AbstractNatsTest { + + private int clientId; + private Subscription subscription; + + @BeforeEach + void beforeEach() { + clientId = connection.getServerInfo().getClientId(); + subscription = connection.subscribe("sub"); + } + + @AfterEach + void afterEach() throws InterruptedException { + subscription.drain(Duration.ofSeconds(10)); + } + + @Test + void testRequestTimeout() throws InterruptedException { + // when + Message message = + testing() + .runWithSpan( + "parent", () -> connection.request("sub", new byte[] {0}, Duration.ofSeconds(1))); + + // then + assertThat(message).isNull(); + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("sub publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + messagingAttributes("publish", "sub", clientId)))); + assertTraceparentHeader(subscription); + } + + @Test + void testRequestBody() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData())) + .subscribe("sub"); + + // when + Message message = + testing() + .runWithSpan( + "parent", () -> connection.request("sub", new byte[] {0}, Duration.ofSeconds(10))); + connection.closeDispatcher(dispatcher); + + // then + assertThat(message).isNotNull(); + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + } + + @Test + void testRequestHeadersBody() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), new Headers(), m.getData())) + .subscribe("sub"); + + // when + Message message = + testing() + .runWithSpan( + "parent", + () -> + connection.request( + "sub", new Headers(), new byte[] {0}, Duration.ofSeconds(10))); + connection.closeDispatcher(dispatcher); + + // then + assertThat(message).isNotNull(); + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + } + + @Test + void testRequestMessage() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData())) + .subscribe("sub"); + NatsMessage message = NatsMessage.builder().subject("sub").data("x").build(); + + // when + Message response = + testing().runWithSpan("parent", () -> connection.request(message, Duration.ofSeconds(10))); + connection.closeDispatcher(dispatcher); + + // then + assertThat(response).isNotNull(); + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + } + + @Test + void testRequestMessageHeaders() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), new Headers(), m.getData())) + .subscribe("sub"); + NatsMessage message = + NatsMessage.builder().subject("sub").headers(new Headers()).data("x").build(); + + // when + Message response = + testing().runWithSpan("parent", () -> connection.request(message, Duration.ofSeconds(10))); + connection.closeDispatcher(dispatcher); + + // then + assertThat(response).isNotNull(); + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + } + + @Test + void testRequestFutureBody() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData())) + .subscribe("sub"); + + // when + CompletableFuture message = + testing() + .runWithSpan("parent", () -> connection.request("sub", new byte[] {0})) + .whenComplete((m, e) -> connection.closeDispatcher(dispatcher)); + + // then + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + assertThat(message).isCompletedWithValueMatching(Objects::nonNull); + } + + @Test + void testRequestFutureHeadersBody() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), new Headers(), m.getData())) + .subscribe("sub"); + + // when + CompletableFuture message = + testing() + .runWithSpan("parent", () -> connection.request("sub", new Headers(), new byte[] {0})) + .whenComplete((m, e) -> connection.closeDispatcher(dispatcher)); + + // then + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + assertThat(message).isCompletedWithValueMatching(Objects::nonNull); + } + + @Test + void testRequestFutureMessage() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), m.getData())) + .subscribe("sub"); + NatsMessage message = NatsMessage.builder().subject("sub").data("x").build(); + + // when + CompletableFuture response = + testing() + .runWithSpan("parent", () -> connection.request(message)) + .whenComplete((m, e) -> connection.closeDispatcher(dispatcher)); + + // then + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + assertThat(response).isCompletedWithValueMatching(Objects::nonNull); + } + + @Test + void testRequestFutureMessageHeaders() throws InterruptedException { + // given + Dispatcher dispatcher = + connection + .createDispatcher(m -> connection.publish(m.getReplyTo(), new Headers(), m.getData())) + .subscribe("sub"); + NatsMessage message = + NatsMessage.builder().subject("sub").headers(new Headers()).data("x").build(); + + // when + CompletableFuture response = + testing() + .runWithSpan("parent", () -> connection.request(message)) + .whenComplete((m, e) -> connection.closeDispatcher(dispatcher)); + + // then + assertPublishReceiveSpansSameTrace(); + assertTraceparentHeader(subscription); + assertThat(response).isCompletedWithValueMatching(Objects::nonNull); + } + + @Test + void testRequestTimeoutFutureBody() throws InterruptedException { + // when + CompletableFuture message = + testing() + .runWithSpan( + "parent", + () -> connection.requestWithTimeout("sub", new byte[] {0}, Duration.ofSeconds(1))); + + // then + assertCancellationPublishSpan(); + assertTraceparentHeader(subscription); + assertThat(message).isCompletedExceptionally(); + } + + @Test + void testRequestTimeoutFutureHeadersBody() throws InterruptedException { + // when + CompletableFuture message = + testing() + .runWithSpan( + "parent", + () -> + connection.requestWithTimeout( + "sub", new Headers(), new byte[] {0}, Duration.ofSeconds(1))); + + // then + assertCancellationPublishSpan(); + assertTraceparentHeader(subscription); + assertThat(message).isCompletedExceptionally(); + } + + @Test + void testRequestTimeoutFutureMessage() throws InterruptedException { + // given + NatsMessage message = NatsMessage.builder().subject("sub").data("x").build(); + + // when + CompletableFuture response = + testing() + .runWithSpan( + "parent", () -> connection.requestWithTimeout(message, Duration.ofSeconds(1))); + + // then + assertCancellationPublishSpan(); + assertTraceparentHeader(subscription); + assertThat(response).isCompletedExceptionally(); + } + + @Test + void testRequestTimeoutFutureMessageHeaders() throws InterruptedException { + // given + NatsMessage message = + NatsMessage.builder().subject("sub").headers(new Headers()).data("x").build(); + + // when + CompletableFuture response = + testing() + .runWithSpan( + "parent", () -> connection.requestWithTimeout(message, Duration.ofSeconds(1))); + + // then + assertCancellationPublishSpan(); + assertTraceparentHeader(subscription); + assertThat(response).isCompletedExceptionally(); + } + + private void assertPublishReceiveSpansSameTrace() { + testing() + .waitAndAssertTraces( + trace -> { + List> asserts = + new ArrayList<>( + asList( + // publisher: parent + publish + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("sub publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + messagingAttributes("publish", "sub", clientId)), + // subscriber: process + publish(response) + span -> + span.hasName("sub process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)), + span -> + span.hasName("(temporary) publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(2)), + // publisher: process + span -> + span.hasName("(temporary) process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(3)) + .hasAttributesSatisfyingExactly( + messagingAttributes( + "process", + "(temporary)", + clientId, + equalTo(MESSAGING_DESTINATION_TEMPORARY, true))))); + + trace.hasSpansSatisfyingExactly(asserts); + }); + } + + private void assertCancellationPublishSpan() { + assertExceptionPublishSpan( + new CancellationException( + "Future cancelled, response not registered in time, check connection status.")); + } + + private void assertExceptionPublishSpan(Throwable exception) { + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("sub publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasException(exception) + .hasAttributesSatisfyingExactly( + messagingAttributes("publish", "sub", clientId)))); + } +} diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsTest.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsTest.java new file mode 100644 index 000000000000..899f41384dc1 --- /dev/null +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/AbstractNatsTest.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import io.nats.client.Connection; +import io.nats.client.Nats; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +abstract class AbstractNatsTest { + + static DockerImageName natsImage; + static GenericContainer natsContainer; + static Connection connection; + + protected abstract InstrumentationExtension testing(); + + @BeforeAll + static void beforeAll() throws IOException, InterruptedException { + natsImage = DockerImageName.parse("nats:2.11.2-alpine3.21"); + + natsContainer = new GenericContainer<>(natsImage).withExposedPorts(4222); + natsContainer.start(); + + String host = natsContainer.getHost(); + Integer port = natsContainer.getMappedPort(4222); + connection = Nats.connect("nats://" + host + ":" + port); + } + + @AfterAll + static void afterAll() throws InterruptedException, TimeoutException { + connection.drain(Duration.ZERO); + natsContainer.close(); + } +} diff --git a/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTestHelper.java b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTestHelper.java new file mode 100644 index 000000000000..c83880fea484 --- /dev/null +++ b/instrumentation/nats/nats-2.17/testing/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/NatsTestHelper.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.nats.v2_17; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; + +import io.nats.client.Message; +import io.nats.client.Subscription; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import java.time.Duration; + +@SuppressWarnings("deprecation") // using deprecated semconv +public class NatsTestHelper { + + // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_CLIENT_ID = stringKey("messaging.client_id"); + + public static AttributeAssertion[] messagingAttributes( + String operation, String subject, int clientId, AttributeAssertion other) { + return messagingAttributes(operation, subject, clientId, new AttributeAssertion[] {other}); + } + + public static AttributeAssertion[] messagingAttributes( + String operation, String subject, int clientId, AttributeAssertion[] other) { + AttributeAssertion[] standard = messagingAttributes(operation, subject, clientId); + AttributeAssertion[] result = new AttributeAssertion[standard.length + other.length]; + System.arraycopy(standard, 0, result, 0, standard.length); + System.arraycopy(other, 0, result, standard.length, other.length); + return result; + } + + public static AttributeAssertion[] messagingAttributes( + String operation, String subject, int clientId) { + return new AttributeAssertion[] { + equalTo(MESSAGING_OPERATION, operation), + equalTo(MESSAGING_SYSTEM, "nats"), + equalTo(MESSAGING_DESTINATION_NAME, subject), + equalTo(MESSAGING_MESSAGE_BODY_SIZE, 1), + equalTo(MESSAGING_CLIENT_ID, String.valueOf(clientId)) + }; + } + + public static void assertTraceparentHeader(Subscription subscription) + throws InterruptedException { + Message published = subscription.nextMessage(Duration.ofSeconds(10)); + assertThat(published.getHeaders().get("traceparent")).isNotEmpty(); + } + + private NatsTestHelper() {} +} diff --git a/settings.gradle.kts b/settings.gradle.kts index b3ff9cbd0215..c77883d50b65 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -415,6 +415,9 @@ include(":instrumentation:mongo:mongo-4.0:javaagent") include(":instrumentation:mongo:mongo-async-3.3:javaagent") include(":instrumentation:mongo:mongo-common:testing") include(":instrumentation:mybatis-3.2:javaagent") +include(":instrumentation:nats:nats-2.17:javaagent") +include(":instrumentation:nats:nats-2.17:library") +include(":instrumentation:nats:nats-2.17:testing") include(":instrumentation:netty:netty-3.8:javaagent") include(":instrumentation:netty:netty-4.0:javaagent") include(":instrumentation:netty:netty-4.1:javaagent")