2727import io .opentelemetry .api .trace .StatusCode ;
2828import io .opentelemetry .api .trace .Tracer ;
2929import io .opentelemetry .context .Context ;
30- import io .opentelemetry .semconv .trace .attributes .SemanticAttributes ;
3130import java .util .List ;
3231
3332public class OpenTelemetryPubsubTracer {
@@ -40,6 +39,13 @@ public class OpenTelemetryPubsubTracer {
4039 "subscriber concurrency control" ;
4140 private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler" ;
4241
42+ private static final String MESSAGING_SYSTEM_ATTR_KEY = "messaging.system" ;
43+ private static final String MESSAGING_DESTINATION_NAME_ATTR_KEY = "messaging.destination.name" ;
44+ private static final String CODE_FUNCTION_ATTR_KEY = "code.function" ;
45+ private static final String MESSAGING_OPERATION_ATTR_KEY = "messaging.operation" ;
46+ private static final String MESSAGING_BATCH_MESSAGE_COUNT_ATTR_KEY =
47+ "messaging.batch.message_count" ;
48+ private static final String MESSAGING_MESSAGE_ID_ATTR_KEY = "messaging.message.id" ;
4349 private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size" ;
4450 private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key" ;
4551 private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id" ;
@@ -66,12 +72,12 @@ private static final AttributesBuilder createCommonSpanAttributesBuilder(
6672 String destinationName , String projectName , String codeFunction , String operation ) {
6773 AttributesBuilder attributesBuilder =
6874 Attributes .builder ()
69- .put (SemanticAttributes . MESSAGING_SYSTEM , MESSAGING_SYSTEM_VALUE )
70- .put (SemanticAttributes . MESSAGING_DESTINATION_NAME , destinationName )
75+ .put (MESSAGING_SYSTEM_ATTR_KEY , MESSAGING_SYSTEM_VALUE )
76+ .put (MESSAGING_DESTINATION_NAME_ATTR_KEY , destinationName )
7177 .put (PROJECT_ATTR_KEY , projectName )
72- .put (SemanticAttributes . CODE_FUNCTION , codeFunction );
78+ .put (CODE_FUNCTION_ATTR_KEY , codeFunction );
7379 if (operation != null ) {
74- attributesBuilder .put (SemanticAttributes . MESSAGING_OPERATION , operation );
80+ attributesBuilder .put (MESSAGING_OPERATION_ATTR_KEY , operation );
7581 }
7682
7783 return attributesBuilder ;
@@ -179,15 +185,15 @@ Span startPublishRpcSpan(TopicName topicName, List<PubsubMessageWrapper> message
179185 Attributes attributes =
180186 createCommonSpanAttributesBuilder (
181187 topicName .getTopic (), topicName .getProject (), "publishCall" , "publish" )
182- .put (SemanticAttributes . MESSAGING_BATCH_MESSAGE_COUNT , messages .size ())
188+ .put (MESSAGING_BATCH_MESSAGE_COUNT_ATTR_KEY , messages .size ())
183189 .build ();
184190 SpanBuilder publishRpcSpanBuilder =
185191 tracer
186192 .spanBuilder (topicName .getTopic () + PUBLISH_RPC_SPAN_SUFFIX )
187193 .setSpanKind (SpanKind .CLIENT )
188194 .setAllAttributes (attributes );
189195 Attributes linkAttributes =
190- Attributes .builder ().put (SemanticAttributes . MESSAGING_OPERATION , "publish" ).build ();
196+ Attributes .builder ().put (MESSAGING_OPERATION_ATTR_KEY , "publish" ).build ();
191197 for (PubsubMessageWrapper message : messages ) {
192198 if (message .getPublisherSpan ().getSpanContext ().isSampled ())
193199 publishRpcSpanBuilder .addLink (message .getPublisherSpan ().getSpanContext (), linkAttributes );
@@ -237,7 +243,7 @@ void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDelive
237243 message .getSubscriptionName (), message .getSubscriptionProject (), "onResponse" , null );
238244
239245 attributesBuilder
240- .put (SemanticAttributes . MESSAGING_MESSAGE_ID , message .getMessageId ())
246+ .put (MESSAGING_MESSAGE_ID_ATTR_KEY , message .getMessageId ())
241247 .put (MESSAGE_SIZE_ATTR_KEY , message .getDataSize ())
242248 .put (MESSAGE_ACK_ID_ATTR_KEY , message .getAckId ())
243249 .put (MESSAGE_EXACTLY_ONCE_ATTR_KEY , exactlyOnceDeliveryEnabled );
@@ -336,8 +342,7 @@ void startSubscribeProcessSpan(PubsubMessageWrapper message) {
336342 if (subscriberSpan != null ) {
337343 Span subscribeProcessSpan =
338344 startChildSpan (message .getSubscriptionName () + " process" , subscriberSpan );
339- subscribeProcessSpan .setAttribute (
340- SemanticAttributes .MESSAGING_SYSTEM , MESSAGING_SYSTEM_VALUE );
345+ subscribeProcessSpan .setAttribute (MESSAGING_SYSTEM_ATTR_KEY , MESSAGING_SYSTEM_VALUE );
341346 Span publisherSpan = message .getPublisherSpan ();
342347 if (publisherSpan != null ) {
343348 subscribeProcessSpan .addLink (publisherSpan .getSpanContext ());
@@ -373,7 +378,7 @@ Span startSubscribeRpcSpan(
373378 subscriptionName .getProject (),
374379 codeFunction ,
375380 rpcOperation )
376- .put (SemanticAttributes . MESSAGING_BATCH_MESSAGE_COUNT , messages .size ());
381+ .put (MESSAGING_BATCH_MESSAGE_COUNT_ATTR_KEY , messages .size ());
377382
378383 // Ack deadline and receipt modack are specific to the modack operation
379384 if (rpcOperation == "modack" ) {
@@ -388,7 +393,7 @@ Span startSubscribeRpcSpan(
388393 .setSpanKind (SpanKind .CLIENT )
389394 .setAllAttributes (attributesBuilder .build ());
390395 Attributes linkAttributes =
391- Attributes .builder ().put (SemanticAttributes . MESSAGING_OPERATION , rpcOperation ).build ();
396+ Attributes .builder ().put (MESSAGING_OPERATION_ATTR_KEY , rpcOperation ).build ();
392397 for (PubsubMessageWrapper message : messages ) {
393398 if (message .getSubscriberSpan ().getSpanContext ().isSampled ()) {
394399 rpcSpanBuilder .addLink (message .getSubscriberSpan ().getSpanContext (), linkAttributes );
0 commit comments