Skip to content

Commit 1150b9d

Browse files
committed
feature #26632 [Messenger] Add AMQP adapter (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes #26632). Discussion ---------- [Messenger] Add AMQP adapter | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | ø | License | MIT - [x] Depends on the Messenger component #24411 - [x] Add tests once we are all happy about the structure --- In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used. Configuring the adapter is as simple as the following configuration: ```yaml # config/packages/messenger_adapters.yaml framework: messenger: adapter: "%env(MESSENGER_DSN)%" ``` With the given `.env` for example: ``` MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages ``` Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter. ```yaml # config/packages/messenger_routes.yaml framework: messenger: routing: producer). 'App\Message\Command\CreateNumber': messenger.default_sender ``` --- Additionally, multiple adapters can be created and messages routed to these ones. ```yaml # config/packages/messenger_routes.yaml framework: messenger: adapters: commands: "amqp://guest:guest@localhost:5672/%2f/commands" maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance" routing: producer). 'App\Message\Command\CreateNumber': messenger.commands_sender 'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender ``` Commits ------- 798c230ad5 [Messenger] Add AMQP adapter
2 parents 7c221a9 + 23c1653 commit 1150b9d

File tree

9 files changed

+138
-0
lines changed

9 files changed

+138
-0
lines changed

DependencyInjection/Configuration.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,12 +971,17 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
971971
->arrayNode('messenger')
972972
->info('Messenger configuration')
973973
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
974+
->fixXmlConfig('adapter')
974975
->children()
975976
->arrayNode('routing')
976977
->useAttributeAsKey('message_class')
977978
->beforeNormalization()
978979
->always()
979980
->then(function ($config) {
981+
if (!is_array($config)) {
982+
return array();
983+
}
984+
980985
$newConfig = array();
981986
foreach ($config as $k => $v) {
982987
if (!is_int($k)) {
@@ -1011,6 +1016,28 @@ function ($a) {
10111016
->end()
10121017
->end()
10131018
->end()
1019+
->arrayNode('adapters')
1020+
->useAttributeAsKey('name')
1021+
->arrayPrototype()
1022+
->beforeNormalization()
1023+
->ifString()
1024+
->then(function (string $dsn) {
1025+
return array('dsn' => $dsn);
1026+
})
1027+
->end()
1028+
->fixXmlConfig('option')
1029+
->children()
1030+
->scalarNode('dsn')->end()
1031+
->arrayNode('options')
1032+
->normalizeKeys(false)
1033+
->useAttributeAsKey('name')
1034+
->defaultValue(array())
1035+
->prototype('variable')
1036+
->end()
1037+
->end()
1038+
->end()
1039+
->end()
1040+
->end()
10141041
->end()
10151042
->end()
10161043
->end()

DependencyInjection/FrameworkExtension.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,24 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14681468
} else {
14691469
$container->removeDefinition('messenger.middleware.validator');
14701470
}
1471+
1472+
foreach ($config['adapters'] as $name => $adapter) {
1473+
$container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array(
1474+
new Reference('messenger.adapter_factory'),
1475+
'createSender',
1476+
))->setArguments(array(
1477+
$adapter['dsn'],
1478+
$adapter['options'],
1479+
))->addTag('messenger.sender'));
1480+
1481+
$container->setDefinition('messenger.receiver.'.$name, (new Definition(ReceiverInterface::class))->setFactory(array(
1482+
new Reference('messenger.adapter_factory'),
1483+
'createReceiver',
1484+
))->setArguments(array(
1485+
$adapter['dsn'],
1486+
$adapter['options'],
1487+
))->addTag('messenger.receiver'));
1488+
}
14711489
}
14721490

14731491
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

Resources/config/messenger.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,18 @@
7272
<tag name="container.service_locator" />
7373
<argument type="collection" />
7474
</service>
75+
76+
<!-- Adapters -->
77+
<service id="messenger.adapter_factory" class="Symfony\Component\Messenger\Adapter\Factory\ChainAdapterFactory">
78+
<argument type="tagged" tag="messenger.adapter_factory" />
79+
</service>
80+
81+
<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
82+
<argument type="service" id="messenger.transport.default_encoder" />
83+
<argument type="service" id="messenger.transport.default_decoder" />
84+
<argument>%kernel.debug%</argument>
85+
86+
<tag name="messenger.adapter_factory" />
87+
</service>
7588
</services>
7689
</container>

Resources/config/schema/symfony-1.0.xsd

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@
354354
<xsd:sequence>
355355
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
356356
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
357+
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
357358
</xsd:sequence>
358359
</xsd:complexType>
359360

@@ -368,6 +369,19 @@
368369
<xsd:attribute name="service" type="xsd:string" use="required"/>
369370
</xsd:complexType>
370371

372+
<xsd:complexType name="messenger_adapter">
373+
<xsd:sequence>
374+
<xsd:element name="option" type="messenger_adapter_option" minOccurs="0" maxOccurs="unbounded" />
375+
</xsd:sequence>
376+
<xsd:attribute name="name" type="xsd:string" />
377+
<xsd:attribute name="dsn" type="xsd:string" />
378+
</xsd:complexType>
379+
380+
<xsd:complexType name="messenger_adapter_option">
381+
<xsd:attribute name="name" type="xsd:string" />
382+
<xsd:attribute name="value" type="xsd:string" />
383+
</xsd:complexType>
384+
371385
<xsd:complexType name="messenger_middleware">
372386
<xsd:sequence>
373387
<xsd:element name="validation" type="messenger_validation" minOccurs="0" maxOccurs="1" />

Tests/DependencyInjection/ConfigurationTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
258258
'enabled' => !class_exists(FullStack::class),
259259
),
260260
),
261+
'adapters' => array(),
261262
),
262263
);
263264
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', array(
4+
'messenger' => array(
5+
'adapters' => array(
6+
'default' => 'amqp://localhost/%2f/messages',
7+
'customised' => array(
8+
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
9+
'options' => array('queue_name' => 'Queue'),
10+
),
11+
),
12+
),
13+
));
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
11+
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
12+
<framework:option name="queue_name" value="Queue" />
13+
</framework:adapter>
14+
</framework:messenger>
15+
</framework:config>
16+
</container>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
framework:
2+
messenger:
3+
adapters:
4+
default: 'amqp://localhost/%2f/messages'
5+
customised:
6+
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
7+
options:
8+
queue_name: Queue

Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ public function testWebLink()
523523
public function testMessenger()
524524
{
525525
$container = $this->createContainerFromFile('messenger');
526+
$this->assertTrue($container->hasDefinition('message_bus'));
526527
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
527528
}
528529

@@ -538,6 +539,33 @@ public function testMessengerValidationDisabled()
538539
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
539540
}
540541

542+
public function testMessengerAdapter()
543+
{
544+
$container = $this->createContainerFromFile('messenger_adapter');
545+
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
546+
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
547+
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
548+
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
549+
550+
$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
551+
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
552+
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();
553+
554+
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory);
555+
$this->assertCount(2, $senderArguments);
556+
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
557+
$this->assertEquals(array('queue_name' => 'Queue'), $senderArguments[1]);
558+
559+
$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
560+
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
561+
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();
562+
563+
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createReceiver'), $receiverFactory);
564+
$this->assertCount(2, $receiverArguments);
565+
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
566+
$this->assertEquals(array('queue_name' => 'Queue'), $receiverArguments[1]);
567+
}
568+
541569
public function testTranslator()
542570
{
543571
$container = $this->createContainerFromFile('full');

0 commit comments

Comments
 (0)