@@ -37,11 +37,31 @@ class_alias(
3737}
3838
3939/**
40- * The messenger instrumentation will create spans for message operations in Symfony's Messenger system.
41- * It supports distributed tracing and provides rich metadata about message processing.
40+ * The messenger instrumentation creates spans for message operations in Symfony's Messenger system.
41+ *
42+ * This implementation follows the OpenTelemetry messaging semantic conventions:
43+ * @see https://opentelemetry.io/docs/specs/semconv/messaging/
44+ *
45+ * Key features:
46+ * - Context propagation between producers and consumers
47+ * - Span naming following the "{operation} {destination}" convention
48+ * - Proper span kind assignment based on operation type
49+ * - Standard messaging attributes
50+ * - Support for distributed tracing across message boundaries
4251 */
4352final class MessengerInstrumentation
4453{
54+ // Standard messaging operation types as defined in the spec
55+ private const OPERATION_TYPE_CREATE = 'create ' ;
56+ private const OPERATION_TYPE_SEND = 'send ' ;
57+ private const OPERATION_TYPE_RECEIVE = 'receive ' ;
58+ private const OPERATION_TYPE_PROCESS = 'process ' ;
59+ private const OPERATION_TYPE_SETTLE = 'settle ' ;
60+
61+ // Symfony-specific operation types
62+ private const OPERATION_TYPE_MIDDLEWARE = 'middleware ' ;
63+
64+ // Attribute constants
4565 const ATTRIBUTE_MESSAGING_MESSAGE = 'messaging.message ' ;
4666 const ATTRIBUTE_MESSAGING_BUS = 'messaging.symfony.bus ' ;
4767 const ATTRIBUTE_MESSAGING_HANDLER = 'messaging.symfony.handler ' ;
@@ -57,9 +77,17 @@ final class MessengerInstrumentation
5777 const ATTRIBUTE_MESSENGER_BUS = self ::ATTRIBUTE_MESSAGING_BUS ;
5878 const ATTRIBUTE_MESSENGER_MESSAGE = self ::ATTRIBUTE_MESSAGING_MESSAGE ;
5979
60- /** @psalm-suppress PossiblyUnusedMethod */
80+ /**
81+ * Registers the instrumentation hooks for Symfony Messenger.
82+ *
83+ * @psalm-suppress PossiblyUnusedMethod
84+ */
6185 public static function register (): void
6286 {
87+ // Check if we should use the stable messaging conventions
88+ $ useStableConventions = self ::shouldUseStableConventions ();
89+ $ emitDuplicateConventions = self ::shouldEmitDuplicateConventions ();
90+
6391 $ instrumentation = new CachedInstrumentation (
6492 'io.opentelemetry.contrib.php.symfony_messenger ' ,
6593 null ,
@@ -84,19 +112,26 @@ public static function register(): void
84112 $ message = $ params [0 ];
85113 $ messageClass = \get_class ($ message );
86114
115+ // Get destination name if available
116+ $ destinationName = $ message instanceof Envelope
117+ ? self ::getDestinationName ($ message )
118+ : $ class ;
119+
120+ // Create span with proper naming convention
87121 /** @psalm-suppress ArgumentTypeCoercion */
88122 $ builder = $ instrumentation
89123 ->tracer ()
90- ->spanBuilder (\sprintf ('DISPATCH %s ' , $ messageClass ))
124+ ->spanBuilder (\sprintf ('%s %s ' , self :: OPERATION_TYPE_CREATE , $ destinationName ))
91125 ->setSpanKind (SpanKind::KIND_PRODUCER )
92126 ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
93127 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
94128 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
95129 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
96130 ->setAttribute (TraceAttributes::MESSAGING_SYSTEM , 'symfony ' )
97- ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , ' dispatch ' )
131+ ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , self :: OPERATION_TYPE_CREATE )
98132 ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
99133 ->setAttribute (self ::ATTRIBUTE_MESSAGING_BUS , $ class )
134+ ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ destinationName )
100135 ;
101136
102137 if ($ message instanceof Envelope) {
@@ -159,22 +194,25 @@ public static function register(): void
159194 $ currentContext = Context::getCurrent ();
160195 $ headers = [];
161196 $ propagator ->inject ($ headers , EnvelopeContextPropagator::getInstance (), $ currentContext );
162-
197+
163198 $ envelope = EnvelopeContextPropagator::getInstance ()->injectContextIntoEnvelope ($ envelope , $ headers );
164199
200+ // Get destination name
201+ $ destinationName = self ::getDestinationName ($ envelope ) ?: $ class ;
202+
165203 /** @psalm-suppress ArgumentTypeCoercion */
166204 $ builder = $ instrumentation
167205 ->tracer ()
168- ->spanBuilder (\sprintf ('SEND %s ' , $ messageClass ))
206+ ->spanBuilder (\sprintf ('%s %s ' , self :: OPERATION_TYPE_SEND , $ destinationName ))
169207 ->setSpanKind (SpanKind::KIND_PRODUCER )
170208 ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
171209 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
172210 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
173211 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
174212 ->setAttribute (TraceAttributes::MESSAGING_SYSTEM , 'symfony ' )
175- ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , ' send ' )
213+ ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , self :: OPERATION_TYPE_SEND )
176214 ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
177- ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ class )
215+ ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ destinationName )
178216 ;
179217
180218 self ::addMessageStampsToSpan ($ builder , $ envelope );
@@ -245,18 +283,21 @@ public static function register(): void
245283 $ propagator ->extract ($ extractedContext , EnvelopeContextPropagator::getInstance ());
246284 }
247285
286+ $ messageClass = \get_class ($ envelope ->getMessage ());
287+
248288 /** @psalm-suppress ArgumentTypeCoercion */
249289 $ builder = $ instrumentation
250290 ->tracer ()
251- ->spanBuilder (\sprintf ('CONSUME %s ' , \get_class ( $ envelope -> getMessage ()) ))
291+ ->spanBuilder (\sprintf ('%s %s ' , self :: OPERATION_TYPE_RECEIVE , $ transportName ))
252292 ->setSpanKind (SpanKind::KIND_CONSUMER )
253293 ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
254294 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
255295 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
256296 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
257297 ->setAttribute (TraceAttributes::MESSAGING_SYSTEM , 'symfony ' )
258- ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , ' receive ' )
298+ ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , self :: OPERATION_TYPE_RECEIVE )
259299 ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ transportName )
300+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
260301 ;
261302
262303 self ::addMessageStampsToSpan ($ builder , $ envelope );
@@ -312,18 +353,22 @@ public static function register(): void
312353 $ envelope = $ params [0 ];
313354 $ messageClass = \get_class ($ envelope ->getMessage ());
314355
356+ // Get destination name
357+ $ destinationName = self ::getDestinationName ($ envelope ) ?: $ messageClass ;
358+
315359 /** @psalm-suppress ArgumentTypeCoercion */
316360 $ builder = $ instrumentation
317361 ->tracer ()
318- ->spanBuilder (\sprintf ('HANDLE %s ' , $ messageClass ))
319- ->setSpanKind (SpanKind::KIND_INTERNAL )
362+ ->spanBuilder (\sprintf ('%s %s ' , self :: OPERATION_TYPE_PROCESS , $ destinationName ))
363+ ->setSpanKind (SpanKind::KIND_CONSUMER ) // Changed from INTERNAL to CONSUMER per spec
320364 ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
321365 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
322366 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
323367 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
324368 ->setAttribute (TraceAttributes::MESSAGING_SYSTEM , 'symfony ' )
325- ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , ' process ' )
369+ ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , self :: OPERATION_TYPE_PROCESS )
326370 ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
371+ ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ destinationName )
327372 ;
328373
329374 self ::addMessageStampsToSpan ($ builder , $ envelope );
@@ -379,19 +424,23 @@ public static function register(): void
379424 $ handlerClass = \get_class ($ handler );
380425 $ messageClass = \get_class ($ message );
381426
427+ // For handler-specific spans, use a custom destination format
428+ $ destinationName = sprintf ('%s::%s ' , $ handlerClass , $ messageClass );
429+
382430 /** @psalm-suppress ArgumentTypeCoercion */
383431 $ builder = $ instrumentation
384432 ->tracer ()
385- ->spanBuilder (\sprintf ('HANDLE %s:: %s ' , $ handlerClass , $ messageClass ))
386- ->setSpanKind (SpanKind::KIND_INTERNAL )
433+ ->spanBuilder (\sprintf ('%s %s ' , self :: OPERATION_TYPE_PROCESS , $ destinationName ))
434+ ->setSpanKind (SpanKind::KIND_CONSUMER ) // Changed from INTERNAL to CONSUMER per spec
387435 ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
388436 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
389437 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
390438 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
391439 ->setAttribute (TraceAttributes::MESSAGING_SYSTEM , 'symfony ' )
392- ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , ' process ' )
440+ ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , self :: OPERATION_TYPE_PROCESS )
393441 ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
394442 ->setAttribute (self ::ATTRIBUTE_MESSAGING_HANDLER , $ handlerClass )
443+ ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ destinationName )
395444 ;
396445
397446 $ parent = Context::getCurrent ();
@@ -448,19 +497,23 @@ public static function register(): void
448497 $ messageClass = \get_class ($ envelope ->getMessage ());
449498 $ middlewareClass = \get_class ($ middleware );
450499
500+ // For middleware spans, use a custom destination format
501+ $ destinationName = sprintf ('%s::%s ' , $ middlewareClass , $ messageClass );
502+
451503 /** @psalm-suppress ArgumentTypeCoercion */
452504 $ builder = $ instrumentation
453505 ->tracer ()
454- ->spanBuilder (\sprintf ('MIDDLEWARE %s:: %s ' , $ middlewareClass , $ messageClass ))
455- ->setSpanKind (SpanKind::KIND_INTERNAL )
506+ ->spanBuilder (\sprintf ('%s %s ' , self :: OPERATION_TYPE_MIDDLEWARE , $ destinationName ))
507+ ->setSpanKind (SpanKind::KIND_INTERNAL ) // Keep as INTERNAL since middleware is not a standard messaging operation
456508 ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
457509 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
458510 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
459511 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
460512 ->setAttribute (TraceAttributes::MESSAGING_SYSTEM , 'symfony ' )
461- ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , ' middleware ' )
513+ ->setAttribute (TraceAttributes::MESSAGING_OPERATION_TYPE , self :: OPERATION_TYPE_MIDDLEWARE )
462514 ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
463515 ->setAttribute (self ::ATTRIBUTE_MESSAGING_MIDDLEWARE , $ middlewareClass )
516+ ->setAttribute (TraceAttributes::MESSAGING_DESTINATION_NAME , $ destinationName )
464517 ;
465518
466519 self ::addMessageStampsToSpan ($ builder , $ envelope );
@@ -500,6 +553,58 @@ public static function register(): void
500553 }
501554 }
502555
556+ /**
557+ * Determines if stable messaging conventions should be used.
558+ */
559+ private static function shouldUseStableConventions (): bool
560+ {
561+ $ optIn = getenv ('OTEL_SEMCONV_STABILITY_OPT_IN ' );
562+ if (!$ optIn ) {
563+ return false ;
564+ }
565+
566+ $ values = explode (', ' , $ optIn );
567+
568+ return in_array ('messaging ' , $ values ) || in_array ('messaging/dup ' , $ values );
569+ }
570+
571+ /**
572+ * Determines if both old and new conventions should be emitted.
573+ */
574+ private static function shouldEmitDuplicateConventions (): bool
575+ {
576+ $ optIn = getenv ('OTEL_SEMCONV_STABILITY_OPT_IN ' );
577+ if (!$ optIn ) {
578+ return false ;
579+ }
580+
581+ $ values = explode (', ' , $ optIn );
582+
583+ return in_array ('messaging/dup ' , $ values );
584+ }
585+
586+ /**
587+ * Gets the destination name from an envelope.
588+ */
589+ private static function getDestinationName (Envelope $ envelope ): ?string
590+ {
591+ $ sentStamp = $ envelope ->last (SentStamp::class);
592+ $ receivedStamp = $ envelope ->last (ReceivedStamp::class);
593+
594+ if ($ sentStamp && $ sentStamp ->getSenderAlias ()) {
595+ return $ sentStamp ->getSenderAlias ();
596+ }
597+
598+ if ($ receivedStamp ) {
599+ return $ receivedStamp ->getTransportName ();
600+ }
601+
602+ return null ;
603+ }
604+
605+ /**
606+ * Adds message stamps as span attributes.
607+ */
503608 private static function addMessageStampsToSpan (SpanBuilderInterface $ builder , Envelope $ envelope ): void
504609 {
505610 $ busStamp = $ envelope ->last (BusNameStamp::class);
0 commit comments