44
55namespace OpenTelemetry \Contrib \Instrumentation \Symfony ;
66
7+ use OpenTelemetry \API \Globals ;
78use OpenTelemetry \API \Instrumentation \CachedInstrumentation ;
89use OpenTelemetry \API \Trace \Span ;
910use OpenTelemetry \API \Trace \SpanKind ;
1011use OpenTelemetry \API \Trace \StatusCode ;
1112use OpenTelemetry \Context \Context ;
13+ use OpenTelemetry \Context \Propagation \GetterSetterInterface ;
14+ use OpenTelemetry \Context \Propagation \PropagationGetterInterface ;
15+ use OpenTelemetry \Context \Propagation \PropagationSetterInterface ;
1216use function OpenTelemetry \Instrumentation \hook ;
1317use OpenTelemetry \SemConv \TraceAttributes ;
18+ use OpenTelemetry \Contrib \Instrumentation \Symfony \Propagation \EnvelopeContextPropagator ;
1419use Symfony \Component \Messenger \Envelope ;
1520use Symfony \Component \Messenger \MessageBusInterface ;
21+ use Symfony \Component \Messenger \Stamp \BusNameStamp ;
22+ use Symfony \Component \Messenger \Stamp \ConsumedByWorkerStamp ;
23+ use Symfony \Component \Messenger \Stamp \DelayStamp ;
24+ use Symfony \Component \Messenger \Stamp \HandledStamp ;
25+ use Symfony \Component \Messenger \Stamp \ReceivedStamp ;
26+ use Symfony \Component \Messenger \Stamp \RedeliveryStamp ;
27+ use Symfony \Component \Messenger \Stamp \SentStamp ;
28+ use Symfony \Component \Messenger \Stamp \SerializerStamp ;
29+ use Symfony \Component \Messenger \Stamp \TransportMessageIdStamp ;
30+ use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
1631use Symfony \Component \Messenger \Transport \Sender \SenderInterface ;
32+ use Symfony \Component \Messenger \Worker ;
1733
1834/**
19- * The messenger instrumentation will create an internal span for each message dispatched.
20- *
21- * It is currently not designed to facilitate trace context propagation.
22- * This should be done at the transport level.
23- *
24- * An exception to this will be simple transports like the Doctrine and InMemory transports.
25- *
26- * Caution: MessageBuses can be nested, so we might wand to add
27- * a message stamp to keep track of the parent span.
35+ * The messenger instrumentation will create spans for message operations in Symfony's Messenger system.
36+ * It supports distributed tracing and provides rich metadata about message processing.
2837 */
2938final class MessengerInstrumentation
3039{
31- const ATTRIBUTE_MESSENGER_BUS = 'symfony.messenger.bus ' ;
32- const ATTRIBUTE_MESSENGER_MESSAGE = 'symfony.messenger.message ' ;
33- const ATTRIBUTE_MESSENGER_TRANSPORT = 'symfony.messenger.transport ' ;
40+ const ATTRIBUTE_MESSAGING_SYSTEM = 'messaging.system ' ;
41+ const ATTRIBUTE_MESSAGING_OPERATION = 'messaging.operation ' ;
42+ const ATTRIBUTE_MESSAGING_DESTINATION = 'messaging.destination ' ;
43+ const ATTRIBUTE_MESSAGING_MESSAGE_ID = 'messaging.message_id ' ;
44+ const ATTRIBUTE_MESSAGING_MESSAGE = 'messaging.message ' ;
45+ const ATTRIBUTE_MESSAGING_BUS = 'messaging.symfony.bus ' ;
46+ const ATTRIBUTE_MESSAGING_HANDLER = 'messaging.symfony.handler ' ;
47+ const ATTRIBUTE_MESSAGING_REDELIVERED_AT = 'messaging.symfony.redelivered_at ' ;
48+ const ATTRIBUTE_MESSAGING_SENDER = 'messaging.symfony.sender ' ;
49+ const ATTRIBUTE_MESSAGING_DELAY = 'messaging.symfony.delay ' ;
50+ const ATTRIBUTE_MESSAGING_RETRY_COUNT = 'messaging.symfony.retry_count ' ;
51+ const ATTRIBUTE_MESSAGING_STAMPS = 'messaging.symfony.stamps ' ;
52+
53+ // Constants used in tests
54+ const ATTRIBUTE_MESSENGER_BUS = self ::ATTRIBUTE_MESSAGING_BUS ;
55+ const ATTRIBUTE_MESSENGER_TRANSPORT = self ::ATTRIBUTE_MESSAGING_DESTINATION ;
56+ const ATTRIBUTE_MESSENGER_MESSAGE = self ::ATTRIBUTE_MESSAGING_MESSAGE ;
3457
3558 /** @psalm-suppress PossiblyUnusedMethod */
3659 public static function register (): void
@@ -43,9 +66,7 @@ public static function register(): void
4366
4467 /**
4568 * MessageBusInterface dispatches messages to the handlers.
46- *
47- * @psalm-suppress UnusedFunctionCall
48- */
69+ */
4970 hook (
5071 MessageBusInterface::class,
5172 'dispatch ' ,
@@ -70,11 +91,16 @@ public static function register(): void
7091 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
7192 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
7293 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
73-
74- ->setAttribute (self ::ATTRIBUTE_MESSENGER_BUS , $ class )
75- ->setAttribute (self ::ATTRIBUTE_MESSENGER_MESSAGE , $ messageClass )
94+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
95+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_OPERATION , 'dispatch ' )
96+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
97+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_BUS , $ class )
7698 ;
7799
100+ if ($ message instanceof Envelope) {
101+ self ::addMessageStampsToSpan ($ builder , $ message );
102+ }
103+
78104 $ parent = Context::getCurrent ();
79105 $ span = $ builder
80106 ->setParent ($ parent )
@@ -110,14 +136,12 @@ public static function register(): void
110136
111137 /**
112138 * SenderInterface sends messages to a transport.
113- *
114- * @psalm-suppress UnusedFunctionCall
115- */
139+ */
116140 hook (
117141 SenderInterface::class,
118142 'send ' ,
119143 pre: static function (
120- SenderInterface $ bus ,
144+ SenderInterface $ sender ,
121145 array $ params ,
122146 string $ class ,
123147 string $ function ,
@@ -128,6 +152,14 @@ public static function register(): void
128152 $ envelope = $ params [0 ];
129153 $ messageClass = \get_class ($ envelope ->getMessage ());
130154
155+ // Inject OpenTelemetry context into message envelope
156+ $ propagator = Globals::propagator ();
157+ $ currentContext = Context::getCurrent ();
158+ $ headers = [];
159+ $ propagator ->inject ($ headers , EnvelopeContextPropagator::getInstance (), $ currentContext );
160+
161+ $ envelope = EnvelopeContextPropagator::getInstance ()->injectContextIntoEnvelope ($ envelope , $ headers );
162+
131163 /** @psalm-suppress ArgumentTypeCoercion */
132164 $ builder = $ instrumentation
133165 ->tracer ()
@@ -137,25 +169,169 @@ public static function register(): void
137169 ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
138170 ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
139171 ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
140-
141- ->setAttribute (self ::ATTRIBUTE_MESSENGER_TRANSPORT , $ class )
142- ->setAttribute (self ::ATTRIBUTE_MESSENGER_MESSAGE , $ messageClass )
172+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
173+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_OPERATION , 'send ' )
174+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
175+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_DESTINATION , $ class )
143176 ;
144177
178+ self ::addMessageStampsToSpan ($ builder , $ envelope );
179+
145180 $ parent = Context::getCurrent ();
181+ $ span = $ builder
182+ ->setParent ($ parent )
183+ ->startSpan ();
184+
185+ $ context = $ span ->storeInContext ($ parent );
186+ Context::storage ()->attach ($ context );
187+
188+ return [$ envelope ];
189+ },
190+ post: static function (
191+ SenderInterface $ sender ,
192+ array $ params ,
193+ ?Envelope $ result ,
194+ ?\Throwable $ exception
195+ ): void {
196+ $ scope = Context::storage ()->scope ();
197+ if (null === $ scope ) {
198+ return ;
199+ }
200+
201+ $ scope ->detach ();
202+ $ span = Span::fromContext ($ scope ->context ());
203+
204+ if (null !== $ exception ) {
205+ $ span ->recordException ($ exception );
206+ $ span ->setStatus (StatusCode::STATUS_ERROR , $ exception ->getMessage ());
207+ }
208+
209+ $ span ->end ();
210+ }
211+ );
212+
213+ /**
214+ * Worker handles messages from a transport.
215+ */
216+ hook (
217+ Worker::class,
218+ 'handleMessage ' ,
219+ pre: static function (
220+ Worker $ worker ,
221+ array $ params ,
222+ string $ class ,
223+ string $ function ,
224+ ?string $ filename ,
225+ ?int $ lineno ,
226+ ) use ($ instrumentation ): array {
227+ /** @var Envelope $envelope */
228+ $ envelope = $ params [0 ];
229+ /** @var string|ReceiverInterface $transport */
230+ $ transport = $ params [1 ];
231+ $ transportName = \is_object ($ transport ) ? \get_class ($ transport ) : $ transport ;
232+
233+ // Extract OpenTelemetry context from message envelope
234+ $ propagator = Globals::propagator ();
235+ $ extractedContext = EnvelopeContextPropagator::getInstance ()->extractContextFromEnvelope ($ envelope );
236+ if ($ extractedContext !== null ) {
237+ $ propagator ->extract ($ extractedContext , EnvelopeContextPropagator::getInstance ());
238+ }
239+
240+ /** @psalm-suppress ArgumentTypeCoercion */
241+ $ builder = $ instrumentation
242+ ->tracer ()
243+ ->spanBuilder (\sprintf ('CONSUME %s ' , \get_class ($ envelope ->getMessage ())))
244+ ->setSpanKind (SpanKind::KIND_CONSUMER )
245+ ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
246+ ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
247+ ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
248+ ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
249+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
250+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_OPERATION , 'receive ' )
251+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_DESTINATION , $ transportName )
252+ ;
253+
254+ self ::addMessageStampsToSpan ($ builder , $ envelope );
146255
256+ $ parent = Context::getCurrent ();
147257 $ span = $ builder
148258 ->setParent ($ parent )
149259 ->startSpan ();
150260
151261 $ context = $ span ->storeInContext ($ parent );
262+ Context::storage ()->attach ($ context );
152263
264+ return $ params ;
265+ },
266+ post: static function (
267+ Worker $ worker ,
268+ array $ params ,
269+ ?Envelope $ result ,
270+ ?\Throwable $ exception
271+ ): void {
272+ $ scope = Context::storage ()->scope ();
273+ if (null === $ scope ) {
274+ return ;
275+ }
276+
277+ $ scope ->detach ();
278+ $ span = Span::fromContext ($ scope ->context ());
279+
280+ if (null !== $ exception ) {
281+ $ span ->recordException ($ exception );
282+ $ span ->setStatus (StatusCode::STATUS_ERROR , $ exception ->getMessage ());
283+ }
284+
285+ $ span ->end ();
286+ }
287+ );
288+
289+ /**
290+ * HandleMessageMiddleware processes messages with handlers.
291+ */
292+ hook (
293+ 'Symfony\Component\Messenger\Middleware\HandleMessageMiddleware ' ,
294+ 'handle ' ,
295+ pre: static function (
296+ object $ middleware ,
297+ array $ params ,
298+ string $ class ,
299+ string $ function ,
300+ ?string $ filename ,
301+ ?int $ lineno ,
302+ ) use ($ instrumentation ): array {
303+ /** @var Envelope $envelope */
304+ $ envelope = $ params [0 ];
305+ $ messageClass = \get_class ($ envelope ->getMessage ());
306+
307+ /** @psalm-suppress ArgumentTypeCoercion */
308+ $ builder = $ instrumentation
309+ ->tracer ()
310+ ->spanBuilder (\sprintf ('HANDLE %s ' , $ messageClass ))
311+ ->setSpanKind (SpanKind::KIND_INTERNAL )
312+ ->setAttribute (TraceAttributes::CODE_FUNCTION_NAME , $ function )
313+ ->setAttribute (TraceAttributes::CODE_NAMESPACE , $ class )
314+ ->setAttribute (TraceAttributes::CODE_FILEPATH , $ filename )
315+ ->setAttribute (TraceAttributes::CODE_LINE_NUMBER , $ lineno )
316+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_SYSTEM , 'symfony ' )
317+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_OPERATION , 'process ' )
318+ ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE , $ messageClass )
319+ ;
320+
321+ self ::addMessageStampsToSpan ($ builder , $ envelope );
322+
323+ $ parent = Context::getCurrent ();
324+ $ span = $ builder
325+ ->setParent ($ parent )
326+ ->startSpan ();
327+
328+ $ context = $ span ->storeInContext ($ parent );
153329 Context::storage ()->attach ($ context );
154330
155331 return $ params ;
156332 },
157333 post: static function (
158- SenderInterface $ sender ,
334+ object $ middleware ,
159335 array $ params ,
160336 ?Envelope $ result ,
161337 ?\Throwable $ exception
@@ -177,4 +353,53 @@ public static function register(): void
177353 }
178354 );
179355 }
356+
357+ private static function addMessageStampsToSpan ($ builder , Envelope $ envelope ): void
358+ {
359+ $ busStamp = $ envelope ->last (BusNameStamp::class);
360+ $ consumedByWorkerStamp = $ envelope ->last (ConsumedByWorkerStamp::class);
361+ $ delayStamp = $ envelope ->last (DelayStamp::class);
362+ $ handledStamp = $ envelope ->last (HandledStamp::class);
363+ $ receivedStamp = $ envelope ->last (ReceivedStamp::class);
364+ $ redeliveryStamp = $ envelope ->last (RedeliveryStamp::class);
365+ $ sentStamp = $ envelope ->last (SentStamp::class);
366+ $ transportMessageIdStamp = $ envelope ->last (TransportMessageIdStamp::class);
367+
368+ if ($ busStamp ) {
369+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_BUS , $ busStamp ->getBusName ());
370+ }
371+
372+ if ($ handledStamp ) {
373+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_HANDLER , $ handledStamp ->getHandlerName ());
374+ }
375+
376+ if ($ redeliveryStamp ) {
377+ $ builder ->setAttribute (
378+ self ::ATTRIBUTE_MESSAGING_REDELIVERED_AT ,
379+ $ redeliveryStamp ->getRedeliveredAt ()->format ('Y-m-d\TH:i:sP ' )
380+ );
381+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_RETRY_COUNT , $ redeliveryStamp ->getRetryCount ());
382+ }
383+
384+ if ($ sentStamp ) {
385+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_SENDER , $ sentStamp ->getSenderClass ());
386+ }
387+
388+ if ($ delayStamp ) {
389+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_DELAY , $ delayStamp ->getDelay ());
390+ }
391+
392+ if ($ transportMessageIdStamp ) {
393+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_MESSAGE_ID , $ transportMessageIdStamp ->getId ());
394+ }
395+
396+ // Add count of all stamps as a metric
397+ $ stamps = [];
398+ foreach ($ envelope ->all () as $ stampFqcn => $ instances ) {
399+ $ stamps [$ stampFqcn ] = \count ($ instances );
400+ }
401+ if (!empty ($ stamps )) {
402+ $ builder ->setAttribute (self ::ATTRIBUTE_MESSAGING_STAMPS , $ stamps );
403+ }
404+ }
180405}
0 commit comments