Skip to content

Commit e5b5e23

Browse files
committed
feature #26864 [Messenger] Define multiple buses from the framework.messenger.buses configuration (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes #26864). Discussion ---------- [Messenger] Define multiple buses from the `framework.messenger.buses` configuration | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #26652 | License | MIT | Doc PR | symfony/symfony-docs#9617 Not everybody will benefit from having only one bus, especially with the CQRS-like usages. While keeping the extremely use of use of the single bus, this PR has the following: - Create multiple buses from the YAML configuration - Tag middleware only a specific buses - Register middlewares from the YAML configuration Even if it's visible in the PR's tests, here's how it will look like, for a completely full-customised version: ```yaml framework: messenger: default_bus: commands buses: commands: ~ events: middlewares: - validation - route_messages - "Your\\Middleware\\Service" - call_message_handler ``` A few things to note: 1. The YAML configuration creates `messenger.bus.[name]` services for the buses. 2. The YAML configuration for middleware just adds tags to the corresponding middlewares. 3. If the middleware definition does not exists, it creates it. (without any magic on the arguments though, if it isn't auto-wirable, well... "your problem") 4. In the PR, there is this "TolerateNoHandler" middleware that is a great example for event buses Commits ------- e5deb8499b [Messenger] Define multiple buses from the `framework.messenger.buses` configuration
2 parents d5f69f2 + 5e9b17c commit e5b5e23

File tree

10 files changed

+364
-48
lines changed

10 files changed

+364
-48
lines changed

DataCollector/MessengerDataCollector.php

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,34 @@
1414
use Symfony\Component\HttpFoundation\Request;
1515
use Symfony\Component\HttpFoundation\Response;
1616
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
17-
use Symfony\Component\Messenger\MiddlewareInterface;
17+
use Symfony\Component\Messenger\TraceableMessageBus;
1818

1919
/**
2020
* @author Samuel Roze <[email protected]>
2121
*
2222
* @experimental in 4.1
2323
*/
24-
class MessengerDataCollector extends DataCollector implements MiddlewareInterface
24+
class MessengerDataCollector extends DataCollector
2525
{
26+
private $traceableBuses = array();
27+
28+
public function registerBus(string $name, TraceableMessageBus $bus)
29+
{
30+
$this->traceableBuses[$name] = $bus;
31+
}
32+
2633
/**
2734
* {@inheritdoc}
2835
*/
2936
public function collect(Request $request, Response $response, \Exception $exception = null)
3037
{
31-
// noop
38+
$this->data = array('messages' => array());
39+
40+
foreach ($this->traceableBuses as $busName => $bus) {
41+
foreach ($bus->getDispatchedMessages() as $message) {
42+
$this->data['messages'][] = $this->collectMessage($busName, $message);
43+
}
44+
}
3245
}
3346

3447
/**
@@ -47,21 +60,20 @@ public function reset()
4760
$this->data = array();
4861
}
4962

50-
/**
51-
* {@inheritdoc}
52-
*/
53-
public function handle($message, callable $next)
63+
private function collectMessage(string $busName, array $tracedMessage)
5464
{
65+
$message = $tracedMessage['message'];
66+
5567
$debugRepresentation = array(
68+
'bus' => $busName,
5669
'message' => array(
5770
'type' => \get_class($message),
5871
'object' => $this->cloneVar($message),
5972
),
6073
);
6174

62-
$exception = null;
63-
try {
64-
$result = $next($message);
75+
if (array_key_exists('result', $tracedMessage)) {
76+
$result = $tracedMessage['result'];
6577

6678
if (\is_object($result)) {
6779
$debugRepresentation['result'] = array(
@@ -79,20 +91,18 @@ public function handle($message, callable $next)
7991
'value' => $result,
8092
);
8193
}
82-
} catch (\Throwable $exception) {
94+
}
95+
96+
if (isset($tracedMessage['exception'])) {
97+
$exception = $tracedMessage['exception'];
98+
8399
$debugRepresentation['exception'] = array(
84100
'type' => \get_class($exception),
85101
'message' => $exception->getMessage(),
86102
);
87103
}
88104

89-
$this->data['messages'][] = $debugRepresentation;
90-
91-
if (null !== $exception) {
92-
throw $exception;
93-
}
94-
95-
return $result;
105+
return $debugRepresentation;
96106
}
97107

98108
public function getMessages(): array

DependencyInjection/MessengerPass.php

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\DependencyInjection;
1313

14+
use Symfony\Component\DependencyInjection\ChildDefinition;
1415
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
1516
use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait;
1617
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
@@ -21,6 +22,7 @@
2122
use Symfony\Component\Messenger\Handler\ChainHandler;
2223
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
2324
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
25+
use Symfony\Component\Messenger\TraceableMessageBus;
2426

2527
/**
2628
* @author Samuel Roze <[email protected]>
@@ -29,28 +31,38 @@ class MessengerPass implements CompilerPassInterface
2931
{
3032
use PriorityTaggedServiceTrait;
3133

32-
private $messageBusService;
33-
private $messageHandlerResolverService;
3434
private $handlerTag;
35+
private $busTag;
36+
private $senderTag;
37+
private $receiverTag;
3538

36-
public function __construct(string $messageBusService = 'message_bus', string $messageHandlerResolverService = 'messenger.handler_resolver', string $handlerTag = 'messenger.message_handler')
39+
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
3740
{
38-
$this->messageBusService = $messageBusService;
39-
$this->messageHandlerResolverService = $messageHandlerResolverService;
4041
$this->handlerTag = $handlerTag;
42+
$this->busTag = $busTag;
43+
$this->senderTag = $senderTag;
44+
$this->receiverTag = $receiverTag;
4145
}
4246

4347
/**
4448
* {@inheritdoc}
4549
*/
4650
public function process(ContainerBuilder $container)
4751
{
48-
if (!$container->hasDefinition($this->messageBusService)) {
52+
if (!$container->hasDefinition('messenger.handler_resolver')) {
4953
return;
5054
}
5155

52-
if (!$container->getParameter('kernel.debug') || !$container->hasAlias('logger')) {
53-
$container->removeDefinition('messenger.middleware.debug.logging');
56+
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
57+
if ($container->hasParameter($busMiddlewaresParameter = $busId.'.middlewares')) {
58+
$this->registerBusMiddlewares($container, $busId, $container->getParameter($busMiddlewaresParameter));
59+
60+
$container->getParameterBag()->remove($busMiddlewaresParameter);
61+
}
62+
63+
if ($container->hasDefinition('messenger.data_collector')) {
64+
$this->registerBusToCollector($container, $busId, $tags[0]);
65+
}
5466
}
5567

5668
$this->registerReceivers($container);
@@ -110,7 +122,7 @@ private function registerHandlers(ContainerBuilder $container)
110122
$handlersLocatorMapping['handler.'.$message] = $handler;
111123
}
112124

113-
$handlerResolver = $container->getDefinition($this->messageHandlerResolverService);
125+
$handlerResolver = $container->getDefinition('messenger.handler_resolver');
114126
$handlerResolver->replaceArgument(0, ServiceLocatorTagPass::register($container, $handlersLocatorMapping));
115127
}
116128

@@ -149,7 +161,7 @@ private function guessHandledClasses(\ReflectionClass $handlerClass, string $ser
149161
private function registerReceivers(ContainerBuilder $container)
150162
{
151163
$receiverMapping = array();
152-
foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) {
164+
foreach ($container->findTaggedServiceIds($this->receiverTag) as $id => $tags) {
153165
foreach ($tags as $tag) {
154166
$receiverMapping[$id] = new Reference($id);
155167

@@ -165,7 +177,7 @@ private function registerReceivers(ContainerBuilder $container)
165177
private function registerSenders(ContainerBuilder $container)
166178
{
167179
$senderLocatorMapping = array();
168-
foreach ($container->findTaggedServiceIds('messenger.sender') as $id => $tags) {
180+
foreach ($container->findTaggedServiceIds($this->senderTag) as $id => $tags) {
169181
foreach ($tags as $tag) {
170182
$senderLocatorMapping[$id] = new Reference($id);
171183

@@ -177,4 +189,35 @@ private function registerSenders(ContainerBuilder $container)
177189

178190
$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
179191
}
192+
193+
private function registerBusToCollector(ContainerBuilder $container, string $busId, array $tag)
194+
{
195+
$container->setDefinition(
196+
$tracedBusId = 'debug.traced.'.$busId,
197+
(new Definition(TraceableMessageBus::class, array(new Reference($tracedBusId.'.inner'))))->setDecoratedService($busId)
198+
);
199+
200+
$container->getDefinition('messenger.data_collector')->addMethodCall('registerBus', array($tag['name'] ?? $busId, new Reference($tracedBusId)));
201+
}
202+
203+
private function registerBusMiddlewares(ContainerBuilder $container, string $busId, array $middlewares)
204+
{
205+
$container->getDefinition($busId)->replaceArgument(0, array_map(function (string $name) use ($container, $busId) {
206+
if (!$container->has($messengerMiddlewareId = 'messenger.middleware.'.$name)) {
207+
$messengerMiddlewareId = $name;
208+
}
209+
210+
if (!$container->has($messengerMiddlewareId)) {
211+
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $name));
212+
}
213+
214+
if ($container->getDefinition($messengerMiddlewareId)->isAbstract()) {
215+
$childDefinition = new ChildDefinition($messengerMiddlewareId);
216+
217+
$container->setDefinition($messengerMiddlewareId = $busId.'.middleware.'.$name, $childDefinition);
218+
}
219+
220+
return new Reference($messengerMiddlewareId);
221+
}, $middlewares));
222+
}
180223
}

Debug/LoggingMiddleware.php renamed to Middleware/LoggingMiddleware.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Debug;
12+
namespace Symfony\Component\Messenger\Middleware;
1313

1414
use Symfony\Component\Messenger\MiddlewareInterface;
1515
use Psr\Log\LoggerInterface;

Middleware/TolerateNoHandler.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
15+
use Symfony\Component\Messenger\MiddlewareInterface;
16+
17+
/**
18+
* @author Samuel Roze <[email protected]>
19+
*/
20+
class TolerateNoHandler implements MiddlewareInterface
21+
{
22+
public function handle($message, callable $next)
23+
{
24+
try {
25+
return $next($message);
26+
} catch (NoHandlerForMessageException $e) {
27+
// We tolerate not having a handler for this message.
28+
}
29+
}
30+
}

Tests/DataCollector/MessengerDataCollectorTest.php

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212
namespace Symfony\Component\Messenger\Tests\DataCollector;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\HttpFoundation\Request;
16+
use Symfony\Component\HttpFoundation\Response;
1517
use Symfony\Component\Messenger\DataCollector\MessengerDataCollector;
18+
use Symfony\Component\Messenger\MessageBusInterface;
1619
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
20+
use Symfony\Component\Messenger\TraceableMessageBus;
1721
use Symfony\Component\VarDumper\Test\VarDumperTestTrait;
1822

1923
/**
@@ -28,13 +32,18 @@ class MessengerDataCollectorTest extends TestCase
2832
*/
2933
public function testHandle($returnedValue, $expected)
3034
{
31-
$collector = new MessengerDataCollector();
3235
$message = new DummyMessage('dummy message');
3336

34-
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
35-
$next->expects($this->once())->method('__invoke')->with($message)->willReturn($returnedValue);
37+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
38+
$bus->method('dispatch')->with($message)->willReturn($returnedValue);
39+
$bus = new TraceableMessageBus($bus);
40+
41+
$collector = new MessengerDataCollector();
42+
$collector->registerBus('default', $bus);
43+
44+
$bus->dispatch($message);
3645

37-
$this->assertSame($returnedValue, $collector->handle($message, $next));
46+
$collector->collect(Request::create('/'), new Response());
3847

3948
$messages = $collector->getMessages();
4049
$this->assertCount(1, $messages);
@@ -45,6 +54,7 @@ public function testHandle($returnedValue, $expected)
4554
public function getHandleTestData()
4655
{
4756
$messageDump = <<<DUMP
57+
"bus" => "default"
4858
"message" => array:2 [
4959
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
5060
"object" => Symfony\Component\VarDumper\Cloner\Data {%A
@@ -56,7 +66,7 @@ public function getHandleTestData()
5666
yield 'no returned value' => array(
5767
null,
5868
<<<DUMP
59-
array:2 [
69+
array:3 [
6070
$messageDump
6171
"result" => array:2 [
6272
"type" => "NULL"
@@ -69,7 +79,7 @@ public function getHandleTestData()
6979
yield 'scalar returned value' => array(
7080
'returned value',
7181
<<<DUMP
72-
array:2 [
82+
array:3 [
7383
$messageDump
7484
"result" => array:2 [
7585
"type" => "string"
@@ -82,7 +92,7 @@ public function getHandleTestData()
8292
yield 'array returned value' => array(
8393
array('returned value'),
8494
<<<DUMP
85-
array:2 [
95+
array:3 [
8696
$messageDump
8797
"result" => array:2 [
8898
"type" => "array"
@@ -95,24 +105,29 @@ public function getHandleTestData()
95105

96106
public function testHandleWithException()
97107
{
98-
$collector = new MessengerDataCollector();
99108
$message = new DummyMessage('dummy message');
100109

101-
$expectedException = new \RuntimeException('foo');
102-
$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
103-
$next->expects($this->once())->method('__invoke')->with($message)->willThrowException($expectedException);
110+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
111+
$bus->method('dispatch')->with($message)->will($this->throwException(new \RuntimeException('foo')));
112+
$bus = new TraceableMessageBus($bus);
113+
114+
$collector = new MessengerDataCollector();
115+
$collector->registerBus('default', $bus);
104116

105117
try {
106-
$collector->handle($message, $next);
107-
} catch (\Throwable $actualException) {
108-
$this->assertSame($expectedException, $actualException);
118+
$bus->dispatch($message);
119+
} catch (\Throwable $e) {
120+
// Ignore.
109121
}
110122

123+
$collector->collect(Request::create('/'), new Response());
124+
111125
$messages = $collector->getMessages();
112126
$this->assertCount(1, $messages);
113127

114128
$this->assertDumpMatchesFormat(<<<DUMP
115-
array:2 [
129+
array:3 [
130+
"bus" => "default"
116131
"message" => array:2 [
117132
"type" => "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage"
118133
"object" => Symfony\Component\VarDumper\Cloner\Data {%A

0 commit comments

Comments
 (0)