Skip to content

Commit 99fb5f1

Browse files
committed
[Messenger] Notify transports which messages are still being processed, using pcntl_alarm()
1 parent 0f5e29b commit 99fb5f1

File tree

8 files changed

+87
-4
lines changed

8 files changed

+87
-4
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.2
5+
---
6+
7+
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts
8+
49
5.2.0
510
-----
611

Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1717
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1818
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
19+
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -78,6 +79,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7879
$receiver->get();
7980
}
8081

82+
public function testKeepalive()
83+
{
84+
$serializer = $this->createSerializer();
85+
86+
$connection = $this->createMock(Connection::class);
87+
$connection->expects($this->once())->method('keepalive')->with(1);
88+
89+
$receiver = new BeanstalkdReceiver($connection, $serializer);
90+
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
91+
}
92+
8193
private function createBeanstalkdEnvelope(): array
8294
{
8395
return [

Tests/Transport/BeanstalkdTransportTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -50,6 +51,18 @@ public function testReceivesMessages()
5051
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5152
}
5253

54+
public function testKeepalive()
55+
{
56+
$transport = $this->getTransport(
57+
null,
58+
$connection = $this->createMock(Connection::class),
59+
);
60+
61+
$connection->expects($this->once())->method('keepalive')->with(1);
62+
63+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
64+
}
65+
5366
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): BeanstalkdTransport
5467
{
5568
$serializer ??= $this->createMock(SerializerInterface::class);

Tests/Transport/ConnectionTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,37 @@ public function testSendWhenABeanstalkdExceptionOccurs()
330330

331331
$connection->send($body, $headers, $delay);
332332
}
333+
334+
public function testKeepalive()
335+
{
336+
$id = 123456;
337+
338+
$tube = 'baz';
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
342+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
343+
344+
$connection = new Connection(['tube_name' => $tube], $client);
345+
346+
$connection->keepalive((string) $id);
347+
}
348+
349+
public function testKeepaliveWhenABeanstalkdExceptionOccurs()
350+
{
351+
$id = 123456;
352+
353+
$tube = 'baz123';
354+
355+
$exception = new ServerException('baz error');
356+
357+
$client = $this->createMock(PheanstalkInterface::class);
358+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
359+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
360+
361+
$connection = new Connection(['tube_name' => $tube], $client);
362+
363+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
364+
$connection->keepalive((string) $id);
365+
}
333366
}

Transport/BeanstalkdReceiver.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
18-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1919
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

2222
/**
2323
* @author Antonio Pauletich <[email protected]>
2424
*/
25-
class BeanstalkdReceiver implements ReceiverInterface, MessageCountAwareInterface
25+
class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
2626
{
2727
private SerializerInterface $serializer;
2828

@@ -65,6 +65,11 @@ public function reject(Envelope $envelope): void
6565
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
6666
}
6767

68+
public function keepalive(Envelope $envelope): void
69+
{
70+
$this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId());
71+
}
72+
6873
public function getMessageCount(): int
6974
{
7075
return $this->connection->getMessageCount();

Transport/BeanstalkdTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -20,7 +21,7 @@
2021
/**
2122
* @author Antonio Pauletich <[email protected]>
2223
*/
23-
class BeanstalkdTransport implements TransportInterface, MessageCountAwareInterface
24+
class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterface, MessageCountAwareInterface
2425
{
2526
private SerializerInterface $serializer;
2627
private BeanstalkdReceiver $receiver;
@@ -48,6 +49,11 @@ public function reject(Envelope $envelope): void
4849
$this->getReceiver()->reject($envelope);
4950
}
5051

52+
public function keepalive(Envelope $envelope): void
53+
{
54+
$this->getReceiver()->keepalive($envelope);
55+
}
56+
5157
public function getMessageCount(): int
5258
{
5359
return $this->getReceiver()->getMessageCount();

Transport/Connection.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ public function reject(string $id): void
180180
}
181181
}
182182

183+
public function keepalive(string $id): void
184+
{
185+
try {
186+
$this->client->useTube($this->tube)->touch(new JobId((int) $id));
187+
} catch (Exception $exception) {
188+
throw new TransportException($exception->getMessage(), 0, $exception);
189+
}
190+
}
191+
183192
public function getMessageCount(): int
184193
{
185194
try {

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^6.4|^7.0"
17+
"symfony/messenger": "^7.2"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

0 commit comments

Comments
 (0)