Skip to content

Commit 7843a18

Browse files
committed
feature #37759 [Messenger] - Add option to confirm message delivery in Amqp connection (scyzoryck)
This PR was merged into the 5.2-dev branch. Discussion ---------- [Messenger] - Add option to confirm message delivery in Amqp connection | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | - | License | MIT | Doc PR | - Hello! My first PR here. Sometimes we need to be sure that messages are delivered to Amqp. To make it work we need to wait for the confirmation from the Amqp server. So let's wait for it confirmation if confirmation timeout is defined. Default behaviour are not modified - waiting for confirmation will impact performance of sending message. Commits ------- 5682d76b2a Messenger - Add option to confirm message delivery in Amqp connection
2 parents 3881773 + 4f07433 commit 7843a18

File tree

4 files changed

+70
-2
lines changed

4 files changed

+70
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,8 @@ CHANGELOG
66

77
* Introduced the AMQP bridge.
88
* Deprecated use of invalid options
9+
10+
5.2.0
11+
-----
12+
13+
* Add option to confirm message delivery

Tests/Fixtures/long_receiver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
require_once $autoload;
1414

1515
use Symfony\Component\EventDispatcher\EventDispatcher;
16+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
17+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
1618
use Symfony\Component\Messenger\Envelope;
1719
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
1820
use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
1921
use Symfony\Component\Messenger\MessageBusInterface;
20-
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
21-
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
2222
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
2323
use Symfony\Component\Messenger\Worker;
2424
use Symfony\Component\Serializer as SerializerComponent;

Tests/Transport/ConnectionTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,51 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
625625
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
626626
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', \AMQP_IMMEDIATE, ['delivery_mode' => 2]));
627627
}
628+
629+
public function testItPublishMessagesWithoutWaitingForConfirmation()
630+
{
631+
$factory = new TestAmqpFactory(
632+
$amqpConnection = $this->createMock(\AMQPConnection::class),
633+
$amqpChannel = $this->createMock(\AMQPChannel::class),
634+
$amqpQueue = $this->createMock(\AMQPQueue::class),
635+
$amqpExchange = $this->createMock(\AMQPExchange::class)
636+
);
637+
638+
$amqpChannel->expects($this->never())->method('waitForConfirm')->with(0.5);
639+
640+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
641+
$connection->publish('body');
642+
}
643+
644+
public function testSetChannelToConfirmMessage()
645+
{
646+
$factory = new TestAmqpFactory(
647+
$amqpConnection = $this->createMock(\AMQPConnection::class),
648+
$amqpChannel = $this->createMock(\AMQPChannel::class),
649+
$amqpQueue = $this->createMock(\AMQPQueue::class),
650+
$amqpExchange = $this->createMock(\AMQPExchange::class)
651+
);
652+
653+
$amqpChannel->expects($this->once())->method('confirmSelect');
654+
$amqpChannel->expects($this->once())->method('setConfirmCallback');
655+
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
656+
$connection->setup();
657+
}
658+
659+
public function testItCanPublishAndWaitForConfirmation()
660+
{
661+
$factory = new TestAmqpFactory(
662+
$amqpConnection = $this->createMock(\AMQPConnection::class),
663+
$amqpChannel = $this->createMock(\AMQPChannel::class),
664+
$amqpQueue = $this->createMock(\AMQPQueue::class),
665+
$amqpExchange = $this->createMock(\AMQPExchange::class)
666+
);
667+
668+
$amqpChannel->expects($this->once())->method('waitForConfirm')->with(0.5);
669+
670+
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
671+
$connection->publish('body');
672+
}
628673
}
629674

630675
class TestAmqpFactory extends AmqpFactory

Transport/Connection.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Connection
5050
'heartbeat',
5151
'read_timeout',
5252
'write_timeout',
53+
'confirm_timeout',
5354
'connect_timeout',
5455
'cacert',
5556
'cert',
@@ -128,6 +129,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
128129
* * read_timeout: Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
129130
* * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
130131
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
132+
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
131133
* * queues[name]: An array of queues, keyed by the name
132134
* * binding_keys: The binding keys (if any) to bind to this queue
133135
* * binding_arguments: Arguments to be used while binding the queue.
@@ -325,6 +327,10 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
325327
$amqpStamp ? $amqpStamp->getFlags() : \AMQP_NOPARAM,
326328
$attributes
327329
);
330+
331+
if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
332+
$this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']);
333+
}
328334
}
329335

330336
private function setupDelay(int $delay, ?string $routingKey)
@@ -478,6 +484,18 @@ public function channel(): \AMQPChannel
478484
if (isset($this->connectionOptions['prefetch_count'])) {
479485
$this->amqpChannel->setPrefetchCount($this->connectionOptions['prefetch_count']);
480486
}
487+
488+
if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
489+
$this->amqpChannel->confirmSelect();
490+
$this->amqpChannel->setConfirmCallback(
491+
static function (): bool {
492+
return false;
493+
},
494+
static function (): bool {
495+
return false;
496+
}
497+
);
498+
}
481499
}
482500

483501
return $this->amqpChannel;

0 commit comments

Comments
 (0)