Skip to content

Commit f3d2883

Browse files
committed
Add support for unique jobs
1 parent 906bc11 commit f3d2883

File tree

13 files changed

+392
-13
lines changed

13 files changed

+392
-13
lines changed

docs/en/index.rst

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ The following configuration should be present in the config array of your **conf
5959

6060
// Whether to store failed jobs in the queue_failed_jobs table. default: false
6161
'storeFailedJobs' => true,
62+
63+
// (optional) The cache configuration for storing unique job ids. `duration`
64+
// should be greater than the maximum length of time any job can be expected
65+
// to remain on the queue. Otherwise, duplicate jobs may be
66+
// possible. Defaults to +24 hours. Note that `File` engine is only suitable
67+
// for local development.
68+
// See https://book.cakephp.org/4/en/core-libraries/caching.html#configuring-cache-engines.
69+
'uniqueCache' => [
70+
'engine' => 'File',
71+
],
6272
]
6373
],
6474
// ...
@@ -111,6 +121,13 @@ Create a Job class::
111121
*/
112122
public static $maxAttempts = 3;
113123

124+
/**
125+
* Whether there should be only one instance of a job on the queue at a time. (optional property)
126+
*
127+
* @var bool
128+
*/
129+
public static $shouldBeUnique = false;
130+
114131
public function execute(Message $message): ?string
115132
{
116133
$id = $message->getArgument('id');
@@ -151,6 +168,12 @@ Properties:
151168
provided, this value will override the value provided in the worker command
152169
line option ``--max-attempts``. If a value is not provided by the job or by
153170
the command line option, the job may be requeued an infinite number of times.
171+
- ``shouldBeUnique``: If ``true``, only one instance of the job, identified by
172+
it's class, method, and data, will be allowed to be present on the queue at a
173+
time. Subsequent pushes will be silently dropped. This is useful for
174+
idempotent operations where consecutive job executions have no benefit. For
175+
example, refreshing calculated data. If ``true``, the ``uniqueCache``
176+
configuration must be set.
154177

155178
Queueing
156179
--------

src/Command/JobCommand.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
namespace Cake\Queue\Command;
1818

1919
use Bake\Command\SimpleBakeCommand;
20+
use Cake\Console\Arguments;
2021
use Cake\Console\ConsoleOptionParser;
2122

2223
class JobCommand extends SimpleBakeCommand
@@ -47,6 +48,20 @@ public function template(): string
4748
return 'Cake/Queue.job';
4849
}
4950

51+
/**
52+
* @inheritDoc
53+
*/
54+
public function templateData(Arguments $arguments): array
55+
{
56+
$parentData = parent::templateData($arguments);
57+
58+
$data = [
59+
'isUnique' => $arguments->getOption('unique'),
60+
];
61+
62+
return array_merge($parentData, $data);
63+
}
64+
5065
/**
5166
* Gets the option parser instance and configures it.
5267
*
@@ -61,6 +76,11 @@ public function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionPar
6176
->setDescription('Bake a queue job class.')
6277
->addArgument('name', [
6378
'help' => 'The name of the queue job class to create.',
79+
])
80+
->addOption('unique', [
81+
'help' => 'Whether there should be only one instance of a job on the queue at a time.',
82+
'boolean' => true,
83+
'default' => false,
6484
]);
6585
}
6686
}

src/Command/WorkerCommand.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use Cake\Log\Log;
2525
use Cake\Queue\Consumption\LimitAttemptsExtension;
2626
use Cake\Queue\Consumption\LimitConsumedMessagesExtension;
27+
use Cake\Queue\Consumption\RemoveUniqueJobIdFromCacheExtension;
2728
use Cake\Queue\Listener\FailedJobsListener;
2829
use Cake\Queue\Queue\Processor;
2930
use Cake\Queue\QueueManager;
@@ -117,6 +118,7 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
117118
$extensions = [
118119
new LoggerExtension($logger),
119120
$limitAttempsExtension,
121+
new RemoveUniqueJobIdFromCacheExtension('Cake/Queue.queueUnique'),
120122
];
121123

122124
if (!is_null($args->getOption('max-jobs'))) {
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Cake\Queue\Consumption;
5+
6+
use Cake\Cache\Cache;
7+
use Cake\Queue\Job\Message;
8+
use Cake\Queue\QueueManager;
9+
use Enqueue\Consumption\Context\MessageResult;
10+
use Enqueue\Consumption\MessageResultExtensionInterface;
11+
12+
class RemoveUniqueJobIdFromCacheExtension implements MessageResultExtensionInterface
13+
{
14+
/**
15+
* Cache engine name.
16+
*
17+
* @var string
18+
*/
19+
protected $cache;
20+
21+
/**
22+
* @param string $cache Cache engine name.
23+
* @return void
24+
*/
25+
public function __construct(string $cache)
26+
{
27+
$this->cache = $cache;
28+
}
29+
30+
/**
31+
* @param \Enqueue\Consumption\Context\MessageResult $context The result of the message after it was processed.
32+
* @return void
33+
*/
34+
public function onResult(MessageResult $context): void
35+
{
36+
$message = $context->getMessage();
37+
38+
$jobMessage = new Message($message, $context->getContext());
39+
40+
/** @psalm-var class-string $class */
41+
[$class, $method] = $jobMessage->getTarget();
42+
43+
if (empty($class::$shouldBeUnique)) {
44+
return;
45+
}
46+
47+
$data = $jobMessage->getArgument();
48+
49+
$uniqueId = QueueManager::getUniqueId($class, $method, $data);
50+
51+
Cache::delete($uniqueId, $this->cache);
52+
}
53+
}

src/Job/Message.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public function getCallable()
104104
/**
105105
* Get the target class and method.
106106
*
107-
* @return array
107+
* @return array{string, string}
108+
* @psalm-return array{class-string, string}
108109
*/
109110
public function getTarget(): array
110111
{
@@ -150,6 +151,7 @@ public function getMaxAttempts(): ?int
150151
{
151152
$target = $this->getTarget();
152153

154+
/** @psalm-var class-string $class */
153155
$class = $target[0];
154156

155157
return $class::$maxAttempts ?? null;

src/QueueManager.php

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
namespace Cake\Queue;
1818

1919
use BadMethodCallException;
20+
use Cake\Cache\Cache;
2021
use Cake\Core\App;
2122
use Cake\Log\Log;
2223
use Enqueue\Client\Message as ClientMessage;
@@ -119,6 +120,16 @@ public static function setConfig($key, $config = null): void
119120
}
120121
}
121122

123+
if (!empty($config['uniqueCache'])) {
124+
$cacheDefaults = [
125+
'duration' => '+24 hours',
126+
];
127+
128+
$cacheConfig = array_merge($cacheDefaults, $config['uniqueCache']);
129+
130+
Cache::setConfig('Cake/Queue.queueUnique', $cacheConfig);
131+
}
132+
122133
/** @psalm-suppress InvalidPropertyAssignmentValue */
123134
static::$_config[$key] = $config;
124135
}
@@ -209,7 +220,33 @@ public static function push($className, array $data = [], array $options = []):
209220

210221
$name = $options['config'] ?? 'default';
211222

212-
$config = static::getConfig($name);
223+
$config = static::getConfig($name) + [
224+
'logger' => null,
225+
];
226+
227+
$logger = $config['logger'] ? Log::engine($config['logger']) : null;
228+
229+
/** @psalm-suppress InvalidPropertyFetch */
230+
if (!empty($class::$shouldBeUnique)) {
231+
if (!Cache::getConfig('Cake/Queue.queueUnique')) {
232+
throw new InvalidArgumentException(
233+
"$class::\$shouldBeUnique is set to `true` but `uniqueCache` configuration is missing."
234+
);
235+
}
236+
237+
$uniqueId = static::getUniqueId($class, $method, $data);
238+
239+
if (Cache::read($uniqueId, 'Cake/Queue.queueUnique')) {
240+
if ($logger) {
241+
$logger->debug(
242+
"An identical instance of $class already exists on the queue. This push will be ignored."
243+
);
244+
}
245+
246+
return;
247+
}
248+
}
249+
213250
$queue = $options['queue'] ?? $config['queue'] ?? 'default';
214251

215252
$message = new ClientMessage([
@@ -237,5 +274,30 @@ public static function push($className, array $data = [], array $options = []):
237274

238275
$client = static::engine($name);
239276
$client->sendEvent($queue, $message);
277+
278+
if (!empty($class::$shouldBeUnique)) {
279+
$uniqueId = static::getUniqueId($class, $method, $data);
280+
281+
Cache::add($uniqueId, true, 'Cake/Queue.queueUnique');
282+
}
283+
}
284+
285+
/**
286+
* @param string $class Class name
287+
* @param string $method Method name
288+
* @param array $data Message data
289+
* @return string
290+
*/
291+
public static function getUniqueId(string $class, string $method, array $data): string
292+
{
293+
sort($data);
294+
295+
$hashInput = implode([
296+
$class,
297+
$method,
298+
json_encode($data),
299+
]);
300+
301+
return hash('md5', $hashInput);
240302
}
241303
}

templates/bake/job.twig

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ class {{ name }}Job implements JobInterface
3434
*/
3535
public static $maxAttempts = 3;
3636
37+
{% if isUnique %}
38+
/**
39+
* Whether there should be only one instance of a job on the queue at a time. (optional property)
40+
*
41+
* @var bool
42+
*/
43+
public static $shouldBeUnique = true;
44+
45+
{% endif %}
3746
/**
3847
* Executes logic for {{ name }}Job
3948
*

tests/TestCase/Consumption/LimitAttemptsExtensionTest.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static function dropConfigs()
4040

4141
public function testMessageShouldBeRequeuedIfMaxAttemptsIsNotSet()
4242
{
43-
$consume = $this->setupQeueue();
43+
$consume = $this->setupQueue();
4444

4545
QueueManager::push(RequeueJob::class);
4646

@@ -52,7 +52,7 @@ public function testMessageShouldBeRequeuedIfMaxAttemptsIsNotSet()
5252

5353
public function testFailedEventIsFiredWhenMaxAttemptsIsExceeded()
5454
{
55-
$consume = $this->setupQeueue();
55+
$consume = $this->setupQueue();
5656

5757
QueueManager::push(MaxAttemptsIsThreeJob::class, []);
5858

@@ -63,7 +63,7 @@ public function testFailedEventIsFiredWhenMaxAttemptsIsExceeded()
6363

6464
public function testMessageShouldBeRequeuedUntilMaxAttemptsIsReached()
6565
{
66-
$consume = $this->setupQeueue();
66+
$consume = $this->setupQueue();
6767

6868
QueueManager::push(MaxAttemptsIsThreeJob::class, []);
6969

@@ -74,7 +74,7 @@ public function testMessageShouldBeRequeuedUntilMaxAttemptsIsReached()
7474

7575
public function testMessageShouldBeRequeuedIfGlobalMaxAttemptsIsNotSet()
7676
{
77-
$consume = $this->setupQeueue();
77+
$consume = $this->setupQueue();
7878

7979
QueueManager::push(RequeueJob::class);
8080

@@ -86,7 +86,7 @@ public function testMessageShouldBeRequeuedIfGlobalMaxAttemptsIsNotSet()
8686

8787
public function testMessageShouldBeRequeuedUntilGlobalMaxAttemptsIsReached()
8888
{
89-
$consume = $this->setupQeueue([3]);
89+
$consume = $this->setupQueue([3]);
9090

9191
QueueManager::push(MaxAttemptsIsThreeJob::class, ['succeedAt' => 10]);
9292

@@ -95,7 +95,7 @@ public function testMessageShouldBeRequeuedUntilGlobalMaxAttemptsIsReached()
9595
$this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 3);
9696
}
9797

98-
protected function setupQeueue($extensionArgs = [])
98+
protected function setupQueue($extensionArgs = [])
9999
{
100100
Log::setConfig('debug', [
101101
'className' => 'Array',

0 commit comments

Comments
 (0)