Skip to content

Commit 91073ab

Browse files
committed
Create LimitAttemptsExtension
1 parent cf1fedc commit 91073ab

File tree

13 files changed

+370
-13
lines changed

13 files changed

+370
-13
lines changed

docs/en/index.rst

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ Create a Job class::
8585
{
8686
use LogTrait;
8787

88+
/**
89+
* The maximum number of times the job may be attempted.
90+
*
91+
* @var int|null
92+
*/
93+
public static $maxAttempts = 3;
94+
8895
public function execute(Message $message): ?string
8996
{
9097
$id = $message->getArgument('id');
@@ -118,6 +125,14 @@ The job **may** also return a null value, which is interpreted as
118125
``Processor::ACK``. Failure to respond with a valid type will result in an
119126
interpreted message failure and requeue of the message.
120127

128+
Properties:
129+
130+
- ``maxAttempts``: The maximum number of times the job may be requeued as a result
131+
of an exception or by explicitly returning ``Processor::REQUEUE``. If
132+
provided, this value will override the value provided in the worker command
133+
line option ``--max-attempts``. If a value is not provided by the job or by
134+
the command line option, the job may be requeued an infinite number of times.
135+
121136
Queueing
122137
--------
123138

@@ -244,6 +259,7 @@ This shell can take a few different options:
244259
- ``--logger`` (default: ``stdout``): Name of a configured logger
245260
- ``--max-jobs`` (default: ``null``): Maximum number of jobs to process. Worker will exit after limit is reached.
246261
- ``--max-runtime`` (default: ``null``): Maximum number of seconds to run. Worker will exit after limit is reached.
262+
- ``--max-attempts`` (default: ``null``): Maximum number of times each job will be attempted. Maximum attempts defined on a job will override this value.
247263
- ``--verbose`` or ``-v`` (default: ``null``): Provide verbose output, displaying the current values for:
248264

249265
- Max Iterations

src/Command/WorkerCommand.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use Cake\Console\ConsoleOptionParser;
2323
use Cake\Core\Configure;
2424
use Cake\Log\Log;
25+
use Cake\Queue\Consumption\LimitAttemptsExtension;
2526
use Cake\Queue\Consumption\LimitConsumedMessagesExtension;
2627
use Cake\Queue\Queue\Processor;
2728
use Cake\Queue\QueueManager;
@@ -86,6 +87,12 @@ public function getOptionParser(): ConsoleOptionParser
8687
'default' => null,
8788
'short' => 'r',
8889
]);
90+
$parser->addOption('max-attempts', [
91+
'help' => 'Maximum number of times each job will be attempted.'
92+
. ' Maximum attempts defined on a job will override this value.',
93+
'default' => null,
94+
'short' => 'a',
95+
]);
8996
$parser->setDescription(
9097
'Runs a queue worker that consumes from the named queue.'
9198
);
@@ -104,6 +111,7 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
104111
{
105112
$extensions = [
106113
new LoggerExtension($logger),
114+
new LimitAttemptsExtension((int)$args->getOption('max-attempts') ?: null),
107115
];
108116

109117
if (!is_null($args->getOption('max-jobs'))) {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Cake\Queue\Consumption;
5+
6+
use Cake\Queue\Job\Message;
7+
use Enqueue\Consumption\Context\MessageResult;
8+
use Enqueue\Consumption\MessageResultExtensionInterface;
9+
use Enqueue\Consumption\Result;
10+
use Interop\Queue\Processor;
11+
12+
class LimitAttemptsExtension implements MessageResultExtensionInterface
13+
{
14+
/**
15+
* The property key used to set the number of times a message was attempted.
16+
*
17+
* @var string
18+
*/
19+
public const ATTEMPTS_PROPERTY = 'attempts';
20+
21+
/**
22+
* The maximum number of times a job may be attempted. $maxAttempts defined on a
23+
* Job will override this value.
24+
*
25+
* @var int|null
26+
*/
27+
protected $maxAttempts;
28+
29+
/**
30+
* @param int|null $maxAttempts The maximum number of times a job may be attempted.
31+
* @return void
32+
*/
33+
public function __construct(?int $maxAttempts = null)
34+
{
35+
$this->maxAttempts = $maxAttempts;
36+
}
37+
38+
/**
39+
* @param \Enqueue\Consumption\Context\MessageResult $context The result of the message after it was processed.
40+
* @return void
41+
*/
42+
public function onResult(MessageResult $context): void
43+
{
44+
if ($context->getResult() !== Processor::REQUEUE) {
45+
return;
46+
}
47+
48+
$message = $context->getMessage();
49+
50+
$jobMessage = new Message($message, $context->getContext());
51+
52+
$maxAttempts = $jobMessage->getMaxAttempts() ?? $this->maxAttempts;
53+
54+
if ($maxAttempts === null) {
55+
return;
56+
}
57+
58+
$attemptNumber = $message->getProperty(self::ATTEMPTS_PROPERTY, 0) + 1;
59+
60+
if ($attemptNumber >= $maxAttempts) {
61+
$context->changeResult(
62+
Result::reject(sprintf('The maximum number of %d allowed attempts was reached.', $maxAttempts))
63+
);
64+
65+
return;
66+
}
67+
68+
$newMessage = clone $message;
69+
$newMessage->setProperty(self::ATTEMPTS_PROPERTY, $attemptNumber);
70+
71+
$queueContext = $context->getContext();
72+
$producer = $queueContext->createProducer();
73+
$consumer = $context->getConsumer();
74+
$producer->send($consumer->getQueue(), $newMessage);
75+
76+
$context->changeResult(
77+
Result::reject('A copy of the message was sent with an incremented attempt count.')
78+
);
79+
}
80+
}

src/Job/Message.php

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,20 @@ public function getCallable()
9494
return $this->callable;
9595
}
9696

97+
$target = $this->getTarget();
98+
99+
$this->callable = Closure::fromCallable([new $target[0](), $target[1]]);
100+
101+
return $this->callable;
102+
}
103+
104+
/**
105+
* Get the target class and method.
106+
*
107+
* @return array
108+
*/
109+
protected function getTarget(): array
110+
{
97111
$target = $this->parsedBody['class'] ?? null;
98112

99113
if (!is_array($target) || count($target) !== 2) {
@@ -103,9 +117,7 @@ public function getCallable()
103117
));
104118
}
105119

106-
$this->callable = Closure::fromCallable([new $target[0](), $target[1]]);
107-
108-
return $this->callable;
120+
return $target;
109121
}
110122

111123
/**
@@ -129,6 +141,20 @@ public function getArgument($key = null, $default = null)
129141
return Hash::get($data, $key, $default);
130142
}
131143

144+
/**
145+
* The maximum number of attempts allowed by the job.
146+
*
147+
* @return null|int
148+
*/
149+
public function getMaxAttempts(): ?int
150+
{
151+
$target = $this->getTarget();
152+
153+
$class = $target[0];
154+
155+
return $class::$maxAttempts ?? null;
156+
}
157+
132158
/**
133159
* @return string
134160
* @psalm-suppress InvalidToString

templates/bake/job.twig

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ use Interop\Queue\Processor;
2727
*/
2828
class {{ name }}Job implements JobInterface
2929
{
30+
/**
31+
* The maximum number of times the job may be attempted.
32+
*
33+
* @var int|null
34+
*/
35+
public static $maxAttempts = 3;
36+
3037
/**
3138
* Executes logic for {{ name }}Job
3239
*

tests/TestCase/Command/WorkerCommandTest.php

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
use Cake\Core\Configure;
2020
use Cake\Log\Log;
2121
use Cake\Queue\QueueManager;
22+
use Cake\Queue\Test\TestCase\DebugLogTrait;
2223
use Cake\TestSuite\ConsoleIntegrationTestTrait;
2324
use Cake\TestSuite\TestCase;
2425
use TestApp\Job\LogToDebugJob;
26+
use TestApp\Job\RequeueJob;
2527
use TestApp\WelcomeMailerListener;
2628

2729
/**
@@ -32,6 +34,7 @@
3234
class WorkerCommandTest extends TestCase
3335
{
3436
use ConsoleIntegrationTestTrait;
37+
use DebugLogTrait;
3538

3639
public function setUp(): void
3740
{
@@ -240,16 +243,31 @@ public function testQueueProcessesJobWithOtherQueue()
240243
$this->assertDebugLogContains('Debug job was run');
241244
}
242245

243-
protected function assertDebugLogContains($expected): void
246+
/**
247+
* Test max-attempts option
248+
*
249+
* @runInSeparateProcess
250+
*/
251+
public function testQueueProcessesJobWithMaxAttempts()
244252
{
245-
$log = Log::engine('debug');
246-
$found = false;
247-
foreach ($log->read() as $line) {
248-
if (strpos($line, $expected) !== false) {
249-
$found = true;
250-
break;
251-
}
252-
}
253-
$this->assertTrue($found, "Did not find `{$expected}` in logs.");
253+
$config = [
254+
'queue' => 'default',
255+
'url' => 'file:///' . TMP . DS . 'queue',
256+
'receiveTimeout' => 100,
257+
];
258+
Configure::write('Queue', ['default' => $config]);
259+
260+
Log::setConfig('debug', [
261+
'className' => 'Array',
262+
'levels' => ['notice', 'info', 'debug'],
263+
]);
264+
265+
QueueManager::setConfig('default', $config);
266+
QueueManager::push(RequeueJob::class);
267+
QueueManager::drop('default');
268+
269+
$this->exec('queue worker --max-attempts=3 --max-jobs=1 --logger=debug --verbose');
270+
271+
$this->assertDebugLogContainsExactly('RequeueJob is requeueing', 3);
254272
}
255273
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Cake\Queue\Test\TestCase\Job;
5+
6+
use Cake\Log\Log;
7+
use Cake\Queue\Consumption\LimitAttemptsExtension;
8+
use Cake\Queue\Consumption\LimitConsumedMessagesExtension;
9+
use Cake\Queue\Queue\Processor as QueueProcessor;
10+
use Cake\Queue\QueueManager;
11+
use Cake\Queue\Test\TestCase\DebugLogTrait;
12+
use Cake\TestSuite\TestCase;
13+
use Enqueue\Consumption\ChainExtension;
14+
use Psr\Log\NullLogger;
15+
use TestApp\Job\MaxAttemptsIsThreeJob;
16+
use TestApp\Job\RequeueJob;
17+
18+
class LimitAttemptsExtensionTest extends TestCase
19+
{
20+
use DebugLogTrait;
21+
22+
/**
23+
* @beforeClass
24+
* @after
25+
*/
26+
public static function dropConfigs()
27+
{
28+
Log::drop('debug');
29+
QueueManager::drop('default');
30+
}
31+
32+
public function testMessageShouldBeRequeuedIfMaxAttemptsIsNotSet()
33+
{
34+
$consume = $this->setupQeueue();
35+
36+
QueueManager::push(RequeueJob::class);
37+
38+
$consume();
39+
40+
$count = $this->debugLogCount('RequeueJob is requeueing');
41+
$this->assertGreaterThanOrEqual(10, $count);
42+
}
43+
44+
public function testMessageShouldBeRequeuedUntilMaxAttemptsIsReached()
45+
{
46+
$consume = $this->setupQeueue();
47+
48+
QueueManager::push(MaxAttemptsIsThreeJob::class, []);
49+
50+
$consume();
51+
52+
$this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 3);
53+
}
54+
55+
public function testMessageShouldBeRequeuedIfGlobalMaxAttemptsIsNotSet()
56+
{
57+
$consume = $this->setupQeueue();
58+
59+
QueueManager::push(RequeueJob::class);
60+
61+
$consume();
62+
63+
$count = $this->debugLogCount('RequeueJob is requeueing');
64+
$this->assertGreaterThanOrEqual(10, $count);
65+
}
66+
67+
public function testMessageShouldBeRequeuedUntilGlobalMaxAttemptsIsReached()
68+
{
69+
$consume = $this->setupQeueue([3]);
70+
71+
QueueManager::push(MaxAttemptsIsThreeJob::class, ['succeedAt' => 10]);
72+
73+
$consume();
74+
75+
$this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 3);
76+
}
77+
78+
protected function setupQeueue($extensionArgs = [])
79+
{
80+
Log::setConfig('debug', [
81+
'className' => 'Array',
82+
'levels' => ['debug'],
83+
]);
84+
85+
QueueManager::setConfig('default', [
86+
'url' => 'file:///' . TMP . DS . uniqid('queue'),
87+
'receiveTimeout' => 100,
88+
]);
89+
90+
$client = QueueManager::engine('default');
91+
92+
$processor = new QueueProcessor(new NullLogger());
93+
$client->bindTopic('default', $processor);
94+
95+
$extension = new ChainExtension([
96+
new LimitConsumedMessagesExtension(1),
97+
new LimitAttemptsExtension(...$extensionArgs),
98+
]);
99+
100+
return function () use ($client, $extension) {
101+
$client->consume($extension);
102+
};
103+
}
104+
}

0 commit comments

Comments
 (0)