Skip to content

Commit 280514a

Browse files
committed
feat: instrument middleware
1 parent 3cef79c commit 280514a

File tree

2 files changed

+340
-15
lines changed

2 files changed

+340
-15
lines changed

src/Instrumentation/Symfony/src/MessengerInstrumentation.php

Lines changed: 201 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@
2626
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2727
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
2828
use Symfony\Component\Messenger\Worker;
29+
use OpenTelemetry\API\Trace\SpanBuilderInterface;
30+
31+
// Add Amazon SQS stamp class if available
32+
if (\class_exists('Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp')) {
33+
class_alias(
34+
'Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp',
35+
'OpenTelemetry\Contrib\Instrumentation\Symfony\AmazonSqsReceivedStamp'
36+
);
37+
}
2938

3039
/**
3140
* The messenger instrumentation will create spans for message operations in Symfony's Messenger system.
@@ -226,6 +235,12 @@ public static function register(): void
226235
$transport = $params[1];
227236
$transportName = \is_object($transport) ? \get_class($transport) : $transport;
228237

238+
// Use ReceivedStamp transport name if available
239+
$receivedStamp = $envelope->last(ReceivedStamp::class);
240+
if ($receivedStamp) {
241+
$transportName = $receivedStamp->getTransportName();
242+
}
243+
229244
// Extract OpenTelemetry context from message envelope
230245
$propagator = Globals::propagator();
231246
$extractedContext = EnvelopeContextPropagator::getInstance()->extractContextFromEnvelope($envelope);
@@ -348,9 +363,164 @@ public static function register(): void
348363
$span->end();
349364
}
350365
);
366+
367+
// Add instrumentation for individual handlers in Symfony 6.2+
368+
if (method_exists('Symfony\Component\Messenger\Middleware\HandleMessageMiddleware', 'callHandler')) {
369+
hook(
370+
'Symfony\Component\Messenger\Middleware\HandleMessageMiddleware',
371+
'callHandler',
372+
pre: static function (
373+
object $middleware,
374+
array $params,
375+
string $class,
376+
string $function,
377+
?string $filename,
378+
?int $lineno,
379+
) use ($instrumentation): array {
380+
$handler = $params[0];
381+
$message = $params[1];
382+
$handlerClass = \get_class($handler);
383+
$messageClass = \get_class($message);
384+
385+
/** @psalm-suppress ArgumentTypeCoercion */
386+
$builder = $instrumentation
387+
->tracer()
388+
->spanBuilder(\sprintf('HANDLE %s::%s', $handlerClass, $messageClass))
389+
->setSpanKind(SpanKind::KIND_INTERNAL)
390+
->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function)
391+
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
392+
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
393+
->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno)
394+
->setAttribute(self::ATTRIBUTE_MESSAGING_SYSTEM, 'symfony')
395+
->setAttribute(self::ATTRIBUTE_MESSAGING_OPERATION, 'process')
396+
->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass)
397+
->setAttribute(self::ATTRIBUTE_MESSAGING_HANDLER, $handlerClass)
398+
;
399+
400+
$parent = Context::getCurrent();
401+
$span = $builder
402+
->setParent($parent)
403+
->startSpan();
404+
405+
$context = $span->storeInContext($parent);
406+
Context::storage()->attach($context);
407+
408+
return $params;
409+
},
410+
post: static function (
411+
object $middleware,
412+
array $params,
413+
$result,
414+
?\Throwable $exception
415+
): void {
416+
$scope = Context::storage()->scope();
417+
if (null === $scope) {
418+
return;
419+
}
420+
421+
$scope->detach();
422+
$span = Span::fromContext($scope->context());
423+
424+
if (null !== $exception) {
425+
$span->recordException($exception);
426+
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
427+
}
428+
429+
$span->end();
430+
}
431+
);
432+
}
433+
434+
if (getenv('OTEL_PHP_MESSENGER_INSTRUMENT_MIDDLEWARES')) {
435+
/**
436+
* Instrument all middlewares
437+
*/
438+
hook(
439+
'Symfony\Component\Messenger\Middleware\MiddlewareInterface',
440+
'handle',
441+
pre: static function (
442+
object $middleware,
443+
array $params,
444+
string $class,
445+
string $function,
446+
?string $filename,
447+
?int $lineno,
448+
) use ($instrumentation): array {
449+
/** @var Envelope $envelope */
450+
$envelope = $params[0];
451+
$messageClass = \get_class($envelope->getMessage());
452+
$middlewareClass = \get_class($middleware);
453+
454+
/** @psalm-suppress ArgumentTypeCoercion */
455+
$builder = $instrumentation
456+
->tracer()
457+
->spanBuilder(\sprintf('MIDDLEWARE %s::%s', $middlewareClass, $messageClass))
458+
->setSpanKind(SpanKind::KIND_INTERNAL)
459+
->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function)
460+
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
461+
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
462+
->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno)
463+
->setAttribute(self::ATTRIBUTE_MESSAGING_SYSTEM, 'symfony')
464+
->setAttribute(self::ATTRIBUTE_MESSAGING_OPERATION, 'middleware')
465+
->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass)
466+
->setAttribute('messaging.symfony.middleware', $middlewareClass)
467+
;
468+
469+
self::addMessageStampsToSpan($builder, $envelope);
470+
471+
$parent = Context::getCurrent();
472+
$span = $builder
473+
->setParent($parent)
474+
->startSpan();
475+
476+
$context = $span->storeInContext($parent);
477+
Context::storage()->attach($context);
478+
479+
return $params;
480+
},
481+
post: static function (
482+
object $middleware,
483+
array $params,
484+
?Envelope $result,
485+
?\Throwable $exception
486+
): void {
487+
$scope = Context::storage()->scope();
488+
if (null === $scope) {
489+
return;
490+
}
491+
492+
$scope->detach();
493+
$span = Span::fromContext($scope->context());
494+
495+
if (null !== $exception) {
496+
$span->recordException($exception);
497+
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
498+
}
499+
500+
$span->end();
501+
}
502+
);
503+
}
504+
}
505+
506+
private static function buildResourceName(string $messageClass, ?string $transportName = null, ?string $operation = null): string
507+
{
508+
if (empty($transportName)) {
509+
return $messageClass;
510+
}
511+
512+
if ($operation === 'send') {
513+
return \sprintf('%s -> %s', $messageClass, $transportName);
514+
}
515+
516+
if ($operation === 'receive' || $operation === 'consume') {
517+
return \sprintf('%s -> %s', $transportName, $messageClass);
518+
}
519+
520+
return \sprintf('%s -> %s', $messageClass, $transportName);
351521
}
352522

353-
private static function addMessageStampsToSpan($builder, Envelope $envelope): void
523+
private static function addMessageStampsToSpan(SpanBuilderInterface $builder, Envelope $envelope): void
354524
{
355525
$busStamp = $envelope->last(BusNameStamp::class);
356526
$consumedByWorkerStamp = $envelope->last(ConsumedByWorkerStamp::class);
@@ -361,6 +531,10 @@ private static function addMessageStampsToSpan($builder, Envelope $envelope): vo
361531
$sentStamp = $envelope->last(SentStamp::class);
362532
$transportMessageIdStamp = $envelope->last(TransportMessageIdStamp::class);
363533

534+
$messageClass = \get_class($envelope->getMessage());
535+
$transportName = null;
536+
$operation = null;
537+
364538
if ($busStamp) {
365539
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_BUS, $busStamp->getBusName());
366540
}
@@ -370,32 +544,48 @@ private static function addMessageStampsToSpan($builder, Envelope $envelope): vo
370544
}
371545

372546
if ($redeliveryStamp) {
373-
$builder->setAttribute(
374-
self::ATTRIBUTE_MESSAGING_REDELIVERED_AT,
375-
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d\TH:i:sP')
376-
);
547+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_REDELIVERED_AT, $redeliveryStamp->getRedeliveredAt()->format('Y-m-d\TH:i:sP'));
377548
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_RETRY_COUNT, $redeliveryStamp->getRetryCount());
378549
}
379550

380551
if ($sentStamp) {
381552
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_SENDER, $sentStamp->getSenderClass());
382-
}
383-
384-
if ($delayStamp) {
385-
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_DELAY, $delayStamp->getDelay());
553+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_DESTINATION, $sentStamp->getSenderAlias());
554+
$transportName = $sentStamp->getSenderAlias();
555+
$operation = 'send';
556+
} elseif ($receivedStamp) {
557+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_DESTINATION, $receivedStamp->getTransportName());
558+
$transportName = $receivedStamp->getTransportName();
559+
$operation = 'receive';
386560
}
387561

388562
if ($transportMessageIdStamp) {
389563
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE_ID, $transportMessageIdStamp->getId());
390564
}
391565

392-
// Add count of all stamps as a metric
566+
if ($delayStamp) {
567+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_DELAY, $delayStamp->getDelay());
568+
}
569+
570+
// Add all stamps count
393571
$stamps = [];
394572
foreach ($envelope->all() as $stampFqcn => $instances) {
395573
$stamps[$stampFqcn] = \count($instances);
396574
}
397575
if (!empty($stamps)) {
398-
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_STAMPS, $stamps);
576+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_STAMPS, \json_encode($stamps));
399577
}
578+
579+
// Support for Amazon SQS
580+
if (\class_exists('Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp')) {
581+
/** @var \Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp|null $amazonSqsReceivedStamp */
582+
$amazonSqsReceivedStamp = $envelope->last('Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp');
583+
if ($amazonSqsReceivedStamp && !$transportMessageIdStamp && method_exists($amazonSqsReceivedStamp, 'getId')) {
584+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE_ID, $amazonSqsReceivedStamp->getId());
585+
}
586+
}
587+
588+
// Set resource name
589+
$builder->setAttribute('resource.name', self::buildResourceName($messageClass, $transportName, $operation));
400590
}
401591
}

0 commit comments

Comments
 (0)