Skip to content

Commit 06a7f25

Browse files
authored
amqp - message is delivered on lost hearbeat (#431)
publisher acks
1 parent 71736d0 commit 06a7f25

File tree

11 files changed

+126
-64
lines changed

11 files changed

+126
-64
lines changed

packages/Amqp/src/AmqpBackedMessageChannelBuilder.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@ public static function create(
4848
);
4949
}
5050

51+
private function getAmqpOutboundChannelAdapter(): AmqpOutboundChannelAdapterBuilder
52+
{
53+
return $this->outboundChannelAdapter;
54+
}
55+
56+
public function withPublisherAcknowledgments(bool $enabled): self
57+
{
58+
$this->getAmqpOutboundChannelAdapter()->withPublisherAcknowledgments($enabled);
59+
60+
return $this;
61+
}
62+
5163
public function getMessageChannelName(): string
5264
{
5365
return $this->channelName;

packages/Amqp/src/AmqpOutboundChannelAdapter.php

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66

77
use AMQPChannelException;
88
use AMQPConnectionException;
9+
use Ecotone\Amqp\Transaction\AmqpTransactionInterceptor;
910
use Ecotone\Enqueue\CachedConnectionFactory;
1011
use Ecotone\Messaging\Channel\PollableChannel\Serialization\OutboundMessageConverter;
1112
use Ecotone\Messaging\Conversion\ConversionService;
1213
use Ecotone\Messaging\Message;
1314
use Ecotone\Messaging\MessageHandler;
15+
use Enqueue\AmqpExt\AmqpContext;
1416
use Interop\Amqp\AmqpMessage;
1517
use Interop\Amqp\Impl\AmqpTopic;
1618

@@ -29,16 +31,18 @@ class AmqpOutboundChannelAdapter implements MessageHandler
2931
private $initialized = false;
3032

3133
public function __construct(
32-
private CachedConnectionFactory $connectionFactory,
33-
private AmqpAdmin $amqpAdmin,
34-
private string $exchangeName,
35-
private ?string $routingKey,
36-
private ?string $routingKeyFromHeaderName,
37-
private ?string $exchangeFromHeaderName,
38-
private bool $defaultPersistentDelivery,
39-
private bool $autoDeclare,
40-
private OutboundMessageConverter $outboundMessageConverter,
41-
private ConversionService $conversionService
34+
private CachedConnectionFactory $connectionFactory,
35+
private AmqpAdmin $amqpAdmin,
36+
private string $exchangeName,
37+
private ?string $routingKey,
38+
private ?string $routingKeyFromHeaderName,
39+
private ?string $exchangeFromHeaderName,
40+
private bool $defaultPersistentDelivery,
41+
private bool $autoDeclare,
42+
private bool $publisherAcknowledgments,
43+
private OutboundMessageConverter $outboundMessageConverter,
44+
private ConversionService $conversionService,
45+
private AmqpTransactionInterceptor $amqpTransactionInterceptor,
4246
) {
4347
}
4448

@@ -76,17 +80,22 @@ public function handle(Message $message): void
7680
$messageToSend
7781
->setDeliveryMode($this->defaultPersistentDelivery ? AmqpMessage::DELIVERY_MODE_PERSISTENT : AmqpMessage::DELIVERY_MODE_NON_PERSISTENT);
7882

79-
try {
80-
$this->connectionFactory->getProducer()
81-
->setTimeToLive($outboundMessage->getTimeToLive())
82-
->setDelayStrategy(new HeadersExchangeDelayStrategy())
83-
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
83+
/** @var AmqpContext $context */
84+
$context = $this->connectionFactory->createContext();
85+
if ($this->publisherAcknowledgments && !$this->amqpTransactionInterceptor->isRunningInTransaction()) {
86+
/** Ensures no messages are lost along the way when heartbeat is lost and ensures messages was peristed on the Broker side. Without this message can be simply "swallowed" without throwing exception */
87+
$context->getExtChannel()->confirmSelect();
88+
}
89+
90+
$this->connectionFactory->getProducer()
91+
->setTimeToLive($outboundMessage->getTimeToLive())
92+
->setDelayStrategy(new HeadersExchangeDelayStrategy())
93+
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
8494
// this allow for having queue per delay instead of queue per delay + exchangeName
85-
->send(new AmqpTopic($exchangeName), $messageToSend);
86-
} catch (AMQPConnectionException|AMQPChannelException $exception) {
87-
$this->connectionFactory->reconnect();
95+
->send(new AmqpTopic($exchangeName), $messageToSend);
8896

89-
throw $exception;
97+
if ($this->publisherAcknowledgments && !$this->amqpTransactionInterceptor->isRunningInTransaction()) {
98+
$context->getExtChannel()->waitForConfirm();
9099
}
91100
}
92101
}

packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Ecotone\Amqp;
66

7+
use Ecotone\Amqp\Transaction\AmqpTransactionInterceptor;
78
use Ecotone\Enqueue\CachedConnectionFactory;
89
use Ecotone\Enqueue\EnqueueOutboundChannelAdapterBuilder;
910
use Ecotone\Messaging\Channel\PollableChannel\Serialization\OutboundMessageConverter;
@@ -26,6 +27,7 @@ class AmqpOutboundChannelAdapterBuilder extends EnqueueOutboundChannelAdapterBui
2627
private string $exchangeName;
2728
private bool $defaultPersistentDelivery = self::DEFAULT_PERSISTENT_MODE;
2829
private array $staticHeadersToAdd = [];
30+
private bool $publisherAcknowledgments = true;
2931

3032
private function __construct(string $exchangeName, string $amqpConnectionFactoryReferenceName)
3133
{
@@ -56,6 +58,13 @@ public function withDefaultRoutingKey(string $routingKey): self
5658
return $this;
5759
}
5860

61+
public function withPublisherAcknowledgments(bool $publisherAcknowledgments): self
62+
{
63+
$this->publisherAcknowledgments = $publisherAcknowledgments;
64+
65+
return $this;
66+
}
67+
5968
/**
6069
* @param string $headerName
6170
*
@@ -125,8 +134,10 @@ public function compile(MessagingContainerBuilder $builder): Definition
125134
$this->exchangeFromHeader,
126135
$this->defaultPersistentDelivery,
127136
$this->autoDeclare,
137+
$this->publisherAcknowledgments,
128138
$outboundMessageConverter,
129139
new Reference(ConversionService::REFERENCE_NAME),
140+
Reference::to(AmqpTransactionInterceptor::class),
130141
]);
131142
}
132143
}

packages/Amqp/src/AmqpReconnectableConnectionFactory.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ public function createContext(): Context
3737
$this->reconnect();
3838
}
3939

40-
return $this->connectionFactory->createContext();
40+
$context = $this->connectionFactory->createContext();
41+
$context->getExtChannel()->setConfirmCallback(fn() => false, fn() => throw new \RuntimeException("Message was failed to be persisted in RabbitMQ instance. Check RabbitMQ server logs."));
42+
43+
return $context;
4144
}
4245

4346
public function getConnectionInstanceId(): string
@@ -63,6 +66,10 @@ public function isDisconnected(?Context $context): bool
6366

6467
Assert::isSubclassOf($context, AmqpContext::class, 'Context must be ' . AmqpContext::class);
6568

69+
if (!$context->getExtChannel()->getConnection()->isConnected()) {
70+
return true;
71+
}
72+
6673
return ! $context->getExtChannel()->isConnected();
6774
}
6875

packages/Amqp/src/Transaction/AmqpTransactionInterceptor.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public function transactional(
9090
$this->logger->info(
9191
'AMQP transaction was roll backed',
9292
$message,
93-
$exception
93+
[
94+
'exception' => $exception
95+
]
9496
);
9597

9698
throw $exception;
@@ -104,4 +106,9 @@ public function transactional(
104106
$this->isRunningTransaction = false;
105107
return $result;
106108
}
109+
110+
public function isRunningInTransaction(): bool
111+
{
112+
return $this->isRunningTransaction;
113+
}
107114
}

packages/Amqp/src/Transaction/AmqpTransactionModule.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,14 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
5151
$amqpConfiguration = ExtensionObjectResolver::resolveUnique(AmqpConfiguration::class, $extensionObjects, AmqpConfiguration::createWithDefaults());
5252
;
5353

54-
$isTransactionWrapperEnabled = false;
5554
if ($amqpConfiguration->isTransactionOnAsynchronousEndpoints()) {
5655
$pointcut .= '||' . AsynchronousRunningEndpoint::class;
57-
$isTransactionWrapperEnabled = true;
5856
}
5957
if ($amqpConfiguration->isTransactionOnCommandBus()) {
60-
$pointcut .= '||' . CommandBus::class . '';
61-
$isTransactionWrapperEnabled = true;
58+
$pointcut .= '||' . CommandBus::class;
6259
}
6360
if ($amqpConfiguration->isTransactionOnConsoleCommands()) {
64-
$pointcut .= '||' . ConsoleCommand::class . '';
65-
$isTransactionWrapperEnabled = true;
61+
$pointcut .= '||' . ConsoleCommand::class;
6662
}
6763
if ($amqpConfiguration->getDefaultConnectionReferenceNames()) {
6864
$connectionFactories = $amqpConfiguration->getDefaultConnectionReferenceNames();

packages/Amqp/tests/Configuration/AmqpModuleTest.php

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,39 +48,6 @@ public function test_registering_amqp_backed_message_channel()
4848
);
4949
}
5050

51-
public function test_registering_amqp_backed_message_channel_with_application_media_type()
52-
{
53-
$amqpChannelBuilder = AmqpBackedMessageChannelBuilder::create('amqpChannel');
54-
$messagingSystem = MessagingSystemConfiguration::prepareWithDefaults(
55-
InMemoryModuleMessaging::createWith(
56-
[AmqpModule::create(InMemoryAnnotationFinder::createEmpty(), InterfaceToCallRegistry::createEmpty())],
57-
[
58-
ServiceConfiguration::createWithDefaults()
59-
->withDefaultSerializationMediaType(MediaType::APPLICATION_JSON),
60-
$amqpChannelBuilder,
61-
]
62-
)
63-
)
64-
->registerMessageChannel($amqpChannelBuilder)
65-
->registerConverter(new ArrayToJsonConverterBuilder())
66-
->buildMessagingSystemFromConfiguration(
67-
InMemoryReferenceSearchService::createWith(
68-
[
69-
AmqpConnectionFactory::class => $this->getCachedConnectionFactory(),
70-
]
71-
)
72-
);
73-
74-
/** @var PollableChannel $channel */
75-
$channel = $messagingSystem->getMessageChannelByName('amqpChannel');
76-
$channel->send(MessageBuilder::withPayload([1, 2, 3])->setContentType(MediaType::createApplicationXPHPArray())->build());
77-
78-
$this->assertEquals(
79-
'[1,2,3]',
80-
$channel->receive()->getPayload()
81-
);
82-
}
83-
8451
public function test_registering_amqp_configuration()
8552
{
8653
$amqpExchange = AmqpExchange::createDirectExchange('exchange');

packages/Amqp/tests/Integration/AmqpMessageChannelTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,36 @@ public function test_sending_and_receiving_message_from_amqp_message_channel()
6161
$this->assertNull($messageChannel->receiveWithTimeout(1));
6262
}
6363

64+
public function test_sending_and_receiving_without_delivery_guarantee()
65+
{
66+
$queueName = Uuid::uuid4()->toString();
67+
$messagePayload = 'some';
68+
69+
$ecotoneLite = EcotoneLite::bootstrapForTesting(
70+
containerOrAvailableServices: [
71+
AmqpConnectionFactory::class => $this->getCachedConnectionFactory(),
72+
],
73+
configuration: ServiceConfiguration::createWithDefaults()
74+
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE]))
75+
->withExtensionObjects([
76+
AmqpBackedMessageChannelBuilder::create($queueName)
77+
->withPublisherAcknowledgments(false),
78+
])
79+
);
80+
81+
/** @var PollableChannel $messageChannel */
82+
$messageChannel = $ecotoneLite->getMessageChannelByName($queueName);
83+
84+
$messageChannel->send(MessageBuilder::withPayload($messagePayload)->build());
85+
86+
$this->assertEquals(
87+
'some',
88+
$messageChannel->receiveWithTimeout(100)->getPayload()
89+
);
90+
91+
$this->assertNull($messageChannel->receiveWithTimeout(1));
92+
}
93+
6494
public function test_sending_and_receiving_message_from_amqp_using_consumer()
6595
{
6696
$queueName = 'orders';

packages/Amqp/tests/Integration/GeneralAmqpTest.php

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Ecotone\Lite\Test\FlowTestSupport;
99
use Ecotone\Messaging\Config\ModulePackageList;
1010
use Ecotone\Messaging\Config\ServiceConfiguration;
11+
use Ecotone\Messaging\Handler\Logger\EchoLogger;
1112
use Enqueue\AmqpExt\AmqpConnectionFactory;
1213
use Test\Ecotone\Amqp\AmqpMessagingTestCase;
1314
use Test\Ecotone\Amqp\Fixture\Order\OrderErrorHandler;
@@ -42,6 +43,27 @@ public function test_products_are_on_list_after_being_ordered(): void
4243
);
4344
}
4445

46+
public function test_messages_are_delivered_after_lost_heartbeat(): void
47+
{
48+
$ecotone = $this->bootstrapEcotone(
49+
namespaces: ['Test\Ecotone\Amqp\Fixture\Order'],
50+
services: [new OrderService(), new OrderErrorHandler(),
51+
// 'logger' => new EchoLogger()
52+
],
53+
amqpConfig: ['heartbeat' => 2]
54+
);
55+
56+
$ecotone->sendCommandWithRoutingKey('order.register', 'milk');
57+
sleep(5);
58+
$ecotone->sendCommandWithRoutingKey('order.register', 'salt');
59+
sleep(5);
60+
$ecotone->sendCommandWithRoutingKey('order.register', 'sunflower');
61+
$ecotone->run('orders');
62+
$ecotone->run('orders');
63+
$ecotone->run('orders');
64+
self::assertEquals(['milk', 'salt', 'sunflower'], $ecotone->sendQueryWithRouting('order.getOrders'));
65+
}
66+
4567
public function test_adding_product_to_shopping_cart_with_publisher_and_consumer(): void
4668
{
4769
$ecotone = $this->bootstrapEcotone(
@@ -58,10 +80,10 @@ public function test_adding_product_to_shopping_cart_with_publisher_and_consumer
5880
);
5981
}
6082

61-
private function bootstrapEcotone(array $namespaces, array $services): FlowTestSupport
83+
private function bootstrapEcotone(array $namespaces, array $services, array $amqpConfig = []): FlowTestSupport
6284
{
6385
return EcotoneLite::bootstrapFlowTesting(
64-
containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory()], $services),
86+
containerOrAvailableServices: array_merge([AmqpConnectionFactory::class => $this->getCachedConnectionFactory($amqpConfig)], $services),
6587
configuration: ServiceConfiguration::createWithDefaults()
6688
->withEnvironment('prod')
6789
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))

packages/Ecotone/src/Messaging/Channel/PollableChannel/SendRetries/SendRetryChannelInterceptor.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public function afterSendCompletion(Message $message, MessageChannel $messageCha
4141
if ($exception !== null) {
4242
$attempt = 1;
4343
while ($this->retryTemplate->canBeCalledNextTime($attempt)) {
44-
$this->logger->info("Message was not sent to {$this->relatedChannel} due to exception. Trying to self-heal by doing retry attempt: {$attempt}/{$this->retryTemplate->getMaxAttempts()}", [
44+
$this->logger->info("Message was not sent to {$this->relatedChannel} due to exception. Trying to self-heal by doing retry attempt: {$attempt}/{$this->retryTemplate->getMaxAttempts()}. Exception message: `{$exception->getMessage()}`", [
4545
'exception' => $exception->getMessage(),
4646
'relatedChannel' => $this->relatedChannel,
4747
]);
@@ -60,7 +60,7 @@ public function afterSendCompletion(Message $message, MessageChannel $messageCha
6060
}
6161
}
6262

63-
$this->logger->error("Message was not sent to {$this->relatedChannel} due to exception. No more retries will be done", [
63+
$this->logger->error("Message was not sent to {$this->relatedChannel} due to exception. No more retries will be done. Exception message: `{$exception->getMessage()}`", [
6464
'exception' => $exception->getMessage(),
6565
'relatedChannel' => $this->relatedChannel,
6666
]);

0 commit comments

Comments
 (0)