diff --git a/.fossa.yml b/.fossa.yml index b8327cee5a7d..281b3c6e2976 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -895,6 +895,9 @@ targets: - type: gradle path: ./ target: ':instrumentation:spring:spring-kafka-2.7:library' + - type: gradle + path: ./ + target: ':instrumentation:spring:spring-pulsar-1.0:javaagent' - type: gradle path: ./ target: ':instrumentation:spring:spring-rabbit-1.0:javaagent' diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md index c120ff44cde2..578837ed5077 100644 --- a/docs/supported-libraries.md +++ b/docs/supported-libraries.md @@ -138,6 +138,7 @@ These are the supported libraries and frameworks: | [Spring Integration](https://spring.io/projects/spring-integration) | 4.1+ (not including 6.0+ yet) | [opentelemetry-spring-integration-4.1](../instrumentation/spring/spring-integration-4.1/library) | [Messaging Spans] | | [Spring JMS](https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms) | 2.0+ | N/A | [Messaging Spans] | | [Spring Kafka](https://spring.io/projects/spring-kafka) | 2.7+ | [opentelemetry-spring-kafka-2.7](../instrumentation/spring/spring-kafka-2.7/library) | [Messaging Spans] | +| [Spring Pulsar](https://spring.io/projects/spring-pulsar) | 1.0+ | | [Messaging Spans] | | [Spring RabbitMQ](https://spring.io/projects/spring-amqp) | 1.0+ | N/A | [Messaging Spans] | | [Spring RestTemplate](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/client/package-summary.html) | 3.1+ | [opentelemetry-spring-web-3.1](../instrumentation/spring/spring-web/spring-web-3.1/library) | [HTTP Client Spans], [HTTP Client Metrics] | | [Spring RMI](https://docs.spring.io/spring-framework/docs/4.0.x/javadoc-api/org/springframework/remoting/rmi/package-summary.html) | 4.0+ | N/A | [RPC Client Spans], [RPC Server Spans] | diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 14ad9a137aa0..c26022efea02 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -263,6 +263,8 @@ public static CompletableFuture> wrapBatch( (messages, throwable) -> { Context context = startAndEndConsumerReceive(parent, messages, timer, consumer, throwable); + // injected context is used in the spring-pulsar instrumentation + messages.forEach(message -> VirtualFieldStore.inject(message, context)); runWithContext( context, () -> { diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts b/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts new file mode 100644 index 000000000000..9364d6f73d9e --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts @@ -0,0 +1,76 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("org.springframework.pulsar") + module.set("spring-pulsar") + versions.set("[1.0.0,)") + assertInverse.set(true) + excludeInstrumentationName("pulsar-2.8") + } +} + +dependencies { + library("org.springframework.pulsar:spring-pulsar:1.0.0") + implementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) + + testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) + + testImplementation(project(":instrumentation:spring:spring-pulsar-1.0:testing")) + + testLibrary("org.springframework.boot:spring-boot-starter-test:3.2.4") + testLibrary("org.springframework.boot:spring-boot-starter:3.2.4") +} + +val latestDepTest = findProperty("testLatestDeps") as Boolean + +testing { + suites { + val testReceiveSpansDisabled by registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:spring:spring-pulsar-1.0:testing")) + + if (latestDepTest) { + implementation("org.springframework.pulsar:spring-pulsar:latest.release") + implementation("org.springframework.boot:spring-boot-starter-test:latest.release") + implementation("org.springframework.boot:spring-boot-starter:latest.release") + } else { + implementation("org.springframework.pulsar:spring-pulsar:1.0.0") + implementation("org.springframework.boot:spring-boot-starter-test:3.2.4") + implementation("org.springframework.boot:spring-boot-starter:3.2.4") + } + } + + targets { + all { + testTask.configure { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false") + } + } + } + } + } +} + +tasks { + test { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testing.suites) + } +} + +// spring 6 requires java 17 +otelJava { + minJavaVersionSupported.set(JavaVersion.VERSION_17) +} diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java new file mode 100644 index 000000000000..5952cefaf5f0 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; + +import static io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0.SpringPulsarSingletons.instrumenter; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.api.Message; + +public class DefaultPulsarMessageListenerContainerInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named( + "org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("dispatchMessageToListener") + .and(takesArguments(3).or(takesArguments(2))) + .and(takesArgument(0, named("org.apache.pulsar.client.api.Message"))), + getClass().getName() + "$DispatchMessageToListenerAdvice"); + } + + @SuppressWarnings("unused") + public static class DispatchMessageToListenerAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(0) Message message, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + Context parentContext = VirtualFieldStore.extract(message); + if (instrumenter().shouldStart(parentContext, message)) { + context = instrumenter().start(parentContext, message); + scope = context.makeCurrent(); + } + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Argument(0) Message message, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + if (scope == null) { + return; + } + scope.close(); + instrumenter().end(context, message, null, throwable); + } + } +} diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/MessageHeaderGetter.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/MessageHeaderGetter.java new file mode 100644 index 000000000000..00e8313adaf4 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/MessageHeaderGetter.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; + +import io.opentelemetry.context.propagation.TextMapGetter; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; + +enum MessageHeaderGetter implements TextMapGetter> { + INSTANCE; + + @Override + public Iterable keys(Message carrier) { + return carrier.getProperties().keySet(); + } + + @Nullable + @Override + public String get(@Nullable Message carrier, String key) { + return carrier == null ? null : carrier.getProperties().get(key); + } +} diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarInstrumentationModule.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarInstrumentationModule.java new file mode 100644 index 000000000000..c9681bf7765e --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarInstrumentationModule.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static java.util.Collections.singletonList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(InstrumentationModule.class) +public class SpringPulsarInstrumentationModule extends InstrumentationModule { + public SpringPulsarInstrumentationModule() { + super("spring-pulsar", "spring-pulsar-1.0"); + } + + @Override + public ElementMatcher.Junction classLoaderMatcher() { + // added in 1.0.0 + return hasClassesNamed( + "org.springframework.pulsar.annotation.PulsarListenerConsumerBuilderCustomizer"); + } + + @Override + public List typeInstrumentations() { + return singletonList(new DefaultPulsarMessageListenerContainerInstrumentation()); + } +} diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarMessageAttributesGetter.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarMessageAttributesGetter.java new file mode 100644 index 000000000000..47e91df44e18 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarMessageAttributesGetter.java @@ -0,0 +1,90 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; + +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; + +enum SpringPulsarMessageAttributesGetter implements MessagingAttributesGetter, Void> { + INSTANCE; + + @Override + public String getSystem(Message message) { + return "pulsar"; + } + + @Override + @Nullable + public String getDestination(Message message) { + return message.getTopicName(); + } + + @Nullable + @Override + public String getDestinationTemplate(Message message) { + return null; + } + + @Override + public boolean isTemporaryDestination(Message message) { + return false; + } + + @Override + public boolean isAnonymousDestination(Message message) { + return false; + } + + @Override + @Nullable + public String getConversationId(Message message) { + return null; + } + + @Override + public Long getMessageBodySize(Message message) { + return (long) message.size(); + } + + @Nullable + @Override + public Long getMessageEnvelopeSize(Message message) { + return null; + } + + @Override + @Nullable + public String getMessageId(Message message, @Nullable Void unused) { + if (message.getMessageId() != null) { + return message.getMessageId().toString(); + } + + return null; + } + + @Nullable + @Override + public String getClientId(Message message) { + return null; + } + + @Nullable + @Override + public Long getBatchMessageCount(Message message, @Nullable Void unused) { + return null; + } + + @Override + public List getMessageHeader(Message message, String name) { + String value = message.getProperty(name); + return value != null ? singletonList(value) : emptyList(); + } +} diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java new file mode 100644 index 000000000000..fc9c454dcda2 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java @@ -0,0 +1,55 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import org.apache.pulsar.client.api.Message; + +public final class SpringPulsarSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-pulsar-1.0"; + private static final Instrumenter, Void> INSTRUMENTER; + + static { + OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); + SpringPulsarMessageAttributesGetter getter = SpringPulsarMessageAttributesGetter.INSTANCE; + MessageOperation operation = MessageOperation.PROCESS; + boolean messagingReceiveInstrumentationEnabled = + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled(); + + InstrumenterBuilder, Void> builder = + Instrumenter., Void>builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .build()); + if (messagingReceiveInstrumentationEnabled) { + builder.addSpanLinksExtractor( + new PropagatorBasedSpanLinksExtractor<>( + openTelemetry.getPropagators().getTextMapPropagator(), MessageHeaderGetter.INSTANCE)); + INSTRUMENTER = builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + } else { + INSTRUMENTER = builder.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE); + } + } + + public static Instrumenter, Void> instrumenter() { + return INSTRUMENTER; + } + + private SpringPulsarSingletons() {} +} diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarTest.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarTest.java new file mode 100644 index 000000000000..2fba60091be1 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarTest.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; + +import static io.opentelemetry.api.trace.SpanKind.CONSUMER; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.api.trace.SpanKind.PRODUCER; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; + +import io.opentelemetry.instrumentation.spring.pulsar.v1_0.AbstractSpringPulsarTest; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.concurrent.atomic.AtomicReference; + +class SpringPulsarTest extends AbstractSpringPulsarTest { + + @Override + protected void assertSpringPulsar() { + AtomicReference producer = new AtomicReference<>(); + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(INTERNAL, CONSUMER), + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> { + span.hasName(OTEL_TOPIC + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(publishAttributes()); + + producer.set(trace.getSpan(1)); + }), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(String.format("%s receive", OTEL_TOPIC)) + .hasKind(CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly(receiveAttributes()), + span -> + span.hasName(String.format("%s process", OTEL_TOPIC)) + .hasKind(CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasAttributesSatisfyingExactly(processAttributes()), + span -> span.hasName("consumer").hasParent(trace.getSpan(1)))); + } +} diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/testReceiveSpansDisabled/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSuppressReceiveSpansTest.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/testReceiveSpansDisabled/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSuppressReceiveSpansTest.java new file mode 100644 index 000000000000..f4abc0b566c6 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/testReceiveSpansDisabled/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSuppressReceiveSpansTest.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; + +import static io.opentelemetry.api.trace.SpanKind.CONSUMER; +import static io.opentelemetry.api.trace.SpanKind.PRODUCER; + +import io.opentelemetry.instrumentation.spring.pulsar.v1_0.AbstractSpringPulsarTest; + +class SpringPulsarSuppressReceiveSpansTest extends AbstractSpringPulsarTest { + + @Override + protected void assertSpringPulsar() { + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName(OTEL_TOPIC + " publish") + .hasKind(PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(publishAttributes()), + span -> + span.hasName(String.format("%s process", OTEL_TOPIC)) + .hasKind(CONSUMER) + .hasParent(trace.getSpan(1)) + .hasTotalRecordedLinks(0) + .hasAttributesSatisfyingExactly(processAttributes()), + span -> span.hasName("consumer").hasParent(trace.getSpan(2))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName(String.format("%s receive", OTEL_TOPIC)).hasKind(CONSUMER))); + } +} diff --git a/instrumentation/spring/spring-pulsar-1.0/testing/build.gradle.kts b/instrumentation/spring/spring-pulsar-1.0/testing/build.gradle.kts new file mode 100644 index 000000000000..53bba3f14dd0 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/testing/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + implementation(project(":testing-common")) + implementation("org.testcontainers:pulsar") + + compileOnly("org.springframework.pulsar:spring-pulsar:1.0.0") + compileOnly("org.springframework.boot:spring-boot-starter-test:3.2.4") + compileOnly("org.springframework.boot:spring-boot-starter:3.2.4") +} diff --git a/instrumentation/spring/spring-pulsar-1.0/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java b/instrumentation/spring/spring-pulsar-1.0/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java new file mode 100644 index 000000000000..336a722fa030 --- /dev/null +++ b/instrumentation/spring/spring-pulsar-1.0/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java @@ -0,0 +1,156 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.pulsar.v1_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Arrays.asList; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.instrumentation.testing.GlobalTraceUtil; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.assertj.core.api.AbstractLongAssert; +import org.assertj.core.api.AbstractStringAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.core.PulsarTemplate; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.utility.DockerImageName; + +@SuppressWarnings("deprecation") // using deprecated semconv +public abstract class AbstractSpringPulsarTest { + + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse("apachepulsar/pulsar:4.0.2"); + static PulsarContainer pulsarContainer; + static ConfigurableApplicationContext applicationContext; + static PulsarTemplate pulsarTemplate; + static PulsarClient client; + static CountDownLatch latch = new CountDownLatch(1); + static final String OTEL_SUBSCRIPTION = "otel-subscription"; + protected static String brokerHost; + protected static int brokerPort; + protected static final String OTEL_TOPIC = "persistent://public/default/otel-topic"; + + @BeforeAll + @SuppressWarnings("unchecked") + static void setUp() throws PulsarClientException { + pulsarContainer = + new PulsarContainer(DEFAULT_IMAGE_NAME) + .withEnv("PULSAR_MEM", "-Xmx128m") + .withStartupTimeout(Duration.ofMinutes(2)); + pulsarContainer.start(); + brokerHost = pulsarContainer.getHost(); + brokerPort = pulsarContainer.getMappedPort(6650); + + SpringApplication app = new SpringApplication(ConsumerConfig.class); + Map props = new HashMap<>(); + props.put("spring.main.web-application-type", "none"); + props.put("spring.pulsar.client.service-url", pulsarContainer.getPulsarBrokerUrl()); + props.put("spring.pulsar.consumer.subscription.initial-position", "earliest"); + app.setDefaultProperties(props); + applicationContext = app.run(); + pulsarTemplate = applicationContext.getBean(PulsarTemplate.class); + + client = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build(); + } + + @Test + void testSpringPulsar() throws PulsarClientException, InterruptedException { + testing.runWithSpan( + "parent", + () -> { + pulsarTemplate.send(OTEL_TOPIC, "test"); + }); + latch.await(10, TimeUnit.SECONDS); + assertSpringPulsar(); + } + + @AfterAll + static void teardown() { + if (applicationContext != null) { + applicationContext.close(); + } + if (pulsarContainer != null) { + pulsarContainer.stop(); + } + } + + protected abstract void assertSpringPulsar(); + + static final AttributeKey MESSAGE_TYPE = + AttributeKey.stringKey("messaging.pulsar.message.type"); + + protected List publishAttributes() { + return asList( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC), + satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), + satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotEmpty), + equalTo(SERVER_ADDRESS, brokerHost), + equalTo(SERVER_PORT, brokerPort), + equalTo(MESSAGE_TYPE, "normal")); + } + + protected List processAttributes() { + return asList( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), + satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotEmpty), + equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC)); + } + + protected List receiveAttributes() { + return asList( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION, "receive"), + equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC), + satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), + satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isNotNegative), + equalTo(SERVER_ADDRESS, brokerHost), + equalTo(SERVER_PORT, brokerPort)); + } + + @SpringBootConfiguration + @EnableAutoConfiguration + static class ConsumerConfig { + @PulsarListener(subscriptionName = OTEL_SUBSCRIPTION, topics = OTEL_TOPIC) + void consumer(String ignored) { + GlobalTraceUtil.runWithSpan("consumer", () -> {}); + latch.countDown(); + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index b363efe00440..e2f863ecf1cc 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -554,6 +554,8 @@ include(":instrumentation:spring:spring-jms:spring-jms-6.0:javaagent") include(":instrumentation:spring:spring-kafka-2.7:javaagent") include(":instrumentation:spring:spring-kafka-2.7:library") include(":instrumentation:spring:spring-kafka-2.7:testing") +include(":instrumentation:spring:spring-pulsar-1.0:javaagent") +include(":instrumentation:spring:spring-pulsar-1.0:testing") include(":instrumentation:spring:spring-rabbit-1.0:javaagent") include(":instrumentation:spring:spring-rmi-4.0:javaagent") include(":instrumentation:spring:spring-scheduling-3.1:bootstrap")