diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessageOperation.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessageOperation.java index e7ef5c479ed0..378a918e6921 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessageOperation.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessageOperation.java @@ -13,8 +13,10 @@ * that may be used in a messaging system. */ public enum MessageOperation { + CREATE, PUBLISH, RECEIVE, + SEND, PROCESS; /** diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java index bdf5ab879786..4f1cff6b423e 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; +import static io.opentelemetry.api.common.AttributeKey.stringKey; import static io.opentelemetry.instrumentation.api.internal.AttributesExtractorUtil.internalSet; import io.opentelemetry.api.common.AttributeKey; @@ -28,6 +29,8 @@ public final class MessagingAttributesExtractor implements AttributesExtractor, SpanKeyProvider { // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_OPERATION_NAME = + stringKey("messaging.operation.name"); private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = AttributeKey.longKey("messaging.batch.message_count"); private static final AttributeKey MESSAGING_CLIENT_ID = @@ -50,8 +53,8 @@ public final class MessagingAttributesExtractor AttributeKey.longKey("messaging.message.envelope.size"); private static final AttributeKey MESSAGING_MESSAGE_ID = AttributeKey.stringKey("messaging.message.id"); - private static final AttributeKey MESSAGING_OPERATION = - AttributeKey.stringKey("messaging.operation"); + private static final AttributeKey MESSAGING_OPERATION_TYPE = + stringKey("messaging.operation.type"); private static final AttributeKey MESSAGING_SYSTEM = AttributeKey.stringKey("messaging.system"); @@ -72,24 +75,37 @@ public static AttributesExtractor create( */ public static MessagingAttributesExtractorBuilder builder( MessagingAttributesGetter getter, MessageOperation operation) { - return new MessagingAttributesExtractorBuilder<>(getter, operation); + return new MessagingAttributesExtractorBuilder<>(getter, operation, ""); + } + + public static MessagingAttributesExtractorBuilder builder( + MessagingAttributesGetter getter, + MessageOperation operation, + String operationName) { + return new MessagingAttributesExtractorBuilder<>(getter, operation, operationName); } private final MessagingAttributesGetter getter; private final MessageOperation operation; private final List capturedHeaders; + private final String operationName; MessagingAttributesExtractor( MessagingAttributesGetter getter, MessageOperation operation, - List capturedHeaders) { + List capturedHeaders, + String operationName) { this.getter = getter; this.operation = operation; this.capturedHeaders = CapturedMessageHeadersUtil.lowercase(capturedHeaders); + this.operationName = operationName; } @Override public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) { + if (operationName != null) { + internalSet(attributes, MESSAGING_OPERATION_NAME, operationName); + } internalSet(attributes, MESSAGING_SYSTEM, getter.getSystem(request)); boolean isTemporaryDestination = getter.isTemporaryDestination(request); if (isTemporaryDestination) { @@ -112,7 +128,7 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST attributes, MESSAGING_MESSAGE_ENVELOPE_SIZE, getter.getMessageEnvelopeSize(request)); internalSet(attributes, MESSAGING_CLIENT_ID, getter.getClientId(request)); if (operation != null) { - internalSet(attributes, MESSAGING_OPERATION, operation.operationName()); + internalSet(attributes, MESSAGING_OPERATION_TYPE, operation.operationName()); } } @@ -147,6 +163,8 @@ public SpanKey internalGetSpanKey() { switch (operation) { case PUBLISH: + case CREATE: + case SEND: return SpanKey.PRODUCER; case RECEIVE: return SpanKey.CONSUMER_RECEIVE; diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractorBuilder.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractorBuilder.java index 769de0862362..765c9fe33755 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractorBuilder.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractorBuilder.java @@ -19,11 +19,15 @@ public final class MessagingAttributesExtractorBuilder { final MessagingAttributesGetter getter; final MessageOperation operation; List capturedHeaders = emptyList(); + final String operationName; MessagingAttributesExtractorBuilder( - MessagingAttributesGetter getter, MessageOperation operation) { + MessagingAttributesGetter getter, + MessageOperation operation, + String operationName) { this.getter = getter; this.operation = operation; + this.operationName = operationName; } /** @@ -47,6 +51,6 @@ public MessagingAttributesExtractorBuilder setCapturedHeaders * MessagingAttributesExtractorBuilder}. */ public AttributesExtractor build() { - return new MessagingAttributesExtractor<>(getter, operation, capturedHeaders); + return new MessagingAttributesExtractor<>(getter, operation, capturedHeaders, operationName); } } diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingConsumerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingConsumerMetrics.java index fec179dc8da8..c84bbe0b9fc6 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingConsumerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingConsumerMetrics.java @@ -45,8 +45,9 @@ public final class MessagingConsumerMetrics implements OperationListener { private MessagingConsumerMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = meter - .histogramBuilder("messaging.receive.duration") - .setDescription("Measures the duration of receive operation.") + .histogramBuilder("messaging.client.operation.duration") + .setDescription( + "Duration of messaging operation initiated by a producer or consumer client.") .setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS) .setUnit("s"); MessagingMetricsAdvice.applyReceiveDurationAdvice(durationBuilder); @@ -54,8 +55,8 @@ private MessagingConsumerMetrics(Meter meter) { LongCounterBuilder longCounterBuilder = meter - .counterBuilder("messaging.receive.messages") - .setDescription("Measures the number of received messages.") + .counterBuilder("messaging.client.consumed.messages") + .setDescription("Number of messages that were delivered to the application.") .setUnit("{message}"); MessagingMetricsAdvice.applyReceiveMessagesAdvice(longCounterBuilder); receiveMessageCount = longCounterBuilder.build(); diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index f87c814d2a31..c6a1c0d36b4a 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; +import static io.opentelemetry.api.common.AttributeKey.stringKey; import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; @@ -23,12 +24,14 @@ final class MessagingMetricsAdvice { asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_OPERATION_NAME = + stringKey("messaging.operation.name"); private static final AttributeKey MESSAGING_SYSTEM = AttributeKey.stringKey("messaging.system"); private static final AttributeKey MESSAGING_DESTINATION_NAME = AttributeKey.stringKey("messaging.destination.name"); - private static final AttributeKey MESSAGING_OPERATION = - AttributeKey.stringKey("messaging.operation"); + private static final AttributeKey MESSAGING_OPERATION_TYPE = + stringKey("messaging.operation.type"); private static final AttributeKey MESSAGING_DESTINATION_PARTITION_ID = AttributeKey.stringKey("messaging.destination.partition.id"); private static final AttributeKey MESSAGING_DESTINATION_TEMPLATE = @@ -36,9 +39,10 @@ final class MessagingMetricsAdvice { private static final List> MESSAGING_ATTRIBUTES = asList( + MESSAGING_OPERATION_NAME, MESSAGING_SYSTEM, MESSAGING_DESTINATION_NAME, - MESSAGING_OPERATION, + MESSAGING_OPERATION_TYPE, MESSAGING_DESTINATION_PARTITION_ID, MESSAGING_DESTINATION_TEMPLATE, ErrorAttributes.ERROR_TYPE, diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 44d5b243744a..b23cffc50d4e 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -38,8 +38,9 @@ public final class MessagingProducerMetrics implements OperationListener { private MessagingProducerMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = meter - .histogramBuilder("messaging.publish.duration") - .setDescription("Measures the duration of publish operation.") + .histogramBuilder("messaging.client.operation.duration") + .setDescription( + "Duration of messaging operation initiated by a producer or consumer client.") .setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS) .setUnit("s"); MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder); diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index f2f199718e82..6d75b4797bd7 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -77,7 +77,7 @@ void collectsMetrics() { .satisfiesExactlyInAnyOrder( metric -> assertThat(metric) - .hasName("messaging.publish.duration") + .hasName("messaging.client.operation.duration") .hasUnit("s") .hasDescription("Measures the duration of publish operation.") .hasHistogramSatisfying( @@ -113,7 +113,7 @@ void collectsMetrics() { .satisfiesExactlyInAnyOrder( metric -> assertThat(metric) - .hasName("messaging.publish.duration") + .hasName("messaging.client.operation.duration") .hasHistogramSatisfying( histogram -> histogram.hasPointsSatisfying( diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index c26022efea02..ba16ac96e256 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -76,7 +76,7 @@ private static Instrumenter createConsumerReceiveInstrument INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE)) .addAttributesExtractor( - createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) + createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE, "receive")) .addOperationMetrics(MessagingConsumerMetrics.get()) .addAttributesExtractor( ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())); @@ -99,7 +99,7 @@ private static Instrumenter createConsumerBatchReceive INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE)) .addAttributesExtractor( - createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) + createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE, "receive")) .addAttributesExtractor( ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) .addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR)) @@ -117,7 +117,7 @@ private static Instrumenter createConsumerProcessInstrument INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS)) .addAttributesExtractor( - createMessagingAttributesExtractor(getter, MessageOperation.PROCESS)); + createMessagingAttributesExtractor(getter, MessageOperation.PROCESS, "process")); if (receiveInstrumentationEnabled) { SpanLinksExtractor spanLinksExtractor = @@ -136,9 +136,9 @@ private static Instrumenter createProducerInstrumenter() { Instrumenter.builder( TELEMETRY, INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create(getter, MessageOperation.PUBLISH)) + MessagingSpanNameExtractor.create(getter, MessageOperation.SEND)) .addAttributesExtractor( - createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH)) + createMessagingAttributesExtractor(getter, MessageOperation.SEND, "send")) .addAttributesExtractor( ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) .addOperationMetrics(MessagingProducerMetrics.get()); @@ -152,8 +152,8 @@ private static Instrumenter createProducerInstrumenter() { } private static AttributesExtractor createMessagingAttributesExtractor( - MessagingAttributesGetter getter, MessageOperation operation) { - return MessagingAttributesExtractor.builder(getter, operation) + MessagingAttributesGetter getter, MessageOperation operation, String operationName) { + return MessagingAttributesExtractor.builder(getter, operation, operationName) .setCapturedHeaders(capturedHeaders) .build(); } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 993e53536f8b..f90c75283a29 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -15,7 +15,8 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; -import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import io.opentelemetry.api.common.AttributeKey; @@ -157,7 +158,7 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -179,9 +180,10 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { .satisfiesExactlyInAnyOrder( metric -> assertThat(metric) - .hasName("messaging.receive.duration") + .hasName("messaging.client.operation.duration") .hasUnit("s") - .hasDescription("Measures the duration of receive operation.") + .hasDescription( + "Duration of messaging operation initiated by a producer or consumer client.") .hasHistogramSatisfying( histogram -> histogram.hasPointsSatisfying( @@ -190,32 +192,28 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { .hasSumGreaterThan(0.0) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) - .hasBucketBoundaries(DURATION_BUCKETS))), - metric -> - assertThat(metric) - .hasName("messaging.publish.duration") - .hasUnit("s") - .hasDescription("Measures the duration of publish operation.") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( + .hasBucketBoundaries(DURATION_BUCKETS), point -> point .hasSumGreaterThan(0.0) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "send"), + equalTo(MESSAGING_OPERATION_TYPE, "send"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) .hasBucketBoundaries(DURATION_BUCKETS))), metric -> assertThat(metric) - .hasName("messaging.receive.messages") + .hasName("messaging.client.consumed.messages") .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") + .hasDescription("Number of messages that were delivered to the application.") .hasLongSumSatisfying( sum -> sum.hasPointsSatisfying( @@ -224,6 +222,8 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { .hasValue(1) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost))))); @@ -269,7 +269,7 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -295,9 +295,10 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception { .satisfiesExactlyInAnyOrder( metric -> assertThat(metric) - .hasName("messaging.receive.duration") + .hasName("messaging.client.operation.duration") .hasUnit("s") - .hasDescription("Measures the duration of receive operation.") + .hasDescription( + "Duration of messaging operation initiated by a producer or consumer client.") .hasHistogramSatisfying( histogram -> histogram.hasPointsSatisfying( @@ -306,32 +307,28 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception { .hasSumGreaterThan(0.0) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) - .hasBucketBoundaries(DURATION_BUCKETS))), - metric -> - assertThat(metric) - .hasName("messaging.publish.duration") - .hasUnit("s") - .hasDescription("Measures the duration of publish operation.") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( + .hasBucketBoundaries(DURATION_BUCKETS), point -> point .hasSumGreaterThan(0.0) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "send"), + equalTo(MESSAGING_OPERATION_TYPE, "send"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) .hasBucketBoundaries(DURATION_BUCKETS))), metric -> assertThat(metric) - .hasName("messaging.receive.messages") + .hasName("messaging.client.consumed.messages") .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") + .hasDescription("Number of messages that were delivered to the application.") .hasLongSumSatisfying( sum -> sum.hasPointsSatisfying( @@ -340,6 +337,8 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception { .hasValue(1) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost))))); @@ -352,10 +351,11 @@ static List sendAttributes( new ArrayList<>( Arrays.asList( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "send"), equalTo(SERVER_ADDRESS, brokerHost), equalTo(SERVER_PORT, brokerPort), equalTo(MESSAGING_DESTINATION_NAME, destination), - equalTo(MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_OPERATION_TYPE, "send"), equalTo(MESSAGING_MESSAGE_ID, messageId), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), equalTo(MESSAGE_TYPE, "normal"))); @@ -390,10 +390,11 @@ static List receiveAttributes( new ArrayList<>( Arrays.asList( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), equalTo(SERVER_ADDRESS, brokerHost), equalTo(SERVER_PORT, brokerPort), equalTo(MESSAGING_DESTINATION_NAME, destination), - equalTo(MESSAGING_OPERATION, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_MESSAGE_ID, messageId), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative))); if (testHeaders) { @@ -419,8 +420,9 @@ static List processAttributes( new ArrayList<>( Arrays.asList( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "process"), equalTo(MESSAGING_DESTINATION_NAME, destination), - equalTo(MESSAGING_OPERATION, "process"), + equalTo(MESSAGING_OPERATION_TYPE, "process"), equalTo(MESSAGING_MESSAGE_ID, messageId), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative))); if (testHeaders) { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index a2c4c94b69bc..0fca7a073a7d 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -12,6 +12,8 @@ import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import io.opentelemetry.api.trace.SpanKind; @@ -65,7 +67,7 @@ void testConsumeNonPartitionedTopic() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -117,7 +119,7 @@ void testConsumeNonPartitionedTopicUsingReceive() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -139,9 +141,10 @@ void testConsumeNonPartitionedTopicUsingReceive() throws Exception { .satisfiesExactlyInAnyOrder( metric -> assertThat(metric) - .hasName("messaging.receive.duration") + .hasName("messaging.client.operation.duration") .hasUnit("s") - .hasDescription("Measures the duration of receive operation.") + .hasDescription( + "Duration of messaging operation initiated by a producer or consumer client.") .hasHistogramSatisfying( histogram -> histogram.hasPointsSatisfying( @@ -150,15 +153,28 @@ void testConsumeNonPartitionedTopicUsingReceive() throws Exception { .hasSumGreaterThan(0.0) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS), + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "send"), + equalTo(MESSAGING_OPERATION_TYPE, "send"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) .hasBucketBoundaries(DURATION_BUCKETS))), metric -> assertThat(metric) - .hasName("messaging.receive.messages") + .hasName("messaging.client.consumed.messages") .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") + .hasDescription("Number of messages that were delivered to the application.") .hasLongSumSatisfying( sum -> { sum.hasPointsSatisfying( @@ -167,28 +183,13 @@ void testConsumeNonPartitionedTopicUsingReceive() throws Exception { .hasValue(1) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)); }); - }), - metric -> - assertThat(metric) - .hasName("messaging.publish.duration") - .hasUnit("s") - .hasDescription("Measures the duration of publish operation.") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasSumGreaterThan(0.0) - .hasAttributesSatisfying( - equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(MESSAGING_DESTINATION_NAME, topic), - equalTo(SERVER_PORT, brokerPort), - equalTo(SERVER_ADDRESS, brokerHost)) - .hasBucketBoundaries(DURATION_BUCKETS)))); + })); } @Test @@ -227,7 +228,7 @@ void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -253,9 +254,10 @@ void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception { .satisfiesExactlyInAnyOrder( metric -> assertThat(metric) - .hasName("messaging.receive.duration") + .hasName("messaging.client.operation.duration") .hasUnit("s") - .hasDescription("Measures the duration of receive operation.") + .hasDescription( + "Duration of messaging operation initiated by a producer or consumer client.") .hasHistogramSatisfying( histogram -> histogram.hasPointsSatisfying( @@ -264,15 +266,28 @@ void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception { .hasSumGreaterThan(0.0) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS), + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "send"), + equalTo(MESSAGING_OPERATION_TYPE, "send"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) .hasBucketBoundaries(DURATION_BUCKETS))), metric -> assertThat(metric) - .hasName("messaging.receive.messages") + .hasName("messaging.client.consumed.messages") .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") + .hasDescription("Number of messages that were delivered to the application.") .hasLongSumSatisfying( sum -> { sum.hasPointsSatisfying( @@ -281,28 +296,13 @@ void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception { .hasValue(1) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)); }); - }), - metric -> - assertThat(metric) - .hasName("messaging.publish.duration") - .hasUnit("s") - .hasDescription("Measures the duration of publish operation.") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasSumGreaterThan(0.0) - .hasAttributesSatisfying( - equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(MESSAGING_DESTINATION_NAME, topic), - equalTo(SERVER_PORT, brokerPort), - equalTo(SERVER_ADDRESS, brokerHost)) - .hasBucketBoundaries(DURATION_BUCKETS)))); + })); } @Test @@ -332,7 +332,7 @@ void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -354,9 +354,10 @@ void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception { .satisfiesExactlyInAnyOrder( metric -> assertThat(metric) - .hasName("messaging.receive.duration") + .hasName("messaging.client.operation.duration") .hasUnit("s") - .hasDescription("Measures the duration of receive operation.") + .hasDescription( + "Duration of messaging operation initiated by a producer or consumer client.") .hasHistogramSatisfying( histogram -> histogram.hasPointsSatisfying( @@ -365,15 +366,28 @@ void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception { .hasSumGreaterThan(0.0) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS), + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "send"), + equalTo(MESSAGING_OPERATION_TYPE, "send"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) .hasBucketBoundaries(DURATION_BUCKETS))), metric -> assertThat(metric) - .hasName("messaging.receive.messages") + .hasName("messaging.client.consumed.messages") .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") + .hasDescription("Number of messages that were delivered to the application.") .hasLongSumSatisfying( sum -> { sum.hasPointsSatisfying( @@ -382,28 +396,13 @@ void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception { .hasValue(1) .hasAttributesSatisfying( equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)); }); - }), - metric -> - assertThat(metric) - .hasName("messaging.publish.duration") - .hasUnit("s") - .hasDescription("Measures the duration of publish operation.") - .hasHistogramSatisfying( - histogram -> - histogram.hasPointsSatisfying( - point -> - point - .hasSumGreaterThan(0.0) - .hasAttributesSatisfying( - equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(MESSAGING_DESTINATION_NAME, topic), - equalTo(SERVER_PORT, brokerPort), - equalTo(SERVER_ADDRESS, brokerHost)) - .hasBucketBoundaries(DURATION_BUCKETS)))); + })); } @Test @@ -442,7 +441,7 @@ void captureMessageHeaderAsSpanAttribute() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -502,7 +501,7 @@ void testConsumePartitionedTopic() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + "-partition-0 publish") + span.hasName(topic + "-partition-0 send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -564,7 +563,7 @@ void testConsumeMultiTopics() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic1 + " publish") + span.hasName(topic1 + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -592,7 +591,7 @@ void testConsumeMultiTopics() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic2 + " publish") + span.hasName(topic2 + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( @@ -644,9 +643,9 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception { .satisfiesOnlyOnce( metric -> assertThat(metric) - .hasName("messaging.receive.messages") + .hasName("messaging.client.consumed.messages") .hasUnit("{message}") - .hasDescription("Measures the number of received messages.") + .hasDescription("Number of messages that were delivered to the application.") .hasLongSumSatisfying( sum -> { sum.satisfies( @@ -694,7 +693,7 @@ void testSendMessageWithTxn() throws Exception { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> - span.hasName(topic + " publish") + span.hasName(topic + " send") .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)))); } diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java index fc9c454dcda2..90bc9bbd9b5e 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java @@ -34,7 +34,7 @@ public final class SpringPulsarSingletons { INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor( - MessagingAttributesExtractor.builder(getter, operation) + MessagingAttributesExtractor.builder(getter, operation, "process") .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .build()); if (messagingReceiveInstrumentationEnabled) { diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarTest.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarTest.java index 2fba60091be1..4574e9d80781 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarTest.java +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarTest.java @@ -27,7 +27,7 @@ protected void assertSpringPulsar() { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasNoParent(), span -> { - span.hasName(OTEL_TOPIC + " publish") + span.hasName(OTEL_TOPIC + " send") .hasKind(PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly(publishAttributes()); diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/testReceiveSpansDisabled/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSuppressReceiveSpansTest.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/testReceiveSpansDisabled/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSuppressReceiveSpansTest.java index f4abc0b566c6..6d2cddcfff4b 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/testReceiveSpansDisabled/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSuppressReceiveSpansTest.java +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/testReceiveSpansDisabled/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSuppressReceiveSpansTest.java @@ -19,7 +19,7 @@ protected void assertSpringPulsar() { trace.hasSpansSatisfyingExactly( span -> span.hasName("parent").hasNoParent(), span -> - span.hasName(OTEL_TOPIC + " publish") + span.hasName(OTEL_TOPIC + " send") .hasKind(PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly(publishAttributes()), diff --git a/instrumentation/spring/spring-pulsar-1.0/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java b/instrumentation/spring/spring-pulsar-1.0/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java index 336a722fa030..7db77c750bec 100644 --- a/instrumentation/spring/spring-pulsar-1.0/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java +++ b/instrumentation/spring/spring-pulsar-1.0/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java @@ -13,7 +13,8 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; -import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static java.util.Arrays.asList; @@ -115,7 +116,8 @@ static void teardown() { protected List publishAttributes() { return asList( equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(MESSAGING_OPERATION, "publish"), + equalTo(MESSAGING_OPERATION_NAME, "send"), + equalTo(MESSAGING_OPERATION_TYPE, "send"), equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotEmpty), @@ -127,7 +129,8 @@ protected List publishAttributes() { protected List processAttributes() { return asList( equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(MESSAGING_OPERATION, "process"), + equalTo(MESSAGING_OPERATION_NAME, "process"), + equalTo(MESSAGING_OPERATION_TYPE, "process"), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotEmpty), equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC)); @@ -136,7 +139,8 @@ protected List processAttributes() { protected List receiveAttributes() { return asList( equalTo(MESSAGING_SYSTEM, "pulsar"), - equalTo(MESSAGING_OPERATION, "receive"), + equalTo(MESSAGING_OPERATION_NAME, "receive"), + equalTo(MESSAGING_OPERATION_TYPE, "receive"), equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC), satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative), satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isNotNegative),