Skip to content

Commit 6ee878f

Browse files
monteirochalasr
authored andcommitted
Messenger multiple failed transports
1 parent d2ba040 commit 6ee878f

12 files changed

+253
-18
lines changed

DependencyInjection/Configuration.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,10 @@ function ($a) {
13131313
->prototype('variable')
13141314
->end()
13151315
->end()
1316+
->scalarNode('failure_transport')
1317+
->defaultNull()
1318+
->info('Transport name to send failed messages to (after all retries have failed).')
1319+
->end()
13161320
->arrayNode('retry_strategy')
13171321
->addDefaultsIfNotSet()
13181322
->beforeNormalization()

DependencyInjection/FrameworkExtension.php

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1915,15 +1915,38 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19151915
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
19161916
}
19171917

1918+
$failureTransports = [];
1919+
if ($config['failure_transport']) {
1920+
if (!isset($config['transports'][$config['failure_transport']])) {
1921+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
1922+
}
1923+
1924+
$container->setAlias('messenger.failure_transports.default', 'messenger.transport.'.$config['failure_transport']);
1925+
$failureTransports[] = $config['failure_transport'];
1926+
}
1927+
1928+
$failureTransportsByName = [];
1929+
foreach ($config['transports'] as $name => $transport) {
1930+
if ($transport['failure_transport']) {
1931+
$failureTransports[] = $transport['failure_transport'];
1932+
$failureTransportsByName[$name] = $transport['failure_transport'];
1933+
} elseif ($config['failure_transport']) {
1934+
$failureTransportsByName[$name] = $config['failure_transport'];
1935+
}
1936+
}
1937+
19181938
$senderAliases = [];
19191939
$transportRetryReferences = [];
19201940
foreach ($config['transports'] as $name => $transport) {
19211941
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
1922-
19231942
$transportDefinition = (new Definition(TransportInterface::class))
19241943
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
19251944
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
1926-
->addTag('messenger.receiver', ['alias' => $name])
1945+
->addTag('messenger.receiver', [
1946+
'alias' => $name,
1947+
'is_failure_transport' => \in_array($name, $failureTransports),
1948+
]
1949+
)
19271950
;
19281951
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
19291952
$senderAliases[$name] = $transportId;
@@ -1954,6 +1977,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19541977
$senderReferences[$serviceId] = new Reference($serviceId);
19551978
}
19561979

1980+
foreach ($config['transports'] as $name => $transport) {
1981+
if ($transport['failure_transport']) {
1982+
if (!isset($senderReferences[$transport['failure_transport']])) {
1983+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport']));
1984+
}
1985+
}
1986+
}
1987+
1988+
$failureTransportReferencesByTransportName = array_map(function ($failureTransportName) use ($senderReferences) {
1989+
return $senderReferences[$failureTransportName];
1990+
}, $failureTransportsByName);
1991+
19571992
$messageToSendersMapping = [];
19581993
foreach ($config['routing'] as $message => $messageConfiguration) {
19591994
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@@ -1984,19 +2019,17 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19842019
$container->getDefinition('messenger.retry_strategy_locator')
19852020
->replaceArgument(0, $transportRetryReferences);
19862021

1987-
if ($config['failure_transport']) {
1988-
if (!isset($senderReferences[$config['failure_transport']])) {
1989-
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
1990-
}
1991-
1992-
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1993-
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
2022+
if (\count($failureTransports) > 0) {
19942023
$container->getDefinition('console.command.messenger_failed_messages_retry')
19952024
->replaceArgument(0, $config['failure_transport']);
19962025
$container->getDefinition('console.command.messenger_failed_messages_show')
19972026
->replaceArgument(0, $config['failure_transport']);
19982027
$container->getDefinition('console.command.messenger_failed_messages_remove')
19992028
->replaceArgument(0, $config['failure_transport']);
2029+
2030+
$failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportReferencesByTransportName);
2031+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
2032+
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator);
20002033
} else {
20012034
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
20022035
$container->removeDefinition('console.command.messenger_failed_messages_retry');

Resources/config/console.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@
165165

166166
->set('console.command.messenger_failed_messages_retry', FailedMessagesRetryCommand::class)
167167
->args([
168-
abstract_arg('Receiver name'),
169-
abstract_arg('Receiver'),
168+
abstract_arg('Default failure receiver name'),
169+
abstract_arg('Receivers'),
170170
service('messenger.routable_message_bus'),
171171
service('event_dispatcher'),
172172
service('logger'),
@@ -175,15 +175,15 @@
175175

176176
->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
177177
->args([
178-
abstract_arg('Receiver name'),
179-
abstract_arg('Receiver'),
178+
abstract_arg('Default failure receiver name'),
179+
abstract_arg('Receivers'),
180180
])
181181
->tag('console.command')
182182

183183
->set('console.command.messenger_failed_messages_remove', FailedMessagesRemoveCommand::class)
184184
->args([
185-
abstract_arg('Receiver name'),
186-
abstract_arg('Receiver'),
185+
abstract_arg('Default failure receiver name'),
186+
abstract_arg('Receivers'),
187187
])
188188
->tag('console.command')
189189

Resources/config/messenger.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172

173173
->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class)
174174
->args([
175-
abstract_arg('failure transport'),
175+
abstract_arg('failure transports'),
176176
service('logger')->ignoreOnInvalid(),
177177
])
178178
->tag('kernel.event_subscriber')

Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@
491491
<xsd:attribute name="name" type="xsd:string" />
492492
<xsd:attribute name="serializer" type="xsd:string" />
493493
<xsd:attribute name="dsn" type="xsd:string" />
494+
<xsd:attribute name="failure-transport" type="xsd:string" />
494495
</xsd:complexType>
495496

496497
<xsd:complexType name="messenger_retry_strategy">
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'transports' => [
6+
'transport_1' => [
7+
'dsn' => 'null://',
8+
'failure_transport' => 'failure_transport_1'
9+
],
10+
'transport_2' => 'null://',
11+
'transport_3' => [
12+
'dsn' => 'null://',
13+
'failure_transport' => 'failure_transport_3'
14+
],
15+
'failure_transport_1' => 'null://',
16+
'failure_transport_3' => 'null://'
17+
],
18+
],
19+
]);
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'failure_transport' => 'failure_transport_global',
6+
'transports' => [
7+
'transport_1' => [
8+
'dsn' => 'null://',
9+
'failure_transport' => 'failure_transport_1'
10+
],
11+
'transport_2' => 'null://',
12+
'transport_3' => [
13+
'dsn' => 'null://',
14+
'failure_transport' => 'failure_transport_3'
15+
],
16+
'failure_transport_global' => 'null://',
17+
'failure_transport_1' => 'null://',
18+
'failure_transport_3' => 'null://',
19+
],
20+
],
21+
]);
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_1" dsn="null://" />
14+
<framework:transport name="failure_transport_3" dsn="null://" />
15+
</framework:messenger>
16+
</framework:config>
17+
</container>
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger failure-transport="failure_transport_global">
10+
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_global" dsn="null://" />
14+
<framework:transport name="failure_transport_1" dsn="null://" />
15+
<framework:transport name="failure_transport_3" dsn="null://" />
16+
</framework:messenger>
17+
</framework:config>
18+
</container>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
framework:
2+
messenger:
3+
transports:
4+
transport_1:
5+
dsn: 'null://'
6+
failure_transport: failure_transport_1
7+
transport_2: 'null://'
8+
transport_3:
9+
dsn: 'null://'
10+
failure_transport: failure_transport_3
11+
failure_transport_1: 'null://'
12+
failure_transport_3: 'null://'

0 commit comments

Comments
 (0)