Skip to content

Commit 46098d2

Browse files
committed
feature symfony#46257 [Messenger] Add AMQP exchange to exchange bindings (Samik081)
This PR was squashed before being merged into the 7.4 branch. Discussion ---------- [Messenger] Add AMQP exchange to exchange bindings ## Q/A | Q | A | ------------- | --- | Branch? | 7.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | | License | MIT | Doc PR | symfony/symfony-docs#16783 ## Changes description This PR introduces very similar changes to this one: symfony#34737, which was closed due to the lack of the feedback. I'd like to continue this topic, share the missing feedback and discuss it further if needed. I have introduced the possibility to configure `exchange-to-exchange` bindings in `amqp` transport. This feature uses `\AMQPExchange::bind()` method already provided by the stub in `php-amqp/php-amqp`: https://github.com/php-amqp/php-amqp/blob/bb7611220e341039a7f5d72e606ca1e16eda4642/stubs/AMQPExchange.php#L22 Example `messenger.yaml`: ```yaml framework: messenger: transports: some_transport: dsn: 'amqp://' options: exchange: type: topic name: some_exchange bindings: # added configuration another_exchange: binding_keys: - key1 - key2 binding_arguments: x-match: all ``` With the above configuration, the `Connection` class creates `some_exchange` and binds it to `another_exchange` using `['key1', 'key2']` keys and `[x-match =>'all']` arguments. ## Reasoning Binding an exchange to an exchange feature can be used to create more complex RabbitMQ topologies. It is also briefly described here: https://www.cloudamqp.com/blog/exchange-to-exchange-binding-in-rabbitmq.html A real-world example could be kind of RabbitMQ publisher/subscriber pattern implementation between microservices, that could be visualized as follows: ![rabbitmq](https://user-images.githubusercontent.com/13415865/166811854-7d8b50e9-85c4-448c-a514-1449b3778cae.png) In the above example (all the exchanges in this example are of the `topic` type): ``` App `Foo` publishes events to its own exchange AND subscribes to the events that app `Bar` publishes on its exchange. App `Bar` publishes events to its own exchange AND subscribes to the events that app `Foo` publishes on its exchange. App `...` subscribes to the events that apps `Foo` and `Bar` publish on their exchanges. ``` This approach might have few advantages, such as easier maintainability, dependency management and monitoring, or better separation between microservices. ### My thoughts I feel that the fact, that https://github.com/php-amqp/php-amqp has `\AMQPExchange::bind()` implemented is already sufficient reason to have this supported in `symfony/amqp-messenger`. I am aware this feature might be rarely used, but it's already there in the extension, and having the ability to use it within `amqp-messenger` seems reasonable to me. Commits ------- 9fd9049 [Messenger] Add AMQP exchange to exchange bindings
2 parents 84e2730 + 9fd9049 commit 46098d2

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ public function testItSetupsTheTTLConnection()
393393
$connection->publish('body');
394394
}
395395

396-
public function testBindingArguments()
396+
public function testQueueBindingArguments()
397397
{
398398
$amqpConnection = $this->createMock(\AMQPConnection::class);
399399
$amqpChannel = $this->createMock(\AMQPChannel::class);
@@ -418,6 +418,62 @@ public function testBindingArguments()
418418
$connection->publish('body');
419419
}
420420

421+
public function testExchangeBindingArguments()
422+
{
423+
$factory = new TestAmqpFactory(
424+
$this->createMock(\AMQPConnection::class),
425+
$this->createMock(\AMQPChannel::class),
426+
$this->createMock(\AMQPQueue::class),
427+
$amqpExchange = $this->createMock(\AMQPExchange::class)
428+
);
429+
430+
$amqpExchange->expects($this->once())->method('declareExchange');
431+
$amqpExchange->expects($this->exactly(4))->method('bind')
432+
->willReturnCallback(function (...$args) {
433+
static $series = [
434+
['exchange0', 'binding_key0', ['x-match' => 'all']],
435+
['exchange0', 'binding_key1', ['x-match' => 'all']],
436+
['exchange1', 'binding_key2', ['x-match' => 'any']],
437+
['exchange1', 'binding_key3', ['x-match' => 'any']],
438+
];
439+
440+
$expectedArgs = array_shift($series);
441+
$this->assertSame($expectedArgs, $args);
442+
})
443+
;
444+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
445+
446+
$dsn = 'amqp://localhost?exchange[type]=headers'.
447+
'&exchange[bindings][exchange0][binding_arguments][x-match]=all'.
448+
'&exchange[bindings][exchange0][binding_keys][0]=binding_key0'.
449+
'&exchange[bindings][exchange0][binding_keys][1]=binding_key1'.
450+
'&exchange[bindings][exchange1][binding_arguments][x-match]=any'.
451+
'&exchange[bindings][exchange1][binding_keys][0]=binding_key2'.
452+
'&exchange[bindings][exchange1][binding_keys][1]=binding_key3';
453+
454+
$connection = Connection::fromDsn($dsn, [], $factory);
455+
$connection->publish('body');
456+
}
457+
458+
public function testNoBindingKeysInExchangeBindings()
459+
{
460+
$this->expectException(\InvalidArgumentException::class);
461+
$this->expectExceptionMessage('The "binding_keys" option must be set to a non-empty array for exchange "exchange0".');
462+
463+
$factory = new TestAmqpFactory(
464+
$this->createMock(\AMQPConnection::class),
465+
$this->createMock(\AMQPChannel::class),
466+
$this->createMock(\AMQPQueue::class),
467+
$this->createMock(\AMQPExchange::class)
468+
);
469+
470+
$dsn = 'amqp://localhost?exchange[type]=headers'.
471+
'&exchange[bindings][exchange0][binding_arguments][x-match]=all';
472+
473+
$connection = Connection::fromDsn($dsn, [], $factory);
474+
$connection->publish('body');
475+
}
476+
421477
public function testItCanDisableTheSetup()
422478
{
423479
$factory = new TestAmqpFactory(

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class Connection
7979
'default_publish_routing_key',
8080
'flags',
8181
'arguments',
82+
'bindings',
8283
];
8384

8485
private AmqpFactory $amqpFactory;
@@ -140,6 +141,9 @@ public function __construct(
140141
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
141142
* * flags: Exchange flags (Default: AMQP_DURABLE)
142143
* * arguments: Extra arguments
144+
* * bindings[name]: An array of the source exchanges to bind this exchange to, keyed by the name. Binding direction: source exchange -> this exchange
145+
* * binding_keys: The binding/routing keys to be used for the binding
146+
* * binding_arguments: Additional binding arguments
143147
* * delay:
144148
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
145149
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
@@ -460,6 +464,15 @@ private function setupExchangeAndQueues(): void
460464
$exchange->declareExchange();
461465
}
462466

467+
foreach ($this->exchangeOptions['bindings'] ?? [] as $exchangeName => $exchangeConfig) {
468+
if (!\is_array($exchangeConfig['binding_keys'] ?? false) || !$exchangeConfig['binding_keys']) {
469+
throw new InvalidArgumentException(\sprintf('The "binding_keys" option must be set to a non-empty array for exchange "%s".', $exchangeName));
470+
}
471+
foreach ($exchangeConfig['binding_keys'] as $bindingKey) {
472+
$this->exchange()->bind($exchangeName, $bindingKey, $exchangeConfig['binding_arguments'] ?? []);
473+
}
474+
}
475+
463476
foreach ($this->queuesOptions as $queueName => $queueConfig) {
464477
$this->queue($queueName)->declareQueue();
465478
if ('' !== $this->exchangeOptions['name']) {

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* Add `--exclude-receivers` option to the `messenger:consume command`
88
* Allow any `ServiceResetterInterface` implementation in `ResetServicesListener`
99
* Add `Symfony\Component\Messenger\Middleware\AddDefaultStampsMiddleware` and `Symfony\Component\Messenger\Message\DefaultStampsProviderInterface`
10+
* Add the possibility to configure exchange to exchange bindings in AMQP transport
1011

1112
7.3
1213
---

0 commit comments

Comments
 (0)