Skip to content

Commit a9578f7

Browse files
authored
Merge pull request #58 from dotkernel/issue-57
Issue #57: Implemented DLQ to queue
2 parents 62a8157 + 82d2b60 commit a9578f7

File tree

4 files changed

+91
-140
lines changed

4 files changed

+91
-140
lines changed

config/autoload/messenger.local.php.dist

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,33 @@ use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
88
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer;
99

1010
return [
11-
'symfony' => [
12-
'messenger' => [
13-
'transports' => [
11+
"symfony" => [
12+
"messenger" => [
13+
'transports' => [
1414
'redis_transport' => [
15-
'dsn' => 'redis://127.0.0.1:6379/messages',
16-
'options' => [], // Redis specific options
15+
'dsn' => 'redis://127.0.0.1:6379/messages',
16+
'serializer' => SymfonySerializer::class,
17+
'retry_strategy' => [
18+
'max_retries' => 3, //maximum number of times a message will be retried after the first failure
19+
'delay' => 1000, // initial delay before retrying a failed message, in milliseconds
20+
'multiplier' => 2, // factor to increase the delay for each subsequent retry
21+
'max_delay' => 0, // maximum delay between retries, in milliseconds
22+
],
23+
],
24+
// separate transport for failed messages
25+
'failed' => [
26+
'dsn' => 'redis://127.0.0.1:6379/failed',
1727
'serializer' => SymfonySerializer::class,
1828
],
1929
],
30+
// tells Messenger that the transport to store failed messages is "failed"
31+
'failure_transport' => 'failed',
2032
],
2133
],
22-
'dependencies' => [
23-
'factories' => [
24-
'redis_transport' => [TransportFactory::class, 'redis_transport'],
34+
"dependencies" => [
35+
"factories" => [
36+
"redis_transport" => [TransportFactory::class, 'redis_transport'],
37+
"failed" => [TransportFactory::class, 'failed'],
2538
SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
2639
],
2740
],

src/App/Message/Message.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,9 @@ public function getPayload(): array
1515
{
1616
return $this->payload;
1717
}
18+
19+
public function setPayload(array $payload): void
20+
{
21+
$this->payload = $payload;
22+
}
1823
}

src/App/Message/MessageHandler.php

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@
66

77
use Dot\DependencyInjection\Attribute\Inject;
88
use Dot\Log\Logger;
9-
use Exception;
10-
use Symfony\Component\Messenger\Exception\ExceptionInterface;
119
use Symfony\Component\Messenger\MessageBusInterface;
12-
use Symfony\Component\Messenger\Stamp\DelayStamp;
1310
use Throwable;
1411

1512
class MessageHandler
@@ -27,53 +24,37 @@ public function __construct(
2724
}
2825

2926
/**
30-
* @throws ExceptionInterface
27+
* @throws Throwable
3128
*/
3229
public function __invoke(Message $message): void
3330
{
3431
$payload = $message->getPayload();
3532

3633
try {
37-
// Throwing an exception to satisfy PHPStan (replace with own code)
38-
// For proof of concept and testing purposes message "control" will automatically mark it as successfully
39-
// processed and logged as info
4034
if ($payload['foo'] === 'control') {
41-
$this->logger->info($payload['foo'] . ': was processed successfully');
42-
} else {
43-
throw new Exception('Failed to execute');
35+
//user control message to log successfully processed message
36+
$this->logger->info($payload['foo'] . ' processed successfully');
37+
} elseif ($payload['foo'] === 'retry') {
38+
//user retry message to test retry functionality
39+
throw new \RuntimeException("Intentional failure for testing retries");
4440
}
45-
} catch (Throwable $exception) {
46-
$this->logger->error($payload['foo'] . ' failed with message: '
47-
. $exception->getMessage() . ' after ' . ($payload['retry'] ?? 0) . ' retries');
48-
$this->retry($payload);
49-
}
50-
}
41+
} catch (\Throwable $e) {
42+
$retryCount = $payload['retry_count'] ?? 0;
5143

52-
/**
53-
* @throws ExceptionInterface
54-
*/
55-
public function retry(array $payload): void
56-
{
57-
if (! isset($payload['retry'])) {
58-
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => 1]), [
59-
new DelayStamp($this->config['fail-safe']['first_retry']),
60-
]);
61-
} else {
62-
$retry = $payload['retry'];
63-
switch ($retry) {
64-
case 1:
65-
$delay = $this->config['fail-safe']['second_retry'];
66-
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => ++$retry]), [
67-
new DelayStamp($delay),
68-
]);
69-
break;
70-
case 2:
71-
$delay = $this->config['fail-safe']['third_retry'];
72-
$this->bus->dispatch(new Message(['foo' => $payload['foo'], 'retry' => ++$retry]), [
73-
new DelayStamp($delay),
74-
]);
75-
break;
44+
if ($retryCount === 0) {
45+
$this->logger->error(
46+
"Message '{$payload['foo']}' failed because: " . $e->getMessage()
47+
);
48+
} else {
49+
$this->logger->error(
50+
"Message '{$payload['foo']}' failed because: " . $e->getMessage() . " Retry {$retryCount}"
51+
);
7652
}
53+
54+
$payload['retry_count'] = $retryCount + 1;
55+
$message->setPayload($payload);
56+
57+
throw $e;
7758
}
7859
}
7960
}

test/App/Message/MessageHandlerTest.php

Lines changed: 44 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,14 @@
1111
use Psr\Container\ContainerExceptionInterface;
1212
use Queue\App\Message\Message;
1313
use Queue\App\Message\MessageHandler;
14-
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\Exception\ExceptionInterface;
14+
use RuntimeException;
1615
use Symfony\Component\Messenger\MessageBusInterface;
17-
use Symfony\Component\Messenger\Stamp\DelayStamp;
18-
19-
use const PHP_EOL;
2016

2117
class MessageHandlerTest extends TestCase
2218
{
2319
private MessageBusInterface|MockObject $bus;
20+
private Logger $logger;
21+
private array $config;
2422
private MessageHandler $handler;
2523

2624
/**
@@ -29,17 +27,16 @@ class MessageHandlerTest extends TestCase
2927
*/
3028
protected function setUp(): void
3129
{
32-
$this->bus = $this->createMock(MessageBusInterface::class);
33-
34-
$logger = new Logger([
30+
$this->bus = $this->createMock(MessageBusInterface::class);
31+
$this->logger = new Logger([
3532
'writers' => [
3633
'FileWriter' => [
3734
'name' => 'null',
3835
'level' => Logger::ALERT,
3936
],
4037
],
4138
]);
42-
$config = [
39+
$this->config = [
4340
'fail-safe' => [
4441
'first_retry' => 1000,
4542
'second_retry' => 2000,
@@ -50,107 +47,62 @@ protected function setUp(): void
5047
'protocol' => 'tcp',
5148
'host' => 'localhost',
5249
'port' => '8556',
53-
'eof' => PHP_EOL,
50+
'eof' => "\n",
5451
],
5552
],
5653
'application' => [
5754
'name' => 'dotkernel',
5855
],
5956
];
6057

61-
$this->handler = new MessageHandler($this->bus, $logger, $config);
58+
$this->handler = new MessageHandler($this->bus, $this->logger, $this->config);
6259
}
6360

64-
/**
65-
* @throws Exception
66-
* @throws ExceptionInterface
67-
*/
68-
public function testInvokeSuccessfulProcessing(): void
61+
public function testControlMessageDoesNotThrowAndDoesNotSetRetryCount(): void
6962
{
70-
$payload = ['foo' => 'control'];
71-
$message = $this->createMock(Message::class);
72-
$message->method('getPayload')->willReturn($payload);
63+
$handler = $this->handler;
7364

74-
$this->handler->__invoke($message);
65+
$message = new Message(['foo' => 'control']);
66+
$handler($message);
7567

76-
$this->expectNotToPerformAssertions();
68+
$payload = $message->getPayload();
69+
$this->assertArrayNotHasKey('retry_count', $payload);
7770
}
7871

79-
/**
80-
* @throws Exception
81-
* @throws ExceptionInterface
82-
*/
83-
public function testInvokeFailureTriggersFirstRetry(): void
72+
public function testRetryMessageThrowsExceptionAndSetsRetryCount(): void
8473
{
85-
$payload = ['foo' => 'fail'];
86-
$message = $this->createMock(Message::class);
87-
$message->method('getPayload')->willReturn($payload);
88-
89-
$this->bus->expects($this->once())
90-
->method('dispatch')
91-
->with(
92-
$this->callback(function ($msg) {
93-
return $msg instanceof Message
94-
&& $msg->getPayload()['foo'] === 'fail'
95-
&& $msg->getPayload()['retry'] === 1;
96-
}),
97-
$this->callback(function ($stamps) {
98-
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
99-
&& $stamps[0]->getDelay() === 1000;
100-
})
101-
)
102-
->willReturn(new Envelope($message));
103-
104-
$this->handler->__invoke($message);
105-
}
74+
$handler = $this->handler;
10675

107-
/**
108-
* @throws ExceptionInterface
109-
*/
110-
public function testRetrySecondTime(): void
111-
{
112-
$payload = ['foo' => 'retry_test', 'retry' => 1];
113-
114-
$this->bus->expects($this->once())
115-
->method('dispatch')
116-
->with(
117-
$this->callback(function ($msg) {
118-
return $msg instanceof Message
119-
&& $msg->getPayload()['retry'] === 2
120-
&& $msg->getPayload()['foo'] === 'retry_test';
121-
}),
122-
$this->callback(function ($stamps) {
123-
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
124-
&& $stamps[0]->getDelay() === 2000;
125-
})
126-
)
127-
->willReturn(new Envelope(new Message($payload)));
128-
129-
$this->handler->retry($payload);
76+
$message = new Message(['foo' => 'retry']);
77+
78+
$this->expectException(RuntimeException::class);
79+
$this->expectExceptionMessage("Intentional failure for testing retries");
80+
81+
try {
82+
$handler($message);
83+
} finally {
84+
$payload = $message->getPayload();
85+
$this->assertArrayHasKey('retry_count', $payload);
86+
$this->assertEquals(1, $payload['retry_count']); // first retry
87+
}
13088
}
13189

132-
/**
133-
* @throws ExceptionInterface
134-
*/
135-
public function testRetryThirdTime(): void
90+
public function testRetryMessageWithExistingRetryCountIncrementsIt(): void
13691
{
137-
$payload = ['foo' => 'retry_test', 'retry' => 2];
138-
139-
$this->bus->expects($this->once())
140-
->method('dispatch')
141-
->with(
142-
$this->callback(function ($msg) {
143-
return $msg instanceof Message
144-
&& $msg->getPayload()['retry'] === 3
145-
&& $msg->getPayload()['foo'] === 'retry_test';
146-
}),
147-
$this->callback(function ($stamps) {
148-
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
149-
&& $stamps[0]->getDelay() === 3000;
150-
})
151-
)
152-
->willReturn(new Envelope(new Message($payload)));
153-
154-
$this->handler->retry($payload);
92+
$handler = $this->handler;
93+
94+
$message = new Message([
95+
'foo' => 'retry',
96+
'retry_count' => 2,
97+
]);
98+
99+
$this->expectException(RuntimeException::class);
100+
101+
try {
102+
$handler($message);
103+
} finally {
104+
$payload = $message->getPayload();
105+
$this->assertEquals(3, $payload['retry_count']); // incremented from 2 → 3
106+
}
155107
}
156108
}

0 commit comments

Comments
 (0)