Skip to content

Commit 479ac8c

Browse files
authored
Record spans for symfony messenger (#215)
* Record spans for symfony messenger * Use Legacy InMemory transport where applicable * Abort running CI on new pushes * Add messenger tests & map test root * Add instrumentations to README.md
1 parent 9d2c286 commit 479ac8c

File tree

5 files changed

+313
-1
lines changed

5 files changed

+313
-1
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ install and configure the extension and SDK.
2323
The following features are supported:
2424
* root span creation (`Symfony\Component\HttpKernel\HttpKernel::handle` hook)
2525
* context propagation
26+
* HttpClient client span creation
27+
* HTTPClient context propagation
28+
* Message Bus span creation
29+
* Message Transport span creation
2630

2731
## Installation via composer
2832

_register.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
declare(strict_types=1);
44

55
use OpenTelemetry\Contrib\Instrumentation\Symfony\HttpClientInstrumentation;
6+
use OpenTelemetry\Contrib\Instrumentation\Symfony\MessengerInstrumentation;
67
use OpenTelemetry\Contrib\Instrumentation\Symfony\SymfonyInstrumentation;
78
use OpenTelemetry\SDK\Sdk;
89

@@ -17,4 +18,5 @@
1718
}
1819

1920
SymfonyInstrumentation::register();
21+
MessengerInstrumentation::register();
2022
HttpClientInstrumentation::register();

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"phpunit/phpunit": "^9.5",
3131
"vimeo/psalm": "^4.0",
3232
"symfony/http-client": "^5.4||^6.0",
33+
"symfony/messenger": "^5.4||^6.0",
3334
"open-telemetry/opentelemetry-propagation-traceresponse": "*",
3435
"open-telemetry/opentelemetry-propagation-server-timing": "*"
3536
},
@@ -43,7 +44,7 @@
4344
},
4445
"autoload-dev": {
4546
"psr-4": {
46-
"OpenTelemetry\\Tests\\Instrumentation\\Symfony\\": "tests/"
47+
"OpenTelemetry\\Tests\\Instrumentation\\Symfony\\tests\\": "tests/"
4748
}
4849
},
4950
"config": {

src/MessengerInstrumentation.php

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace OpenTelemetry\Contrib\Instrumentation\Symfony;
6+
7+
use OpenTelemetry\API\Instrumentation\CachedInstrumentation;
8+
use OpenTelemetry\API\Trace\Span;
9+
use OpenTelemetry\API\Trace\SpanKind;
10+
use OpenTelemetry\API\Trace\StatusCode;
11+
use OpenTelemetry\Context\Context;
12+
use function OpenTelemetry\Instrumentation\hook;
13+
use OpenTelemetry\SemConv\TraceAttributes;
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\MessageBusInterface;
16+
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
17+
18+
/**
19+
* The messenger instrumentation will create an internal span for each message dispatched.
20+
*
21+
* It is currently not designed to facilitate trace context propagation.
22+
* This should be done at the transport level.
23+
*
24+
* An exception to this will be simple transports like the Doctrine and InMemory transports.
25+
*
26+
* Caution: MessageBuses can be nested, so we might wand to add
27+
* a message stamp to keep track of the parent span.
28+
*/
29+
final class MessengerInstrumentation
30+
{
31+
const ATTRIBUTE_MESSENGER_BUS = 'symfony.messenger.bus';
32+
const ATTRIBUTE_MESSENGER_MESSAGE = 'symfony.messenger.message';
33+
const ATTRIBUTE_MESSENGER_TRANSPORT = 'symfony.messenger.transport';
34+
35+
public static function register(): void
36+
{
37+
$instrumentation = new CachedInstrumentation('io.opentelemetry.contrib.php.symfony_messenger');
38+
39+
/**
40+
* MessageBusInterface dispatches messages to the handlers.
41+
*/
42+
hook(
43+
MessageBusInterface::class,
44+
'dispatch',
45+
pre: static function (
46+
MessageBusInterface $bus,
47+
array $params,
48+
string $class,
49+
string $function,
50+
?string $filename,
51+
?int $lineno,
52+
) use ($instrumentation): array {
53+
/** @var object|Envelope $message */
54+
$message = $params[0];
55+
$messageClass = \get_class($message);
56+
57+
/** @psalm-suppress ArgumentTypeCoercion */
58+
$builder = $instrumentation
59+
->tracer()
60+
->spanBuilder(\sprintf('DISPATCH %s', $messageClass))
61+
->setSpanKind(SpanKind::KIND_INTERNAL)
62+
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
63+
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
64+
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
65+
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
66+
67+
->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class)
68+
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
69+
;
70+
71+
$parent = Context::getCurrent();
72+
$span = $builder
73+
->setParent($parent)
74+
->startSpan();
75+
76+
$context = $span->storeInContext($parent);
77+
Context::storage()->attach($context);
78+
79+
return $params;
80+
},
81+
post: static function (
82+
MessageBusInterface $bus,
83+
array $params,
84+
Envelope $result,
85+
?\Throwable $exception
86+
): void {
87+
$scope = Context::storage()->scope();
88+
if (null === $scope) {
89+
return;
90+
}
91+
92+
$scope->detach();
93+
$span = Span::fromContext($scope->context());
94+
95+
if (null !== $exception) {
96+
$span->recordException($exception, [
97+
TraceAttributes::EXCEPTION_ESCAPED => true,
98+
]);
99+
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
100+
}
101+
102+
$span->end();
103+
}
104+
);
105+
106+
/**
107+
* SenderInterface sends messages to a transport.
108+
*/
109+
hook(
110+
SenderInterface::class,
111+
'send',
112+
pre: static function (
113+
SenderInterface $bus,
114+
array $params,
115+
string $class,
116+
string $function,
117+
?string $filename,
118+
?int $lineno,
119+
) use ($instrumentation): array {
120+
/** @var Envelope $envelope */
121+
$envelope = $params[0];
122+
$messageClass = \get_class($envelope->getMessage());
123+
124+
/** @psalm-suppress ArgumentTypeCoercion */
125+
$builder = $instrumentation
126+
->tracer()
127+
->spanBuilder(\sprintf('SEND %s', $messageClass))
128+
->setSpanKind(SpanKind::KIND_INTERNAL)
129+
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
130+
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
131+
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
132+
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
133+
134+
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
135+
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
136+
;
137+
138+
$parent = Context::getCurrent();
139+
140+
$span = $builder
141+
->setParent($parent)
142+
->startSpan();
143+
144+
$context = $span->storeInContext($parent);
145+
146+
Context::storage()->attach($context);
147+
148+
return $params;
149+
},
150+
post: static function (
151+
SenderInterface $sender,
152+
array $params,
153+
Envelope $result,
154+
?\Throwable $exception
155+
): void {
156+
$scope = Context::storage()->scope();
157+
if (null === $scope) {
158+
return;
159+
}
160+
161+
$scope->detach();
162+
$span = Span::fromContext($scope->context());
163+
164+
if (null !== $exception) {
165+
$span->recordException($exception, [
166+
TraceAttributes::EXCEPTION_ESCAPED => true,
167+
]);
168+
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
169+
}
170+
171+
$span->end();
172+
}
173+
);
174+
}
175+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration;
6+
7+
use OpenTelemetry\API\Trace\SpanKind;
8+
use OpenTelemetry\Contrib\Instrumentation\Symfony\MessengerInstrumentation;
9+
use OpenTelemetry\SDK\Trace\ImmutableSpan;
10+
use Symfony\Component\Messenger\Envelope;
11+
use Symfony\Component\Messenger\MessageBus;
12+
use Symfony\Component\Messenger\MessageBusInterface;
13+
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
14+
use Symfony\Component\Messenger\Transport\InMemoryTransport as LegacyInmemoryTransport;
15+
16+
final class SendEmailMessage
17+
{
18+
private string $message;
19+
20+
public function __construct(string $message)
21+
{
22+
$this->message = $message;
23+
}
24+
25+
public function getMessage(): string
26+
{
27+
return $this->message;
28+
}
29+
}
30+
31+
final class MessengerInstrumentationTest extends AbstractTest
32+
{
33+
protected function getMessenger(): MessageBusInterface
34+
{
35+
return new MessageBus();
36+
}
37+
protected function getTransport()
38+
{
39+
// Symfony 6+
40+
if (class_exists(InMemoryTransport::class)) {
41+
return new InMemoryTransport();
42+
}
43+
44+
// Symfony 5+
45+
return new LegacyInmemoryTransport();
46+
}
47+
48+
/**
49+
* @dataProvider dispatchDataProvider
50+
* @param mixed $message
51+
* @param string $spanName
52+
* @param int $kind
53+
* @param array $attributes
54+
*/
55+
public function test_dispatch_message($message, string $spanName, int $kind, array $attributes)
56+
{
57+
$bus = $this->getMessenger();
58+
59+
$bus->dispatch($message);
60+
61+
$this->assertCount(1, $this->storage);
62+
63+
/** @var ImmutableSpan $span */
64+
$span = $this->storage[0];
65+
66+
$this->assertEquals($spanName, $span->getName());
67+
$this->assertEquals($kind, $span->getKind());
68+
69+
foreach ($attributes as $key => $value) {
70+
$this->assertTrue($span->getAttributes()->has($key), sprintf('Attribute %s not found', $key));
71+
$this->assertEquals($value, $span->getAttributes()->get($key));
72+
}
73+
}
74+
75+
/**
76+
* @dataProvider sendDataProvider
77+
* @param mixed $message
78+
* @param string $spanName
79+
* @param int $kind
80+
* @param array $attributes
81+
*/
82+
public function test_send_message($message, string $spanName, int $kind, array $attributes)
83+
{
84+
$transport = $this->getTransport();
85+
$transport->send(new Envelope($message));
86+
87+
$this->assertCount(1, $this->storage);
88+
89+
/** @var ImmutableSpan $span */
90+
$span = $this->storage[0];
91+
92+
$this->assertEquals($spanName, $span->getName());
93+
$this->assertEquals($kind, $span->getKind());
94+
95+
foreach ($attributes as $key => $value) {
96+
$this->assertTrue($span->getAttributes()->has($key), sprintf('Attribute %s not found', $key));
97+
$this->assertEquals($value, $span->getAttributes()->get($key));
98+
}
99+
}
100+
101+
public function sendDataProvider(): array
102+
{
103+
return [
104+
[
105+
new SendEmailMessage('Hello Again'),
106+
'SEND OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
107+
SpanKind::KIND_INTERNAL,
108+
[
109+
MessengerInstrumentation::ATTRIBUTE_MESSENGER_TRANSPORT => class_exists('Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport') ? 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport' : 'Symfony\Component\Messenger\Transport\InMemoryTransport',
110+
MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
111+
],
112+
],
113+
];
114+
}
115+
116+
public function dispatchDataProvider(): array
117+
{
118+
return [
119+
[
120+
new SendEmailMessage('Hello Again'),
121+
'DISPATCH OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
122+
SpanKind::KIND_INTERNAL,
123+
[
124+
MessengerInstrumentation::ATTRIBUTE_MESSENGER_BUS => 'Symfony\Component\Messenger\MessageBus',
125+
MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage',
126+
],
127+
],
128+
];
129+
}
130+
}

0 commit comments

Comments
 (0)