Skip to content

Commit 9fd9049

Browse files
Samik081fabpot
authored andcommitted
[Messenger] Add AMQP exchange to exchange bindings
1 parent 84e2730 commit 9fd9049

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ public function testItSetupsTheTTLConnection()
393393
$connection->publish('body');
394394
}
395395

396-
public function testBindingArguments()
396+
public function testQueueBindingArguments()
397397
{
398398
$amqpConnection = $this->createMock(\AMQPConnection::class);
399399
$amqpChannel = $this->createMock(\AMQPChannel::class);
@@ -418,6 +418,62 @@ public function testBindingArguments()
418418
$connection->publish('body');
419419
}
420420

421+
public function testExchangeBindingArguments()
422+
{
423+
$factory = new TestAmqpFactory(
424+
$this->createMock(\AMQPConnection::class),
425+
$this->createMock(\AMQPChannel::class),
426+
$this->createMock(\AMQPQueue::class),
427+
$amqpExchange = $this->createMock(\AMQPExchange::class)
428+
);
429+
430+
$amqpExchange->expects($this->once())->method('declareExchange');
431+
$amqpExchange->expects($this->exactly(4))->method('bind')
432+
->willReturnCallback(function (...$args) {
433+
static $series = [
434+
['exchange0', 'binding_key0', ['x-match' => 'all']],
435+
['exchange0', 'binding_key1', ['x-match' => 'all']],
436+
['exchange1', 'binding_key2', ['x-match' => 'any']],
437+
['exchange1', 'binding_key3', ['x-match' => 'any']],
438+
];
439+
440+
$expectedArgs = array_shift($series);
441+
$this->assertSame($expectedArgs, $args);
442+
})
443+
;
444+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
445+
446+
$dsn = 'amqp://localhost?exchange[type]=headers'.
447+
'&exchange[bindings][exchange0][binding_arguments][x-match]=all'.
448+
'&exchange[bindings][exchange0][binding_keys][0]=binding_key0'.
449+
'&exchange[bindings][exchange0][binding_keys][1]=binding_key1'.
450+
'&exchange[bindings][exchange1][binding_arguments][x-match]=any'.
451+
'&exchange[bindings][exchange1][binding_keys][0]=binding_key2'.
452+
'&exchange[bindings][exchange1][binding_keys][1]=binding_key3';
453+
454+
$connection = Connection::fromDsn($dsn, [], $factory);
455+
$connection->publish('body');
456+
}
457+
458+
public function testNoBindingKeysInExchangeBindings()
459+
{
460+
$this->expectException(\InvalidArgumentException::class);
461+
$this->expectExceptionMessage('The "binding_keys" option must be set to a non-empty array for exchange "exchange0".');
462+
463+
$factory = new TestAmqpFactory(
464+
$this->createMock(\AMQPConnection::class),
465+
$this->createMock(\AMQPChannel::class),
466+
$this->createMock(\AMQPQueue::class),
467+
$this->createMock(\AMQPExchange::class)
468+
);
469+
470+
$dsn = 'amqp://localhost?exchange[type]=headers'.
471+
'&exchange[bindings][exchange0][binding_arguments][x-match]=all';
472+
473+
$connection = Connection::fromDsn($dsn, [], $factory);
474+
$connection->publish('body');
475+
}
476+
421477
public function testItCanDisableTheSetup()
422478
{
423479
$factory = new TestAmqpFactory(

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class Connection
7979
'default_publish_routing_key',
8080
'flags',
8181
'arguments',
82+
'bindings',
8283
];
8384

8485
private AmqpFactory $amqpFactory;
@@ -140,6 +141,9 @@ public function __construct(
140141
* * default_publish_routing_key: Routing key to use when publishing, if none is specified on the message
141142
* * flags: Exchange flags (Default: AMQP_DURABLE)
142143
* * arguments: Extra arguments
144+
* * bindings[name]: An array of the source exchanges to bind this exchange to, keyed by the name. Binding direction: source exchange -> this exchange
145+
* * binding_keys: The binding/routing keys to be used for the binding
146+
* * binding_arguments: Additional binding arguments
143147
* * delay:
144148
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
145149
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
@@ -460,6 +464,15 @@ private function setupExchangeAndQueues(): void
460464
$exchange->declareExchange();
461465
}
462466

467+
foreach ($this->exchangeOptions['bindings'] ?? [] as $exchangeName => $exchangeConfig) {
468+
if (!\is_array($exchangeConfig['binding_keys'] ?? false) || !$exchangeConfig['binding_keys']) {
469+
throw new InvalidArgumentException(\sprintf('The "binding_keys" option must be set to a non-empty array for exchange "%s".', $exchangeName));
470+
}
471+
foreach ($exchangeConfig['binding_keys'] as $bindingKey) {
472+
$this->exchange()->bind($exchangeName, $bindingKey, $exchangeConfig['binding_arguments'] ?? []);
473+
}
474+
}
475+
463476
foreach ($this->queuesOptions as $queueName => $queueConfig) {
464477
$this->queue($queueName)->declareQueue();
465478
if ('' !== $this->exchangeOptions['name']) {

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* Add `--exclude-receivers` option to the `messenger:consume command`
88
* Allow any `ServiceResetterInterface` implementation in `ResetServicesListener`
99
* Add `Symfony\Component\Messenger\Middleware\AddDefaultStampsMiddleware` and `Symfony\Component\Messenger\Message\DefaultStampsProviderInterface`
10+
* Add the possibility to configure exchange to exchange bindings in AMQP transport
1011

1112
7.3
1213
---

0 commit comments

Comments
 (0)