Skip to content

Commit 970c07b

Browse files
silasjoistenfabpot
authored andcommitted
[Messenger] Add keepalive support
1 parent 7b0cdc8 commit 970c07b

File tree

8 files changed

+172
-3
lines changed

8 files changed

+172
-3
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add "keepalive" support
8+
49
7.1
510
---
611

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,97 @@ public function testSendLastInsertIdReturnsInteger()
299299
self::assertSame('1', $id);
300300
}
301301

302+
public function testKeepalive()
303+
{
304+
$queryBuilder = $this->getQueryBuilderMock();
305+
$driverConnection = $this->getDBALConnectionMock();
306+
307+
$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);
308+
309+
$queryBuilder->expects($this->once())
310+
->method('update')
311+
->with('messenger_messages')
312+
->willReturnSelf();
313+
314+
$queryBuilder->expects($this->once())
315+
->method('set')
316+
->with('delivered_at', '?')
317+
->willReturnSelf();
318+
319+
$queryBuilder->expects($this->once())
320+
->method('where')
321+
->with('id = ?')
322+
->willReturnSelf();
323+
324+
$driverConnection->expects($this->once())
325+
->method('beginTransaction');
326+
327+
$driverConnection->expects($this->once())
328+
->method('createQueryBuilder')
329+
->willReturn($queryBuilder);
330+
331+
$driverConnection->expects($this->once())
332+
->method('commit');
333+
334+
$connection->keepalive('1');
335+
}
336+
337+
public function testKeepaliveRollback()
338+
{
339+
$queryBuilder = $this->getQueryBuilderMock();
340+
$driverConnection = $this->getDBALConnectionMock();
341+
342+
$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);
343+
344+
$queryBuilder->expects($this->once())
345+
->method('update')
346+
->with('messenger_messages')
347+
->willReturnSelf();
348+
349+
$queryBuilder->expects($this->once())
350+
->method('set')
351+
->with('delivered_at', '?')
352+
->willReturnSelf();
353+
354+
$queryBuilder->expects($this->once())
355+
->method('where')
356+
->with('id = ?')
357+
->willReturnSelf();
358+
359+
$driverConnection->expects($this->once())
360+
->method('beginTransaction');
361+
362+
$driverConnection->expects($this->once())
363+
->method('createQueryBuilder')
364+
->willReturn($queryBuilder);
365+
366+
$driverConnection->expects($this->once())
367+
->method('executeStatement')
368+
->willThrowException($this->createMock(DBALException::class));
369+
370+
$driverConnection->expects($this->never())
371+
->method('commit');
372+
373+
$driverConnection->expects($this->once())
374+
->method('rollBack');
375+
376+
$this->expectException(TransportException::class);
377+
378+
$connection->keepalive('1');
379+
}
380+
381+
public function testKeepaliveThrowsExceptionWhenRedeliverTimeoutIsLessThenInterval()
382+
{
383+
$driverConnection = $this->getDBALConnectionMock();
384+
385+
$connection = new Connection(['redeliver_timeout' => 30], $driverConnection);
386+
387+
$this->expectException(TransportException::class);
388+
$this->expectExceptionMessage('Doctrine redeliver_timeout (30s) cannot be smaller than the keepalive interval (60s).');
389+
390+
$connection->keepalive('1', 60);
391+
}
392+
302393
private function getDBALConnectionMock()
303394
{
304395
$driverConnection = $this->createMock(DBALConnection::class);

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,22 @@ public function testRejectThrowsException()
353353
$receiver->reject($envelope);
354354
}
355355

356+
public function testKeepalive()
357+
{
358+
$serializer = $this->createSerializer();
359+
$connection = $this->createMock(Connection::class);
360+
361+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
362+
$receiver = new DoctrineReceiver($connection, $serializer);
363+
364+
$connection
365+
->expects($this->once())
366+
->method('keepalive')
367+
->with('1');
368+
369+
$receiver->keepalive($envelope);
370+
}
371+
356372
private function createDoctrineEnvelope(): array
357373
{
358374
return [

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use PHPUnit\Framework\TestCase;
1717
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
1818
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
19+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceivedStamp;
1920
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
2021
use Symfony\Component\Messenger\Envelope;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -69,6 +70,22 @@ public function testConfigureSchema()
6970
$transport->configureSchema($schema, $dbalConnection, static fn () => true);
7071
}
7172

73+
public function testKeepalive()
74+
{
75+
$transport = $this->getTransport(
76+
null,
77+
$connection = $this->createMock(Connection::class)
78+
);
79+
80+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
81+
82+
$connection->expects($this->once())
83+
->method('keepalive')
84+
->with('1');
85+
86+
$transport->keepalive($envelope);
87+
}
88+
7289
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): DoctrineTransport
7390
{
7491
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,34 @@ public function reject(string $id): bool
286286
}
287287
}
288288

289+
public function keepalive(string $id, ?int $seconds = null): void
290+
{
291+
// Check if the redeliver timeout is smaller than the keepalive interval
292+
if (null !== $seconds && $this->configuration['redeliver_timeout'] < $seconds) {
293+
throw new TransportException(\sprintf('Doctrine redeliver_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $this->configuration['redeliver_timeout'], $seconds));
294+
}
295+
296+
$this->driverConnection->beginTransaction();
297+
try {
298+
$queryBuilder = $this->driverConnection->createQueryBuilder()
299+
->update($this->configuration['table_name'])
300+
->set('delivered_at', '?')
301+
->where('id = ?');
302+
$now = new \DateTimeImmutable('UTC');
303+
$this->executeStatement($queryBuilder->getSQL(), [
304+
$now,
305+
$id,
306+
], [
307+
Types::DATETIME_IMMUTABLE,
308+
]);
309+
310+
$this->driverConnection->commit();
311+
} catch (\Throwable $e) {
312+
$this->driverConnection->rollBack();
313+
throw new TransportException($e->getMessage(), 0, $e);
314+
}
315+
}
316+
289317
public function setup(): void
290318
{
291319
$configuration = $this->driverConnection->getConfiguration();

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1919
use Symfony\Component\Messenger\Exception\TransportException;
2020
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
21+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
2122
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2223
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2324
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -26,7 +27,7 @@
2627
/**
2728
* @author Vincent Touzet <[email protected]>
2829
*/
29-
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface
30+
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface, KeepaliveReceiverInterface
3031
{
3132
private const MAX_RETRIES = 3;
3233
private int $retryingSafetyCounter = 0;
@@ -72,6 +73,11 @@ public function ack(Envelope $envelope): void
7273
});
7374
}
7475

76+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
77+
{
78+
$this->connection->keepalive($this->findDoctrineReceivedStamp($envelope)->getId(), $seconds);
79+
}
80+
7581
public function reject(Envelope $envelope): void
7682
{
7783
$this->withRetryableExceptionRetry(function () use ($envelope) {

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Doctrine\DBAL\Schema\Schema;
1616
use Doctrine\DBAL\Schema\Table;
1717
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -24,7 +25,7 @@
2425
/**
2526
* @author Vincent Touzet <[email protected]>
2627
*/
27-
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
28+
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface, KeepaliveReceiverInterface
2829
{
2930
private DoctrineReceiver $receiver;
3031
private DoctrineSender $sender;
@@ -50,6 +51,11 @@ public function reject(Envelope $envelope): void
5051
$this->getReceiver()->reject($envelope);
5152
}
5253

54+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
55+
{
56+
$this->getReceiver()->keepalive($envelope, $seconds);
57+
}
58+
5359
public function getMessageCount(): int
5460
{
5561
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": ">=8.2",
2020
"doctrine/dbal": "^3.6|^4",
21-
"symfony/messenger": "^6.4|^7.0",
21+
"symfony/messenger": "^7.2",
2222
"symfony/service-contracts": "^2.5|^3"
2323
},
2424
"require-dev": {

0 commit comments

Comments
 (0)