Skip to content

Commit 4f07433

Browse files
author
scyzoryck
committed
Messenger - Add option to confirm message delivery in Amqp connection
1 parent 5526e45 commit 4f07433

File tree

4 files changed

+70
-2
lines changed

4 files changed

+70
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,8 @@ CHANGELOG
66

77
* Introduced the AMQP bridge.
88
* Deprecated use of invalid options
9+
10+
5.2.0
11+
-----
12+
13+
* Add option to confirm message delivery

Tests/Fixtures/long_receiver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
require_once $autoload;
1414

1515
use Symfony\Component\EventDispatcher\EventDispatcher;
16+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
17+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
1618
use Symfony\Component\Messenger\Envelope;
1719
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
1820
use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
1921
use Symfony\Component\Messenger\MessageBusInterface;
20-
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
21-
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
2222
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
2323
use Symfony\Component\Messenger\Worker;
2424
use Symfony\Component\Serializer as SerializerComponent;

Tests/Transport/ConnectionTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,51 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
625625
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
626626
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
627627
}
628+
629+
public function testItPublishMessagesWithoutWaitingForConfirmation()
630+
{
631+
$factory = new TestAmqpFactory(
632+
$amqpConnection = $this->createMock(\AMQPConnection::class),
633+
$amqpChannel = $this->createMock(\AMQPChannel::class),
634+
$amqpQueue = $this->createMock(\AMQPQueue::class),
635+
$amqpExchange = $this->createMock(\AMQPExchange::class)
636+
);
637+
638+
$amqpChannel->expects($this->never())->method('waitForConfirm')->with(0.5);
639+
640+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
641+
$connection->publish('body');
642+
}
643+
644+
public function testSetChannelToConfirmMessage()
645+
{
646+
$factory = new TestAmqpFactory(
647+
$amqpConnection = $this->createMock(\AMQPConnection::class),
648+
$amqpChannel = $this->createMock(\AMQPChannel::class),
649+
$amqpQueue = $this->createMock(\AMQPQueue::class),
650+
$amqpExchange = $this->createMock(\AMQPExchange::class)
651+
);
652+
653+
$amqpChannel->expects($this->once())->method('confirmSelect');
654+
$amqpChannel->expects($this->once())->method('setConfirmCallback');
655+
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
656+
$connection->setup();
657+
}
658+
659+
public function testItCanPublishAndWaitForConfirmation()
660+
{
661+
$factory = new TestAmqpFactory(
662+
$amqpConnection = $this->createMock(\AMQPConnection::class),
663+
$amqpChannel = $this->createMock(\AMQPChannel::class),
664+
$amqpQueue = $this->createMock(\AMQPQueue::class),
665+
$amqpExchange = $this->createMock(\AMQPExchange::class)
666+
);
667+
668+
$amqpChannel->expects($this->once())->method('waitForConfirm')->with(0.5);
669+
670+
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
671+
$connection->publish('body');
672+
}
628673
}
629674

630675
class TestAmqpFactory extends AmqpFactory

Transport/Connection.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Connection
5050
'heartbeat',
5151
'read_timeout',
5252
'write_timeout',
53+
'confirm_timeout',
5354
'connect_timeout',
5455
'cacert',
5556
'cert',
@@ -128,6 +129,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
128129
* * read_timeout: Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
129130
* * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
130131
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
132+
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
131133
* * queues[name]: An array of queues, keyed by the name
132134
* * binding_keys: The binding keys (if any) to bind to this queue
133135
* * binding_arguments: Arguments to be used while binding the queue.
@@ -325,6 +327,10 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
325327
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
326328
$attributes
327329
);
330+
331+
if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
332+
$this->channel()->waitForConfirm((float) $this->connectionOptions['confirm_timeout']);
333+
}
328334
}
329335

330336
private function setupDelay(int $delay, ?string $routingKey)
@@ -478,6 +484,18 @@ public function channel(): \AMQPChannel
478484
if (isset($this->connectionOptions['prefetch_count'])) {
479485
$this->amqpChannel->setPrefetchCount($this->connectionOptions['prefetch_count']);
480486
}
487+
488+
if ('' !== ($this->connectionOptions['confirm_timeout'] ?? '')) {
489+
$this->amqpChannel->confirmSelect();
490+
$this->amqpChannel->setConfirmCallback(
491+
static function (): bool {
492+
return false;
493+
},
494+
static function (): bool {
495+
return false;
496+
}
497+
);
498+
}
481499
}
482500

483501
return $this->amqpChannel;

0 commit comments

Comments
 (0)