Skip to content

Commit fd2425b

Browse files
committed
feat: improve symfony messenger tracing
1 parent 56e4005 commit fd2425b

File tree

3 files changed

+467
-26
lines changed

3 files changed

+467
-26
lines changed

src/Instrumentation/Symfony/src/MessengerInstrumentation.php

Lines changed: 251 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,56 @@
44

55
namespace OpenTelemetry\Contrib\Instrumentation\Symfony;
66

7+
use OpenTelemetry\API\Globals;
78
use OpenTelemetry\API\Instrumentation\CachedInstrumentation;
89
use OpenTelemetry\API\Trace\Span;
910
use OpenTelemetry\API\Trace\SpanKind;
1011
use OpenTelemetry\API\Trace\StatusCode;
1112
use OpenTelemetry\Context\Context;
13+
use OpenTelemetry\Context\Propagation\GetterSetterInterface;
14+
use OpenTelemetry\Context\Propagation\PropagationGetterInterface;
15+
use OpenTelemetry\Context\Propagation\PropagationSetterInterface;
1216
use function OpenTelemetry\Instrumentation\hook;
1317
use OpenTelemetry\SemConv\TraceAttributes;
18+
use OpenTelemetry\Contrib\Instrumentation\Symfony\Propagation\EnvelopeContextPropagator;
1419
use Symfony\Component\Messenger\Envelope;
1520
use Symfony\Component\Messenger\MessageBusInterface;
21+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
22+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
23+
use Symfony\Component\Messenger\Stamp\DelayStamp;
24+
use Symfony\Component\Messenger\Stamp\HandledStamp;
25+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
26+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
27+
use Symfony\Component\Messenger\Stamp\SentStamp;
28+
use Symfony\Component\Messenger\Stamp\SerializerStamp;
29+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
30+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1631
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
32+
use Symfony\Component\Messenger\Worker;
1733

1834
/**
19-
* The messenger instrumentation will create an internal span for each message dispatched.
20-
*
21-
* It is currently not designed to facilitate trace context propagation.
22-
* This should be done at the transport level.
23-
*
24-
* An exception to this will be simple transports like the Doctrine and InMemory transports.
25-
*
26-
* Caution: MessageBuses can be nested, so we might wand to add
27-
* a message stamp to keep track of the parent span.
35+
* The messenger instrumentation will create spans for message operations in Symfony's Messenger system.
36+
* It supports distributed tracing and provides rich metadata about message processing.
2837
*/
2938
final class MessengerInstrumentation
3039
{
31-
const ATTRIBUTE_MESSENGER_BUS = 'symfony.messenger.bus';
32-
const ATTRIBUTE_MESSENGER_MESSAGE = 'symfony.messenger.message';
33-
const ATTRIBUTE_MESSENGER_TRANSPORT = 'symfony.messenger.transport';
40+
const ATTRIBUTE_MESSAGING_SYSTEM = 'messaging.system';
41+
const ATTRIBUTE_MESSAGING_OPERATION = 'messaging.operation';
42+
const ATTRIBUTE_MESSAGING_DESTINATION = 'messaging.destination';
43+
const ATTRIBUTE_MESSAGING_MESSAGE_ID = 'messaging.message_id';
44+
const ATTRIBUTE_MESSAGING_MESSAGE = 'messaging.message';
45+
const ATTRIBUTE_MESSAGING_BUS = 'messaging.symfony.bus';
46+
const ATTRIBUTE_MESSAGING_HANDLER = 'messaging.symfony.handler';
47+
const ATTRIBUTE_MESSAGING_REDELIVERED_AT = 'messaging.symfony.redelivered_at';
48+
const ATTRIBUTE_MESSAGING_SENDER = 'messaging.symfony.sender';
49+
const ATTRIBUTE_MESSAGING_DELAY = 'messaging.symfony.delay';
50+
const ATTRIBUTE_MESSAGING_RETRY_COUNT = 'messaging.symfony.retry_count';
51+
const ATTRIBUTE_MESSAGING_STAMPS = 'messaging.symfony.stamps';
52+
53+
// Constants used in tests
54+
const ATTRIBUTE_MESSENGER_BUS = self::ATTRIBUTE_MESSAGING_BUS;
55+
const ATTRIBUTE_MESSENGER_TRANSPORT = self::ATTRIBUTE_MESSAGING_DESTINATION;
56+
const ATTRIBUTE_MESSENGER_MESSAGE = self::ATTRIBUTE_MESSAGING_MESSAGE;
3457

3558
/** @psalm-suppress PossiblyUnusedMethod */
3659
public static function register(): void
@@ -43,9 +66,7 @@ public static function register(): void
4366

4467
/**
4568
* MessageBusInterface dispatches messages to the handlers.
46-
*
47-
* @psalm-suppress UnusedFunctionCall
48-
*/
69+
*/
4970
hook(
5071
MessageBusInterface::class,
5172
'dispatch',
@@ -70,11 +91,16 @@ public static function register(): void
7091
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
7192
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
7293
->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno)
73-
74-
->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class)
75-
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
94+
->setAttribute(self::ATTRIBUTE_MESSAGING_SYSTEM, 'symfony')
95+
->setAttribute(self::ATTRIBUTE_MESSAGING_OPERATION, 'dispatch')
96+
->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass)
97+
->setAttribute(self::ATTRIBUTE_MESSAGING_BUS, $class)
7698
;
7799

100+
if ($message instanceof Envelope) {
101+
self::addMessageStampsToSpan($builder, $message);
102+
}
103+
78104
$parent = Context::getCurrent();
79105
$span = $builder
80106
->setParent($parent)
@@ -110,14 +136,12 @@ public static function register(): void
110136

111137
/**
112138
* SenderInterface sends messages to a transport.
113-
*
114-
* @psalm-suppress UnusedFunctionCall
115-
*/
139+
*/
116140
hook(
117141
SenderInterface::class,
118142
'send',
119143
pre: static function (
120-
SenderInterface $bus,
144+
SenderInterface $sender,
121145
array $params,
122146
string $class,
123147
string $function,
@@ -128,6 +152,14 @@ public static function register(): void
128152
$envelope = $params[0];
129153
$messageClass = \get_class($envelope->getMessage());
130154

155+
// Inject OpenTelemetry context into message envelope
156+
$propagator = Globals::propagator();
157+
$currentContext = Context::getCurrent();
158+
$headers = [];
159+
$propagator->inject($headers, EnvelopeContextPropagator::getInstance(), $currentContext);
160+
161+
$envelope = EnvelopeContextPropagator::getInstance()->injectContextIntoEnvelope($envelope, $headers);
162+
131163
/** @psalm-suppress ArgumentTypeCoercion */
132164
$builder = $instrumentation
133165
->tracer()
@@ -137,25 +169,169 @@ public static function register(): void
137169
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
138170
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
139171
->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno)
140-
141-
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
142-
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
172+
->setAttribute(self::ATTRIBUTE_MESSAGING_SYSTEM, 'symfony')
173+
->setAttribute(self::ATTRIBUTE_MESSAGING_OPERATION, 'send')
174+
->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass)
175+
->setAttribute(self::ATTRIBUTE_MESSAGING_DESTINATION, $class)
143176
;
144177

178+
self::addMessageStampsToSpan($builder, $envelope);
179+
145180
$parent = Context::getCurrent();
181+
$span = $builder
182+
->setParent($parent)
183+
->startSpan();
184+
185+
$context = $span->storeInContext($parent);
186+
Context::storage()->attach($context);
187+
188+
return [$envelope];
189+
},
190+
post: static function (
191+
SenderInterface $sender,
192+
array $params,
193+
?Envelope $result,
194+
?\Throwable $exception
195+
): void {
196+
$scope = Context::storage()->scope();
197+
if (null === $scope) {
198+
return;
199+
}
200+
201+
$scope->detach();
202+
$span = Span::fromContext($scope->context());
203+
204+
if (null !== $exception) {
205+
$span->recordException($exception);
206+
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
207+
}
208+
209+
$span->end();
210+
}
211+
);
212+
213+
/**
214+
* Worker handles messages from a transport.
215+
*/
216+
hook(
217+
Worker::class,
218+
'handleMessage',
219+
pre: static function (
220+
Worker $worker,
221+
array $params,
222+
string $class,
223+
string $function,
224+
?string $filename,
225+
?int $lineno,
226+
) use ($instrumentation): array {
227+
/** @var Envelope $envelope */
228+
$envelope = $params[0];
229+
/** @var string|ReceiverInterface $transport */
230+
$transport = $params[1];
231+
$transportName = \is_object($transport) ? \get_class($transport) : $transport;
232+
233+
// Extract OpenTelemetry context from message envelope
234+
$propagator = Globals::propagator();
235+
$extractedContext = EnvelopeContextPropagator::getInstance()->extractContextFromEnvelope($envelope);
236+
if ($extractedContext !== null) {
237+
$propagator->extract($extractedContext, EnvelopeContextPropagator::getInstance());
238+
}
239+
240+
/** @psalm-suppress ArgumentTypeCoercion */
241+
$builder = $instrumentation
242+
->tracer()
243+
->spanBuilder(\sprintf('CONSUME %s', \get_class($envelope->getMessage())))
244+
->setSpanKind(SpanKind::KIND_CONSUMER)
245+
->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function)
246+
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
247+
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
248+
->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno)
249+
->setAttribute(self::ATTRIBUTE_MESSAGING_SYSTEM, 'symfony')
250+
->setAttribute(self::ATTRIBUTE_MESSAGING_OPERATION, 'receive')
251+
->setAttribute(self::ATTRIBUTE_MESSAGING_DESTINATION, $transportName)
252+
;
253+
254+
self::addMessageStampsToSpan($builder, $envelope);
146255

256+
$parent = Context::getCurrent();
147257
$span = $builder
148258
->setParent($parent)
149259
->startSpan();
150260

151261
$context = $span->storeInContext($parent);
262+
Context::storage()->attach($context);
152263

264+
return $params;
265+
},
266+
post: static function (
267+
Worker $worker,
268+
array $params,
269+
?Envelope $result,
270+
?\Throwable $exception
271+
): void {
272+
$scope = Context::storage()->scope();
273+
if (null === $scope) {
274+
return;
275+
}
276+
277+
$scope->detach();
278+
$span = Span::fromContext($scope->context());
279+
280+
if (null !== $exception) {
281+
$span->recordException($exception);
282+
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
283+
}
284+
285+
$span->end();
286+
}
287+
);
288+
289+
/**
290+
* HandleMessageMiddleware processes messages with handlers.
291+
*/
292+
hook(
293+
'Symfony\Component\Messenger\Middleware\HandleMessageMiddleware',
294+
'handle',
295+
pre: static function (
296+
object $middleware,
297+
array $params,
298+
string $class,
299+
string $function,
300+
?string $filename,
301+
?int $lineno,
302+
) use ($instrumentation): array {
303+
/** @var Envelope $envelope */
304+
$envelope = $params[0];
305+
$messageClass = \get_class($envelope->getMessage());
306+
307+
/** @psalm-suppress ArgumentTypeCoercion */
308+
$builder = $instrumentation
309+
->tracer()
310+
->spanBuilder(\sprintf('HANDLE %s', $messageClass))
311+
->setSpanKind(SpanKind::KIND_INTERNAL)
312+
->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function)
313+
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
314+
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
315+
->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno)
316+
->setAttribute(self::ATTRIBUTE_MESSAGING_SYSTEM, 'symfony')
317+
->setAttribute(self::ATTRIBUTE_MESSAGING_OPERATION, 'process')
318+
->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass)
319+
;
320+
321+
self::addMessageStampsToSpan($builder, $envelope);
322+
323+
$parent = Context::getCurrent();
324+
$span = $builder
325+
->setParent($parent)
326+
->startSpan();
327+
328+
$context = $span->storeInContext($parent);
153329
Context::storage()->attach($context);
154330

155331
return $params;
156332
},
157333
post: static function (
158-
SenderInterface $sender,
334+
object $middleware,
159335
array $params,
160336
?Envelope $result,
161337
?\Throwable $exception
@@ -177,4 +353,53 @@ public static function register(): void
177353
}
178354
);
179355
}
356+
357+
private static function addMessageStampsToSpan($builder, Envelope $envelope): void
358+
{
359+
$busStamp = $envelope->last(BusNameStamp::class);
360+
$consumedByWorkerStamp = $envelope->last(ConsumedByWorkerStamp::class);
361+
$delayStamp = $envelope->last(DelayStamp::class);
362+
$handledStamp = $envelope->last(HandledStamp::class);
363+
$receivedStamp = $envelope->last(ReceivedStamp::class);
364+
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
365+
$sentStamp = $envelope->last(SentStamp::class);
366+
$transportMessageIdStamp = $envelope->last(TransportMessageIdStamp::class);
367+
368+
if ($busStamp) {
369+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_BUS, $busStamp->getBusName());
370+
}
371+
372+
if ($handledStamp) {
373+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_HANDLER, $handledStamp->getHandlerName());
374+
}
375+
376+
if ($redeliveryStamp) {
377+
$builder->setAttribute(
378+
self::ATTRIBUTE_MESSAGING_REDELIVERED_AT,
379+
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d\TH:i:sP')
380+
);
381+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_RETRY_COUNT, $redeliveryStamp->getRetryCount());
382+
}
383+
384+
if ($sentStamp) {
385+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_SENDER, $sentStamp->getSenderClass());
386+
}
387+
388+
if ($delayStamp) {
389+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_DELAY, $delayStamp->getDelay());
390+
}
391+
392+
if ($transportMessageIdStamp) {
393+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE_ID, $transportMessageIdStamp->getId());
394+
}
395+
396+
// Add count of all stamps as a metric
397+
$stamps = [];
398+
foreach ($envelope->all() as $stampFqcn => $instances) {
399+
$stamps[$stampFqcn] = \count($instances);
400+
}
401+
if (!empty($stamps)) {
402+
$builder->setAttribute(self::ATTRIBUTE_MESSAGING_STAMPS, $stamps);
403+
}
404+
}
180405
}

0 commit comments

Comments
 (0)