Skip to content

Commit 5e9b17c

Browse files
committed
[Messenger] Define multiple buses from the framework.messenger.buses configuration
1 parent 981e67d commit 5e9b17c

File tree

10 files changed

+364
-48
lines changed

10 files changed

+364
-48
lines changed

DataCollector/MessengerDataCollector.php

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,34 @@
1414
use Symfony\Component\HttpFoundation\Request;
1515
use Symfony\Component\HttpFoundation\Response;
1616
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
17-
use Symfony\Component\Messenger\MiddlewareInterface;
17+
use Symfony\Component\Messenger\TraceableMessageBus;
1818

1919
/**
2020
* @author Samuel Roze <[email protected]>
2121
*
2222
* @experimental in 4.1
2323
*/
24-
class MessengerDataCollector extends DataCollector implements MiddlewareInterface
24+
class MessengerDataCollector extends DataCollector
2525
{
26+
private $traceableBuses = array();
27+
28+
public function registerBus(string $name, TraceableMessageBus $bus)
29+
{
30+
$this->traceableBuses[$name] = $bus;
31+
}
32+
2633
/**
2734
* {@inheritdoc}
2835
*/
2936
public function collect(Request $request, Response $response, \Exception $exception = null)
3037
{
31-
// noop
38+
$this->data = array('messages' => array());
39+
40+
foreach ($this->traceableBuses as $busName => $bus) {
41+
foreach ($bus->getDispatchedMessages() as $message) {
42+
$this->data['messages'][] = $this->collectMessage($busName, $message);
43+
}
44+
}
3245
}
3346

3447
/**
@@ -47,21 +60,20 @@ public function reset()
4760
$this->data = array();
4861
}
4962

50-
/**
51-
* {@inheritdoc}
52-
*/
53-
public function handle($message, callable $next)
63+
private function collectMessage(string $busName, array $tracedMessage)
5464
{
65+
$message = $tracedMessage['message'];
66+
5567
$debugRepresentation = array(
68+
'bus' => $busName,
5669
'message' => array(
5770
'type' => \get_class($message),
5871
'object' => $this->cloneVar($message),
5972
),
6073
);
6174

62-
$exception = null;
63-
try {
64-
$result = $next($message);
75+
if (array_key_exists('result', $tracedMessage)) {
76+
$result = $tracedMessage['result'];
6577

6678
if (\is_object($result)) {
6779
$debugRepresentation['result'] = array(
@@ -79,20 +91,18 @@ public function handle($message, callable $next)
7991
'value' => $result,
8092
);
8193
}
82-
} catch (\Throwable $exception) {
94+
}
95+
96+
if (isset($tracedMessage['exception'])) {
97+
$exception = $tracedMessage['exception'];
98+
8399
$debugRepresentation['exception'] = array(
84100
'type' => \get_class($exception),
85101
'message' => $exception->getMessage(),
86102
);
87103
}
88104

89-
$this->data['messages'][] = $debugRepresentation;
90-
91-
if (null !== $exception) {
92-
throw $exception;
93-
}
94-
95-
return $result;
105+
return $debugRepresentation;
96106
}
97107

98108
public function getMessages(): array

DependencyInjection/MessengerPass.php

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\DependencyInjection;
1313

14+
use Symfony\Component\DependencyInjection\ChildDefinition;
1415
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
1516
use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait;
1617
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
@@ -21,6 +22,7 @@
2122
use Symfony\Component\Messenger\Handler\ChainHandler;
2223
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
2324
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
25+
use Symfony\Component\Messenger\TraceableMessageBus;
2426

2527
/**
2628
* @author Samuel Roze <[email protected]>
@@ -29,28 +31,38 @@ class MessengerPass implements CompilerPassInterface
2931
{
3032
use PriorityTaggedServiceTrait;
3133

32-
private $messageBusService;
33-
private $messageHandlerResolverService;
3434
private $handlerTag;
35+
private $busTag;
36+
private $senderTag;
37+
private $receiverTag;
3538

36-
public function __construct(string $messageBusService = 'message_bus', string $messageHandlerResolverService = 'messenger.handler_resolver', string $handlerTag = 'messenger.message_handler')
39+
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
3740
{
38-
$this->messageBusService = $messageBusService;
39-
$this->messageHandlerResolverService = $messageHandlerResolverService;
4041
$this->handlerTag = $handlerTag;
42+
$this->busTag = $busTag;
43+
$this->senderTag = $senderTag;
44+
$this->receiverTag = $receiverTag;
4145
}
4246

4347
/**
4448
* {@inheritdoc}
4549
*/
4650
public function process(ContainerBuilder $container)
4751
{
48-
if (!$container->hasDefinition($this->messageBusService)) {
52+
if (!$container->hasDefinition('messenger.handler_resolver')) {
4953
return;
5054
}
5155

52-
if (!$container->getParameter('kernel.debug') || !$container->hasAlias('logger')) {
53-
$container->removeDefinition('messenger.middleware.debug.logging');
56+
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
57+
if ($container->hasParameter($busMiddlewaresParameter = $busId.'.middlewares')) {
58+
$this->registerBusMiddlewares($container, $busId, $container->getParameter($busMiddlewaresParameter));
59+
60+
$container->getParameterBag()->remove($busMiddlewaresParameter);
61+
}
62+
63+
if ($container->hasDefinition('messenger.data_collector')) {
64+
$this->registerBusToCollector($container, $busId, $tags[0]);
65+
}
5466
}
5567

5668
$this->registerReceivers($container);
@@ -110,7 +122,7 @@ private function registerHandlers(ContainerBuilder $container)
110122
$handlersLocatorMapping['handler.'.$message] = $handler;
111123
}
112124

113-
$handlerResolver = $container->getDefinition($this->messageHandlerResolverService);
125+
$handlerResolver = $container->getDefinition('messenger.handler_resolver');
114126
$handlerResolver->replaceArgument(0, ServiceLocatorTagPass::register($container, $handlersLocatorMapping));
115127
}
116128

@@ -149,7 +161,7 @@ private function guessHandledClasses(\ReflectionClass $handlerClass, string $ser
149161
private function registerReceivers(ContainerBuilder $container)
150162
{
151163
$receiverMapping = array();
152-
foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) {
164+
foreach ($container->findTaggedServiceIds($this->receiverTag) as $id => $tags) {
153165
foreach ($tags as $tag) {
154166
$receiverMapping[$id] = new Reference($id);
155167

@@ -165,7 +177,7 @@ private function registerReceivers(ContainerBuilder $container)
165177
private function registerSenders(ContainerBuilder $container)
166178
{
167179
$senderLocatorMapping = array();
168-
foreach ($container->findTaggedServiceIds('messenger.sender') as $id => $tags) {
180+
foreach ($container->findTaggedServiceIds($this->senderTag) as $id => $tags) {
169181
foreach ($tags as $tag) {
170182
$senderLocatorMapping[$id] = new Reference($id);
171183

@@ -177,4 +189,35 @@ private function registerSenders(ContainerBuilder $container)
177189

178190
$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
179191
}
192+
193+
private function registerBusToCollector(ContainerBuilder $container, string $busId, array $tag)
194+
{
195+
$container->setDefinition(
196+
$tracedBusId = 'debug.traced.'.$busId,
197+
(new Definition(TraceableMessageBus::class, array(new Reference($tracedBusId.'.inner'))))->setDecoratedService($busId)
198+
);
199+
200+
$container->getDefinition('messenger.data_collector')->addMethodCall('registerBus', array($tag['name'] ?? $busId, new Reference($tracedBusId)));
201+
}
202+
203+
private function registerBusMiddlewares(ContainerBuilder $container, string $busId, array $middlewares)
204+
{
205+
$container->getDefinition($busId)->replaceArgument(0, array_map(function (string $name) use ($container, $busId) {
206+
if (!$container->has($messengerMiddlewareId = 'messenger.middleware.'.$name)) {
207+
$messengerMiddlewareId = $name;
208+
}
209+
210+
if (!$container->has($messengerMiddlewareId)) {
211+
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $name));
212+
}
213+
214+
if ($container->getDefinition($messengerMiddlewareId)->isAbstract()) {
215+
$childDefinition = new ChildDefinition($messengerMiddlewareId);
216+
217+
$container->setDefinition($messengerMiddlewareId = $busId.'.middleware.'.$name, $childDefinition);
218+
}
219+
220+
return new Reference($messengerMiddlewareId);
221+
}, $middlewares));
222+
}
180223
}

Debug/LoggingMiddleware.php renamed to Middleware/LoggingMiddleware.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Debug;
12+
namespace Symfony\Component\Messenger\Middleware;
1313

1414
use Symfony\Component\Messenger\MiddlewareInterface;
1515
use Psr\Log\LoggerInterface;

Middleware/TolerateNoHandler.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
15+
use Symfony\Component\Messenger\MiddlewareInterface;
16+
17+
/**
18+
* @author Samuel Roze <[email protected]>
19+
*/
20+
class TolerateNoHandler implements MiddlewareInterface
21+
{
22+
public function handle($message, callable $next)
23+
{
24+
try {
25+
return $next($message);
26+
} catch (NoHandlerForMessageException $e) {
27+
// We tolerate not having a handler for this message.
28+
}
29+
}
30+
}

Tests/DataCollector/MessengerDataCollectorTest.php

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
namespace Symfony\Component\Messenger\Tests\DataCollector;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\HttpFoundation\Request;
16+
use Symfony\Component\HttpFoundation\Response;
1517
use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
18+
use Symfony\Component\Messenger\MessageBusInterface;
1619
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
20+
use Symfony\Component\Messenger\TraceableMessageBus;
1721
use Symfony\Component\VarDumper\Test\VarDumperTestTrait;
1822

1923
/**
@@ -28,13 +32,18 @@ class MessengerDataCollectorTest extends TestCase
2832
*/
2933
public function testHandle($returnedValue, $expected)
3034
{
31-
$collector = new MessengerDataCollector();
3235
$message = new DummyMessage('dummy message');
3336

34-
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
35-
$next->expects($this->once())->method('__invoke')->with($message)->willReturn($returnedValue);
37+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
38+
$bus->method('dispatch')->with($message)->willReturn($returnedValue);
39+
$bus = new TraceableMessageBus($bus);
40+
41+
$collector = new MessengerDataCollector();
42+
$collector->registerBus('default', $bus);
43+
44+
$bus->dispatch($message);
3645

37-
$this->assertSame($returnedValue, $collector->handle($message, $next));
46+
$collector->collect(Request::create('/'), new Response());
3847

3948
$messages = $collector->getMessages();
4049
$this->assertCount(1, $messages);
@@ -45,6 +54,7 @@ public function testHandle($returnedValue, $expected)
4554
public function getHandleTestData()
4655
{
4756
$messageDump = <<<DUMP
57+
"bus" => "default"
4858
"message" => array:2 [
4959
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
5060
"object" => Symfony\Component\VarDumper\Cloner\Data {%A
@@ -56,7 +66,7 @@ public function getHandleTestData()
5666
yield 'no returned value' => array(
5767
null,
5868
<<<DUMP
59-
array:2 [
69+
array:3 [
6070
$messageDump
6171
"result" => array:2 [
6272
"type" => "NULL"
@@ -69,7 +79,7 @@ public function getHandleTestData()
6979
yield 'scalar returned value' => array(
7080
'returned value',
7181
<<<DUMP
72-
array:2 [
82+
array:3 [
7383
$messageDump
7484
"result" => array:2 [
7585
"type" => "string"
@@ -82,7 +92,7 @@ public function getHandleTestData()
8292
yield 'array returned value' => array(
8393
array('returned value'),
8494
<<<DUMP
85-
array:2 [
95+
array:3 [
8696
$messageDump
8797
"result" => array:2 [
8898
"type" => "array"
@@ -95,24 +105,29 @@ public function getHandleTestData()
95105

96106
public function testHandleWithException()
97107
{
98-
$collector = new MessengerDataCollector();
99108
$message = new DummyMessage('dummy message');
100109

101-
$expectedException = new \RuntimeException('foo');
102-
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
103-
$next->expects($this->once())->method('__invoke')->with($message)->willThrowException($expectedException);
110+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
111+
$bus->method('dispatch')->with($message)->will($this->throwException(new \RuntimeException('foo')));
112+
$bus = new TraceableMessageBus($bus);
113+
114+
$collector = new MessengerDataCollector();
115+
$collector->registerBus('default', $bus);
104116

105117
try {
106-
$collector->handle($message, $next);
107-
} catch (\Throwable $actualException) {
108-
$this->assertSame($expectedException, $actualException);
118+
$bus->dispatch($message);
119+
} catch (\Throwable $e) {
120+
// Ignore.
109121
}
110122

123+
$collector->collect(Request::create('/'), new Response());
124+
111125
$messages = $collector->getMessages();
112126
$this->assertCount(1, $messages);
113127

114128
$this->assertDumpMatchesFormat(<<<DUMP
115-
array:2 [
129+
array:3 [
130+
"bus" => "default"
116131
"message" => array:2 [
117132
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
118133
"object" => Symfony\Component\VarDumper\Cloner\Data {%A

0 commit comments

Comments
 (0)