Skip to content

Commit daa4994

Browse files
committed
[Mesenger] Add support for resetting container services after each messenger message.
Without this patch, services are not resetted. For example Monolog Finger Cross handler is never reset nor flushed. So if the first message trigger and "error" level message, all others message will log and overflow the buffer. Usage with framework: ```yaml framework: messenger: transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' reset_on_message: true failed: 'doctrine://default?queue_name=failed' sync: 'sync://' ```
1 parent baa5768 commit daa4994

File tree

9 files changed

+33
-1
lines changed

9 files changed

+33
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ CHANGELOG
1010
* Deprecate `get()`, `has()`, `getDoctrine()`, and `dispatchMessage()` in `AbstractController`, use method/constructor injection instead
1111
* Add `MicroKernelTrait::getBundlesPath` method to get bundles config path
1212
* Deprecate the `cache.adapter.doctrine` service
13+
* Add support for resetting container services after each messenger message.
1314

1415
5.3
1516
---

DependencyInjection/Configuration.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,6 +1333,10 @@ function ($a) {
13331333
->fixXmlConfig('option')
13341334
->children()
13351335
->scalarNode('dsn')->end()
1336+
->booleanNode('reset_on_message')
1337+
->defaultFalse()
1338+
->info('Reset container services after each message. Turn it on when the transport is async and run in a worker.')
1339+
->end()
13361340
->scalarNode('serializer')->defaultNull()->info('Service id of a custom serializer to use.')->end()
13371341
->arrayNode('options')
13381342
->normalizeKeys(false)

DependencyInjection/FrameworkExtension.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2013,6 +2013,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20132013

20142014
$senderAliases = [];
20152015
$transportRetryReferences = [];
2016+
$transportNamesForResetServices = [];
20162017
foreach ($config['transports'] as $name => $transport) {
20172018
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
20182019
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2041,6 +2042,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20412042

20422043
$transportRetryReferences[$name] = new Reference($retryServiceId);
20432044
}
2045+
if ($transport['reset_on_message']) {
2046+
$transportNamesForResetServices[] = $name;
2047+
}
2048+
}
2049+
2050+
if ($transportNamesForResetServices) {
2051+
$container
2052+
->getDefinition('messenger.listener.reset_services')
2053+
->replaceArgument(1, $transportNamesForResetServices)
2054+
;
2055+
} else {
2056+
$container->removeDefinition('messenger.listener.reset_services');
20442057
}
20452058

20462059
$senderReferences = [];

Resources/config/messenger.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2122
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2223
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
2324
use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener;
@@ -195,6 +196,12 @@
195196
->tag('kernel.event_subscriber')
196197

197198
->set('messenger.listener.stop_worker_on_stop_exception_listener', StopWorkerOnCustomStopExceptionListener::class)
199+
200+
->set('messenger.listener.reset_services', ResetServicesListener::class)
201+
->args([
202+
service('services_resetter'),
203+
abstract_arg('receivers names'),
204+
])
198205
->tag('kernel.event_subscriber')
199206

200207
->set('messenger.routable_message_bus', RoutableMessageBus::class)

Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,7 @@
505505
<xsd:attribute name="serializer" type="xsd:string" />
506506
<xsd:attribute name="dsn" type="xsd:string" />
507507
<xsd:attribute name="failure-transport" type="xsd:string" />
508+
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
508509
</xsd:complexType>
509510

510511
<xsd:complexType name="messenger_retry_strategy">

Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
'default' => 'amqp://localhost/%2f/messages',
1212
'customised' => [
1313
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
14+
'reset_on_message' => true,
1415
'options' => ['queue' => ['name' => 'Queue']],
1516
'serializer' => 'messenger.transport.native_php_serializer',
1617
'retry_strategy' => [

Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<framework:messenger failure-transport="failed">
1111
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
1212
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
13-
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
13+
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" reset-on-message="true">
1414
<framework:options>
1515
<framework:queue>
1616
<framework:name>Queue</framework:name>

Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ framework:
88
default: 'amqp://localhost/%2f/messages'
99
customised:
1010
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
11+
reset_on_message: true
1112
options:
1213
queue:
1314
name: Queue

Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ public function testMessenger()
723723
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
724724
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
725725
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
726+
$this->assertFalse($container->hasDefinition('messenger.listener.reset_services'));
726727
}
727728

728729
public function testMessengerMultipleFailureTransports()
@@ -867,6 +868,9 @@ public function testMessengerTransports()
867868
return array_shift($values);
868869
}, $failureTransports);
869870
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
871+
872+
$this->assertTrue($container->hasDefinition('messenger.listener.reset_services'));
873+
$this->assertSame(['customised'], $container->getDefinition('messenger.listener.reset_services')->getArgument(1));
870874
}
871875

872876
public function testMessengerRouting()

0 commit comments

Comments
 (0)