diff --git a/.fossa.yml b/.fossa.yml index ce66de0fded9..e6a2150ddaef 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -772,6 +772,12 @@ targets: - type: gradle path: ./ target: ':instrumentation:pulsar:pulsar-2.8:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:pulsar:pulsar-4.0:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:pulsar:pulsar-common:javaagent' - type: gradle path: ./ target: ':instrumentation:ratpack:ratpack-1.4:javaagent' diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index f87c814d2a31..f2e1528f5b05 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -66,5 +66,12 @@ static void applyReceiveMessagesAdvice(LongCounterBuilder builder) { ((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); } + static void applyProduceMessagesAdvice(LongCounterBuilder builder) { + if (!(builder instanceof ExtendedLongCounterBuilder)) { + return; + } + ((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); + } + private MessagingMetricsAdvice() {} } diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 44d5b243744a..ce4b16950044 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -9,9 +9,12 @@ import com.google.auto.value.AutoValue; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.context.Context; import io.opentelemetry.context.ContextKey; @@ -29,11 +32,15 @@ public final class MessagingProducerMetrics implements OperationListener { private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); + // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = + AttributeKey.longKey("messaging.batch.message_count"); private static final ContextKey MESSAGING_PRODUCER_METRICS_STATE = ContextKey.named("messaging-producer-metrics-state"); private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); private final DoubleHistogram publishDurationHistogram; + private final LongCounter produceMessageCount; private MessagingProducerMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = @@ -44,6 +51,14 @@ private MessagingProducerMetrics(Meter meter) { .setUnit("s"); MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder); publishDurationHistogram = durationBuilder.build(); + + LongCounterBuilder longCounterBuilder = + meter + .counterBuilder("messaging.client.sent.messages") + .setDescription("Number of messages producer attempted to send to the broker.") + .setUnit("{message}"); + MessagingMetricsAdvice.applyProduceMessagesAdvice(longCounterBuilder); + produceMessageCount = longCounterBuilder.build(); } public static OperationMetrics get() { @@ -73,6 +88,21 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { publishDurationHistogram.record( (endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + + long receiveMessagesCount = getProduceMessagesCount(state.startAttributes(), endAttributes); + if (receiveMessagesCount > 0) { + produceMessageCount.add(receiveMessagesCount, attributes, context); + } + } + + private static long getProduceMessagesCount(Attributes... attributesList) { + for (Attributes attributes : attributesList) { + Long value = attributes.get(MESSAGING_BATCH_MESSAGE_COUNT); + if (value != null) { + return value; + } + } + return 0; } @AutoValue diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts index 4a143d7886c7..53082fb85cdc 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts +++ b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/build.gradle.kts @@ -4,4 +4,5 @@ plugins { dependencies { testImplementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) + testImplementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java index bb109a99c6f6..19f98c595eee 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent-unit-tests/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParserTest.java @@ -7,7 +7,8 @@ import static org.assertj.core.api.Assertions.assertThat; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.UrlData; import org.junit.jupiter.api.Test; public class UrlParserTest { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts index a0f01abb60b3..c1c041b34335 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts @@ -13,6 +13,7 @@ muzzle { dependencies { library("org.apache.pulsar:pulsar-client:2.8.0") + implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) testImplementation("javax.annotation:javax.annotation-api:1.3.2") testImplementation("org.testcontainers:pulsar") diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java index dc2e3baea918..c755e64e4092 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerBaseInstrumentation.java @@ -11,7 +11,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.MessageListenerContext; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageListenerContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java index 0919440dc11e..3d90ce114370 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java @@ -20,6 +20,7 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; import java.util.concurrent.CompletableFuture; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java index f1ff1814b4f7..654ac19920ff 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageInstrumentation.java @@ -12,6 +12,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java index d0b8dfe6a8ff..4a8d2419058c 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/MessageListenerInstrumentation.java @@ -15,7 +15,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.implementation.bytecode.assign.Assigner; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java index 79211824e317..6466ffcd7913 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerImplInstrumentation.java @@ -16,7 +16,8 @@ import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java index 85295042e335..23acf3766d7c 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackInstrumentation.java @@ -15,7 +15,9 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.SendCallbackData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java index ae1c81cf188c..65aa5c96a469 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java @@ -5,9 +5,10 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; -import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.parseUrl; +import static io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.parseUrl; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.BasePulsarRequest; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.common.naming.TopicName; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java index 9f93fab83e25..cc1dfbc51f66 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequestSpanLinksExtractor.java @@ -10,6 +10,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import org.apache.pulsar.client.api.Message; final class PulsarBatchRequestSpanLinksExtractor implements SpanLinksExtractor { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java index 08492480c143..0c1be0eb0121 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java @@ -9,6 +9,7 @@ import static java.util.Collections.singletonList; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import java.util.List; import javax.annotation.Nullable; import org.apache.pulsar.client.api.Message; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index c26022efea02..3fcc0701a9b3 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -27,7 +27,13 @@ import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor; import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.ExperimentalProducerAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageListenerContext; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapSetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarNetClientAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.Consumer; diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts new file mode 100644 index 000000000000..0e5b63b28996 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/build.gradle.kts @@ -0,0 +1,20 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("org.apache.pulsar") + module.set("pulsar-client") + versions.set("[4.0.0,)") + assertInverse.set(true) + } +} + +dependencies { + library("org.apache.pulsar:pulsar-client:4.0.0") + implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) + + testImplementation("org.testcontainers:pulsar") + testImplementation("org.apache.pulsar:pulsar-client-admin:4.0.0") +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/ProducerImplInstrumentation.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/ProducerImplInstrumentation.java new file mode 100644 index 000000000000..81ddfa2820cb --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/ProducerImplInstrumentation.java @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import static io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry.PulsarSingletons.producerInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.SendCallback; + +public class ProducerImplInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.pulsar.client.impl.ProducerImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and(isPublic()) + .and( + takesArgument(0, hasSuperType(named("org.apache.pulsar.client.api.PulsarClient")))), + ProducerImplInstrumentation.class.getName() + "$ProducerImplConstructorAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(named("sendAsync")) + .and(takesArgument(1, named("org.apache.pulsar.client.impl.SendCallback"))), + ProducerImplInstrumentation.class.getName() + "$ProducerSendAsyncMethodAdvice"); + } + + @SuppressWarnings("unused") + public static class ProducerImplConstructorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void intercept( + @Advice.This ProducerImpl producer, @Advice.Argument(value = 0) PulsarClient client) { + PulsarClientImpl pulsarClient = (PulsarClientImpl) client; + String brokerUrl = pulsarClient.getLookup().getServiceUrl(); + String topic = producer.getTopic(); + VirtualFieldStore.inject(producer, brokerUrl, topic); + } + } + + @SuppressWarnings("unused") + public static class ProducerSendAsyncMethodAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void before( + @Advice.This ProducerImpl producer, + @Advice.Argument(value = 0) Message message, + @Advice.Argument(value = 1) SendCallback callback) { + Context parent = Context.current(); + PulsarRequest request = PulsarRequest.create(message, VirtualFieldStore.extract(producer)); + + if (!producerInstrumenter().shouldStart(parent, request)) { + return; + } + + Context context = producerInstrumenter().start(parent, request); + // Inject the context/request into the SendCallback. This will be extracted and used when the + // message is sent and the callback is invoked. see `SendCallbackInstrumentation`. + VirtualFieldStore.inject(callback, context, request); + } + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarInstrumentationModule.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarInstrumentationModule.java new file mode 100644 index 000000000000..f6c9f7844b73 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarInstrumentationModule.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Arrays; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class PulsarInstrumentationModule extends InstrumentationModule { + public PulsarInstrumentationModule() { + super("pulsar", "pulsar-4.0"); + } + + @Override + public List typeInstrumentations() { + return Arrays.asList(new ProducerImplInstrumentation(), new SendCallbackInstrumentation()); + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/SendCallbackInstrumentation.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/SendCallbackInstrumentation.java new file mode 100644 index 000000000000..429127417571 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/SendCallbackInstrumentation.java @@ -0,0 +1,80 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasSuperType; +import static io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry.PulsarSingletons.producerInstrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.SendCallbackData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.pulsar.client.impl.OpSendMsgStats; +import org.apache.pulsar.client.impl.SendCallback; + +public class SendCallbackInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("org.apache.pulsar.client.impl.SendCallback"); + } + + @Override + public ElementMatcher typeMatcher() { + return hasSuperType(named("org.apache.pulsar.client.impl.SendCallback")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("sendComplete")), + SendCallbackInstrumentation.class.getName() + "$SendCallbackSendCompleteAdvice"); + } + + @SuppressWarnings("unused") + public static class SendCallbackSendCompleteAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.This SendCallback callback, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("otelRequest") PulsarRequest request) { + // Extract the Context and PulsarRequest from the SendCallback instance. + SendCallbackData callBackData = VirtualFieldStore.extract(callback); + if (callBackData != null) { + // If the extraction was successful, store the Context and PulsarRequest in local variables. + otelContext = callBackData.context; + request = callBackData.request; + otelScope = otelContext.makeCurrent(); + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) Throwable t, + @Advice.Argument(1) OpSendMsgStats opSendMsgStats, + @Advice.Local("otelContext") Context otelContext, + @Advice.Local("otelScope") Scope otelScope, + @Advice.Local("otelRequest") PulsarRequest request) { + if (otelScope != null) { + // Close the Scope and end the span. + otelScope.close(); + request.setProduceNumMessages(opSendMsgStats.getNumMessagesInBatch()); + producerInstrumenter().end(otelContext, request, null, t); + } + } + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarMessagingAttributesGetter.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarMessagingAttributesGetter.java new file mode 100644 index 000000000000..e0ea9b075bec --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarMessagingAttributesGetter.java @@ -0,0 +1,102 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; + +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.naming.TopicName; + +enum PulsarMessagingAttributesGetter implements MessagingAttributesGetter { + INSTANCE; + + @Override + public String getSystem(PulsarRequest request) { + return "pulsar"; + } + + @Nullable + @Override + public String getDestination(PulsarRequest request) { + return request.getDestination(); + } + + @Nullable + @Override + public String getDestinationTemplate(PulsarRequest request) { + return null; + } + + @Override + public boolean isTemporaryDestination(PulsarRequest request) { + return false; + } + + @Override + public boolean isAnonymousDestination(PulsarRequest request) { + return false; + } + + @Nullable + @Override + public String getConversationId(PulsarRequest message) { + return null; + } + + @Override + public Long getMessageBodySize(PulsarRequest request) { + return (long) request.getMessage().size(); + } + + @Nullable + @Override + public Long getMessageEnvelopeSize(PulsarRequest request) { + return null; + } + + @Nullable + @Override + public String getMessageId(PulsarRequest request, @Nullable Void response) { + Message message = request.getMessage(); + if (message.getMessageId() != null) { + return message.getMessageId().toString(); + } + + return null; + } + + @Nullable + @Override + public String getClientId(PulsarRequest request) { + return null; + } + + @Override + public Long getBatchMessageCount(PulsarRequest request, @Nullable Void unused) { + return (long) request.getProduceNumMessages(); + } + + @Nullable + @Override + public String getDestinationPartitionId(PulsarRequest request) { + int partitionIndex = TopicName.getPartitionIndex(request.getDestination()); + if (partitionIndex == -1) { + return null; + } + return String.valueOf(partitionIndex); + } + + @Override + public List getMessageHeader(PulsarRequest request, String name) { + String value = request.getMessage().getProperty(name); + return value != null ? singletonList(value) : emptyList(); + } +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarSingletons.java new file mode 100644 index 000000000000..da3dbe7a4acd --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/telemetry/PulsarSingletons.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0.telemetry; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.MessageTextMapSetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarNetClientAttributesGetter; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; + +public class PulsarSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.pulsar-4.0"; + private static final OpenTelemetry TELEMETRY = GlobalOpenTelemetry.get(); + private static final Instrumenter PRODUCER_INSTRUMENTER = + createProducerInstrumenter(); + + public static Instrumenter producerInstrumenter() { + return PRODUCER_INSTRUMENTER; + } + + private static Instrumenter createProducerInstrumenter() { + MessagingAttributesGetter getter = + PulsarMessagingAttributesGetter.INSTANCE; + + InstrumenterBuilder builder = + Instrumenter.builder( + TELEMETRY, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, MessageOperation.PUBLISH)) + .addAttributesExtractor( + createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH)) + .addAttributesExtractor( + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) + .addOperationMetrics(MessagingProducerMetrics.get()); + + return builder.buildProducerInstrumenter(MessageTextMapSetter.INSTANCE); + } + + private static AttributesExtractor createMessagingAttributesExtractor( + MessagingAttributesGetter getter, MessageOperation operation) { + return MessagingAttributesExtractor.builder(getter, operation).build(); + } + + private PulsarSingletons() {} +} diff --git a/instrumentation/pulsar/pulsar-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarClientTest.java b/instrumentation/pulsar/pulsar-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarClientTest.java new file mode 100644 index 000000000000..d6202d6647c1 --- /dev/null +++ b/instrumentation/pulsar/pulsar-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v4_0/PulsarClientTest.java @@ -0,0 +1,152 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v4_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.time.Duration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +class PulsarClientTest { + + private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest.class); + + private static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse("apachepulsar/pulsar:2.8.0"); + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static PulsarContainer pulsar; + static PulsarClient client; + static PulsarAdmin admin; + static Producer producer; + static Consumer consumer; + + static String brokerHost; + static int brokerPort; + + static final double[] DURATION_BUCKETS = + new double[] { + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 + }; + + @BeforeAll + static void beforeAll() throws PulsarClientException { + pulsar = + new PulsarContainer(DEFAULT_IMAGE_NAME) + .withEnv("PULSAR_MEM", "-Xmx128m") + .withLogConsumer(new Slf4jLogConsumer(logger)) + .withStartupTimeout(Duration.ofMinutes(2)) + .withTransactions(); + pulsar.start(); + + brokerHost = pulsar.getHost(); + brokerPort = pulsar.getMappedPort(6650); + client = + PulsarClient.builder() + .serviceUrl(pulsar.getPulsarBrokerUrl()) + .openTelemetry(OpenTelemetry.noop()) + .enableTransaction(true) + .build(); + admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build(); + } + + @AfterEach + void afterEach() throws PulsarClientException { + if (producer != null) { + producer.close(); + } + if (consumer != null) { + consumer.close(); + } + } + + @AfterAll + static void afterAll() throws PulsarClientException { + if (client != null) { + client.close(); + } + if (admin != null) { + admin.close(); + } + pulsar.close(); + } + + @Test + void testProduceBatch() throws Exception { + String topic = "persistent://public/default/testProduceBatch"; + admin.topics().createNonPartitionedTopic(topic); + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(true).create(); + + String msg = "test"; + int sendCount = 10; + for (int i = 0; i < sendCount; i++) { + producer.send(msg); + } + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + assertThat(metric) + .hasName("messaging.client.sent.messages") + .hasUnit("{message}") + .hasDescription("Number of messages producer attempted to send to the broker.") + .hasLongSumSatisfying( + sum -> { + sum.hasPointsSatisfying( + point -> { + point + .hasValue(sendCount) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)); + }); + })); + } +} diff --git a/instrumentation/pulsar/pulsar-common/javaagent/build.gradle.kts b/instrumentation/pulsar/pulsar-common/javaagent/build.gradle.kts new file mode 100644 index 000000000000..a2a38d2eb768 --- /dev/null +++ b/instrumentation/pulsar/pulsar-common/javaagent/build.gradle.kts @@ -0,0 +1,7 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +dependencies { + library("org.apache.pulsar:pulsar-client:2.8.0") +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerData.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/ProducerData.java similarity index 85% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerData.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/ProducerData.java index c7826e1f1293..65f0b80ea633 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ProducerData.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/ProducerData.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; public final class ProducerData { public final String url; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/SendCallbackData.java similarity index 76% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/SendCallbackData.java index 462843d70e31..21da6a76c7d4 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/SendCallbackData.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/SendCallbackData.java @@ -3,10 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; import io.opentelemetry.context.Context; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; public final class SendCallbackData { public final Context context; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParser.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/UrlParser.java similarity index 95% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParser.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/UrlParser.java index cc04d9083f6f..bcbb09515af3 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/UrlParser.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/UrlParser.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; public class UrlParser { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/VirtualFieldStore.java similarity index 94% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/VirtualFieldStore.java index 9ec84944feaf..fc830488680b 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/VirtualFieldStore.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/VirtualFieldStore.java @@ -3,11 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; +package io.opentelemetry.javaagent.instrumentation.pulsar.common; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarRequest; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry.PulsarRequest; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/BasePulsarRequest.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/BasePulsarRequest.java similarity index 73% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/BasePulsarRequest.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/BasePulsarRequest.java index 0d12e97019db..453684f15227 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/BasePulsarRequest.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/BasePulsarRequest.java @@ -3,9 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.UrlData; public class BasePulsarRequest { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/ExperimentalProducerAttributesExtractor.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java similarity index 90% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/ExperimentalProducerAttributesExtractor.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java index c3f4801b998f..9ab1e287b342 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/ExperimentalProducerAttributesExtractor.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/ExperimentalProducerAttributesExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributesBuilder; @@ -15,7 +15,8 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; -enum ExperimentalProducerAttributesExtractor implements AttributesExtractor { +public enum ExperimentalProducerAttributesExtractor + implements AttributesExtractor { INSTANCE; private static final AttributeKey MESSAGE_TYPE = diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageListenerContext.java similarity index 92% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageListenerContext.java index 7bcea2245431..7828502b9701 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageListenerContext.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageListenerContext.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; /** * Helper class used to determine whether message is going to be processed by a listener. If we know diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapGetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapGetter.java similarity index 76% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapGetter.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapGetter.java index 931dfa11e33b..bff75aaf1eb1 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapGetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapGetter.java @@ -3,12 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.context.propagation.TextMapGetter; import javax.annotation.Nullable; -enum MessageTextMapGetter implements TextMapGetter { +public enum MessageTextMapGetter implements TextMapGetter { INSTANCE; @Override diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapSetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java similarity index 79% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapSetter.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java index 8b892f02fbef..b9b3d9af6bdc 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/MessageTextMapSetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/MessageTextMapSetter.java @@ -3,13 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.context.propagation.TextMapSetter; import javax.annotation.Nullable; import org.apache.pulsar.client.impl.MessageImpl; -enum MessageTextMapSetter implements TextMapSetter { +public enum MessageTextMapSetter implements TextMapSetter { INSTANCE; @Override diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarNetClientAttributesGetter.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java similarity index 89% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarNetClientAttributesGetter.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java index 91391d088cac..7b00c7972a9b 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarNetClientAttributesGetter.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarNetClientAttributesGetter.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter; import javax.annotation.Nullable; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarRequest.java b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java similarity index 63% rename from instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarRequest.java rename to instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java index efb9d34f3f53..e7d5850d4b24 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarRequest.java +++ b/instrumentation/pulsar/pulsar-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/common/telemetry/PulsarRequest.java @@ -3,18 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +package io.opentelemetry.javaagent.instrumentation.pulsar.common.telemetry; -import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.parseUrl; +import static io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser.parseUrl; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.ProducerData; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.ProducerData; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.UrlParser; import org.apache.pulsar.client.api.Message; public final class PulsarRequest extends BasePulsarRequest { private final Message message; + private int produceNumMessages; - private PulsarRequest(Message message, String destination, UrlData urlData) { + private PulsarRequest(Message message, String destination, UrlParser.UrlData urlData) { super(destination, urlData); this.message = message; } @@ -27,7 +28,7 @@ public static PulsarRequest create(Message message, String url) { return new PulsarRequest(message, message.getTopicName(), parseUrl(url)); } - public static PulsarRequest create(Message message, UrlData urlData) { + public static PulsarRequest create(Message message, UrlParser.UrlData urlData) { return new PulsarRequest(message, message.getTopicName(), urlData); } @@ -38,4 +39,12 @@ public static PulsarRequest create(Message message, ProducerData producerData public Message getMessage() { return message; } + + public int getProduceNumMessages() { + return produceNumMessages; + } + + public void setProduceNumMessages(int produceNumMessages) { + this.produceNumMessages = produceNumMessages; + } } diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts b/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts index 9364d6f73d9e..e7267a746cab 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/build.gradle.kts @@ -14,7 +14,7 @@ muzzle { dependencies { library("org.springframework.pulsar:spring-pulsar:1.0.0") - implementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) + implementation(project(":instrumentation:pulsar:pulsar-common:javaagent")) testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent")) diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java index 5952cefaf5f0..e0b4d8ecbeaf 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/DefaultPulsarMessageListenerContainerInstrumentation.java @@ -14,7 +14,7 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore; +import io.opentelemetry.javaagent.instrumentation.pulsar.common.VirtualFieldStore; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; diff --git a/settings.gradle.kts b/settings.gradle.kts index 0bdfc02339ff..4c730d9cb47c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -457,6 +457,8 @@ include(":instrumentation:play:play-ws:play-ws-common:testing") include(":instrumentation:powerjob-4.0:javaagent") include(":instrumentation:pulsar:pulsar-2.8:javaagent") include(":instrumentation:pulsar:pulsar-2.8:javaagent-unit-tests") +include(":instrumentation:pulsar:pulsar-4.0:javaagent") +include(":instrumentation:pulsar:pulsar-common:javaagent") include(":instrumentation:quarkus-resteasy-reactive:common-testing") include(":instrumentation:quarkus-resteasy-reactive:javaagent") include(":instrumentation:quarkus-resteasy-reactive:quarkus2-testing")