Skip to content

Commit 7d3e54f

Browse files
committed
bug #39608 [Messenger] Setup queues once in AMQP (jderusse)
This PR was merged into the 5.3-dev branch. Discussion ---------- [Messenger] Setup queues once in AMQP | Q | A | ------------- | --- | Branch? | 5.x | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | Fix #39605, Fix #38092, Fix #32172 | License | MIT | Doc PR | - To ease the setup, this PR also merge setup of exchange AND delayExchange. /cc @Nyholm Commits ------- c2e84c610a Setup queues once in AMQP
2 parents 805b6e1 + d5bc035 commit 7d3e54f

File tree

2 files changed

+34
-30
lines changed

2 files changed

+34
-30
lines changed

Tests/Transport/ConnectionTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,24 @@ public function testItCanDisableTheSetup()
421421
$connection->publish('body');
422422
}
423423

424+
public function testItSetupQueuesOnce()
425+
{
426+
$factory = new TestAmqpFactory(
427+
$amqpConnection = $this->createMock(\AMQPConnection::class),
428+
$amqpChannel = $this->createMock(\AMQPChannel::class),
429+
$amqpQueue = $this->createMock(\AMQPQueue::class),
430+
$amqpExchange = $this->createMock(\AMQPExchange::class)
431+
);
432+
433+
$amqpExchange->expects($this->once())->method('declareExchange');
434+
$amqpQueue->expects($this->once())->method('declareQueue');
435+
$amqpQueue->expects($this->once())->method('bind');
436+
437+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => true], $factory);
438+
$connection->publish('body');
439+
$connection->publish('body');
440+
}
441+
424442
public function testSetChannelPrefetchWhenSetup()
425443
{
426444
$factory = new TestAmqpFactory(

Transport/Connection.php

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ class Connection
7979
private $exchangeOptions;
8080
private $queuesOptions;
8181
private $amqpFactory;
82+
private $autoSetupExchange;
83+
private $autoSetup;
8284

8385
/**
8486
* @var \AMQPChannel|null
@@ -112,6 +114,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
112114
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
113115
],
114116
], $connectionOptions);
117+
$this->autoSetupExchange = $this->autoSetup = $connectionOptions['auto_setup'] ?? true;
115118
$this->exchangeOptions = $exchangeOptions;
116119
$this->queuesOptions = $queuesOptions;
117120
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
@@ -207,6 +210,9 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
207210
$exchangeOptions = $amqpOptions['exchange'];
208211
$queuesOptions = $amqpOptions['queues'];
209212
unset($amqpOptions['queues'], $amqpOptions['exchange']);
213+
if (isset($amqpOptions['auto_setup'])) {
214+
$amqpOptions['auto_setup'] = filter_var($amqpOptions['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
215+
}
210216

211217
$queuesOptions = array_map(function ($queueOptions) {
212218
if (!\is_array($queueOptions)) {
@@ -285,7 +291,7 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, A
285291
return;
286292
}
287293

288-
if ($this->shouldSetup()) {
294+
if ($this->autoSetupExchange) {
289295
$this->setupExchangeAndQueues();
290296
}
291297

@@ -347,7 +353,7 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
347353

348354
private function setupDelay(int $delay, ?string $routingKey)
349355
{
350-
if ($this->shouldSetup()) {
356+
if ($this->autoSetup) {
351357
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
352358
}
353359

@@ -418,23 +424,12 @@ public function get(string $queueName): ?\AMQPEnvelope
418424
{
419425
$this->clearWhenDisconnected();
420426

421-
if ($this->shouldSetup()) {
427+
if ($this->autoSetupExchange) {
422428
$this->setupExchangeAndQueues();
423429
}
424430

425-
try {
426-
if (false !== $message = $this->queue($queueName)->get()) {
427-
return $message;
428-
}
429-
} catch (\AMQPQueueException $e) {
430-
if (404 === $e->getCode() && $this->shouldSetup()) {
431-
// If we get a 404 for the queue, it means we need to set up the exchange & queue.
432-
$this->setupExchangeAndQueues();
433-
434-
return $this->get($queueName);
435-
}
436-
437-
throw $e;
431+
if (false !== $message = $this->queue($queueName)->get()) {
432+
return $message;
438433
}
439434

440435
return null;
@@ -452,8 +447,11 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AM
452447

453448
public function setup(): void
454449
{
455-
$this->setupExchangeAndQueues();
450+
if ($this->autoSetupExchange) {
451+
$this->setupExchangeAndQueues();
452+
}
456453
$this->getDelayExchange()->declareExchange();
454+
$this->autoSetup = false;
457455
}
458456

459457
private function setupExchangeAndQueues(): void
@@ -466,6 +464,7 @@ private function setupExchangeAndQueues(): void
466464
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
467465
}
468466
}
467+
$this->autoSetupExchange = false;
469468
}
470469

471470
/**
@@ -558,19 +557,6 @@ private function clearWhenDisconnected(): void
558557
}
559558
}
560559

561-
private function shouldSetup(): bool
562-
{
563-
if (!\array_key_exists('auto_setup', $this->connectionOptions)) {
564-
return true;
565-
}
566-
567-
if (\in_array($this->connectionOptions['auto_setup'], [false, 'false'], true)) {
568-
return false;
569-
}
570-
571-
return true;
572-
}
573-
574560
private function getDefaultPublishRoutingKey(): ?string
575561
{
576562
return $this->exchangeOptions['default_publish_routing_key'] ?? null;

0 commit comments

Comments
 (0)