Skip to content

Commit f18789f

Browse files
committed
feature symfony#53508 [Console][Messenger] Asynchronously notify transports which messages are still being processed (HypeMC)
This PR was merged into the 7.2 branch. Discussion ---------- [Console][Messenger] Asynchronously notify transports which messages are still being processed | Q | A | ------------- | --- | Branch? | 7.2 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | - | License | MIT Certain transports, such as Beanstalkd, have a timeout that determines how long a certain message is considered as "in progress". If the message is not acknowledged or rejected within that time period, it is returned to the "ready" queue. This could cause potential issues when it takes a handler longer to finish processing the message. In those instances, the message being processed could be returned to the ready queue and taken by another instance of the receiver. This would result in the message being processed multiple times, or even worse, in an infinite loop. Usually, transports that have a timeout also have the option to be notified that the message is still being processed, so the timeout can be postponed. The idea of this PR is to utilize the `SignalableCommandInterface` and `pcntl_alarm()` to notify transports which messages are still being processed. Since the `SignalRegistry` has [asynchronous signals enabled](https://github.com/symfony/symfony/blob/34915f6e16f04537eb18d9d2c303ec375e63cc4b/src/Symfony/Component/Console/SignalRegistry/SignalRegistry.php#L21) by default, the whole process would happen asynchronously. I've named this feature "keepalive" for lack of a better name. Currently, I've added this option only to the Beanstalkd transport since that's the one I'm familiar with and use, but as far as I was able to gather, at least one other transport supports this feature. Amazon SQS has a visibility timeout which can be [increased](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#changing-message-visibility-timeout). This visibility timeout seems to be the same thing as Beanstalkd's TTR. (Disclaimer: I've never used Amazon SQS, so if I got something wrong, please let me know.) I've split the PR into two commits so it would (hopefully) be easier to review: 1. I've extracted the first commit into a separate PR: symfony#53533 2. The second commit adds the keepalive feature to the Messenger component. Commits ------- 57b556a [Messenger] Notify transports which messages are still being processed, using `pcntl_alarm()`
2 parents 6864dbe + 57b556a commit f18789f

File tree

15 files changed

+263
-8
lines changed

15 files changed

+263
-8
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.2
5+
---
6+
7+
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts
8+
49
5.2.0
510
-----
611

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1717
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1818
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
19+
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -78,6 +79,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7879
$receiver->get();
7980
}
8081

82+
public function testKeepalive()
83+
{
84+
$serializer = $this->createSerializer();
85+
86+
$connection = $this->createMock(Connection::class);
87+
$connection->expects($this->once())->method('keepalive')->with(1);
88+
89+
$receiver = new BeanstalkdReceiver($connection, $serializer);
90+
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
91+
}
92+
8193
private function createBeanstalkdEnvelope(): array
8294
{
8395
return [

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -50,6 +51,18 @@ public function testReceivesMessages()
5051
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5152
}
5253

54+
public function testKeepalive()
55+
{
56+
$transport = $this->getTransport(
57+
null,
58+
$connection = $this->createMock(Connection::class),
59+
);
60+
61+
$connection->expects($this->once())->method('keepalive')->with(1);
62+
63+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
64+
}
65+
5366
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): BeanstalkdTransport
5467
{
5568
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,37 @@ public function testSendWhenABeanstalkdExceptionOccurs()
330330

331331
$connection->send($body, $headers, $delay);
332332
}
333+
334+
public function testKeepalive()
335+
{
336+
$id = 123456;
337+
338+
$tube = 'baz';
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
342+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
343+
344+
$connection = new Connection(['tube_name' => $tube], $client);
345+
346+
$connection->keepalive((string) $id);
347+
}
348+
349+
public function testKeepaliveWhenABeanstalkdExceptionOccurs()
350+
{
351+
$id = 123456;
352+
353+
$tube = 'baz123';
354+
355+
$exception = new ServerException('baz error');
356+
357+
$client = $this->createMock(PheanstalkInterface::class);
358+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
359+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
360+
361+
$connection = new Connection(['tube_name' => $tube], $client);
362+
363+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
364+
$connection->keepalive((string) $id);
365+
}
333366
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
18-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1919
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

2222
/**
2323
* @author Antonio Pauletich <[email protected]>
2424
*/
25-
class BeanstalkdReceiver implements ReceiverInterface, MessageCountAwareInterface
25+
class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
2626
{
2727
private SerializerInterface $serializer;
2828

@@ -65,6 +65,11 @@ public function reject(Envelope $envelope): void
6565
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
6666
}
6767

68+
public function keepalive(Envelope $envelope): void
69+
{
70+
$this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId());
71+
}
72+
6873
public function getMessageCount(): int
6974
{
7075
return $this->connection->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -20,7 +21,7 @@
2021
/**
2122
* @author Antonio Pauletich <[email protected]>
2223
*/
23-
class BeanstalkdTransport implements TransportInterface, MessageCountAwareInterface
24+
class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterface, MessageCountAwareInterface
2425
{
2526
private SerializerInterface $serializer;
2627
private BeanstalkdReceiver $receiver;
@@ -48,6 +49,11 @@ public function reject(Envelope $envelope): void
4849
$this->getReceiver()->reject($envelope);
4950
}
5051

52+
public function keepalive(Envelope $envelope): void
53+
{
54+
$this->getReceiver()->keepalive($envelope);
55+
}
56+
5157
public function getMessageCount(): int
5258
{
5359
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ public function reject(string $id): void
180180
}
181181
}
182182

183+
public function keepalive(string $id): void
184+
{
185+
try {
186+
$this->client->useTube($this->tube)->touch(new JobId((int) $id));
187+
} catch (Exception $exception) {
188+
throw new TransportException($exception->getMessage(), 0, $exception);
189+
}
190+
}
191+
183192
public function getMessageCount(): int
184193
{
185194
try {

src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^6.4|^7.0"
17+
"symfony/messenger": "^7.2"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ CHANGELOG
1010
* Add `--format` option to the `messenger:stats` command
1111
* Add `getRetryDelay()` method to `RecoverableExceptionInterface`
1212
* Add `skip` option to `messenger:failed:retry` command when run interactively to skip message and requeue it
13+
* Add the ability to asynchronously notify transports about which messages are still being processed by the worker, using `pcntl_alarm()`
1314

1415
7.1
1516
---

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
4444
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
4545
{
46+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
47+
4648
private ?Worker $worker = null;
4749

4850
public function __construct(
@@ -75,6 +77,7 @@ protected function configure(): void
7577
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7678
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7779
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
80+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
7881
])
7982
->setHelp(<<<'EOF'
8083
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -124,6 +127,13 @@ protected function configure(): void
124127
;
125128
}
126129

130+
protected function initialize(InputInterface $input, OutputInterface $output): void
131+
{
132+
if ($input->hasParameterOption('--keepalive')) {
133+
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
134+
}
135+
}
136+
127137
protected function interact(InputInterface $input, OutputInterface $output): void
128138
{
129139
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
@@ -264,7 +274,7 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
264274

265275
public function getSubscribedSignals(): array
266276
{
267-
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []);
277+
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []);
268278
}
269279

270280
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
@@ -273,6 +283,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
273283
return false;
274284
}
275285

286+
if (\SIGALRM === $signal) {
287+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
288+
289+
$this->worker->keepalive();
290+
291+
return false;
292+
}
293+
276294
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
277295

278296
$this->worker->stop();

0 commit comments

Comments
 (0)