Skip to content

Commit fb2bd80

Browse files
authored
Merge pull request #108 from sheldonreiff/unique-jobs
Add support for unique jobs
2 parents 6c85ef3 + f34aa71 commit fb2bd80

File tree

14 files changed

+395
-16
lines changed

14 files changed

+395
-16
lines changed

docs/en/index.rst

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,16 @@ The following configuration should be present in the config array of your **conf
5959

6060
// Whether to store failed jobs in the queue_failed_jobs table. default: false
6161
'storeFailedJobs' => true,
62+
63+
// (optional) The cache configuration for storing unique job ids. `duration`
64+
// should be greater than the maximum length of time any job can be expected
65+
// to remain on the queue. Otherwise, duplicate jobs may be
66+
// possible. Defaults to +24 hours. Note that `File` engine is only suitable
67+
// for local development.
68+
// See https://book.cakephp.org/4/en/core-libraries/caching.html#configuring-cache-engines.
69+
'uniqueCache' => [
70+
'engine' => 'File',
71+
],
6272
]
6373
],
6474
// ...
@@ -114,6 +124,13 @@ A simple job that logs received messages would look like::
114124
*/
115125
public static $maxAttempts = 3;
116126

127+
/**
128+
* Whether there should be only one instance of a job on the queue at a time. (optional property)
129+
*
130+
* @var bool
131+
*/
132+
public static $shouldBeUnique = false;
133+
117134
public function execute(Message $message): ?string
118135
{
119136
$id = $message->getArgument('id');
@@ -154,6 +171,12 @@ Job Properties:
154171
provided, this value will override the value provided in the worker command
155172
line option ``--max-attempts``. If a value is not provided by the job or by
156173
the command line option, the job may be requeued an infinite number of times.
174+
- ``shouldBeUnique``: If ``true``, only one instance of the job, identified by
175+
it's class, method, and data, will be allowed to be present on the queue at a
176+
time. Subsequent pushes will be silently dropped. This is useful for
177+
idempotent operations where consecutive job executions have no benefit. For
178+
example, refreshing calculated data. If ``true``, the ``uniqueCache``
179+
configuration must be set.
157180

158181
Queueing
159182
--------

src/Command/JobCommand.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
namespace Cake\Queue\Command;
1818

1919
use Bake\Command\SimpleBakeCommand;
20+
use Cake\Console\Arguments;
2021
use Cake\Console\ConsoleOptionParser;
2122

2223
class JobCommand extends SimpleBakeCommand
@@ -47,6 +48,20 @@ public function template(): string
4748
return 'Cake/Queue.job';
4849
}
4950

51+
/**
52+
* @inheritDoc
53+
*/
54+
public function templateData(Arguments $arguments): array
55+
{
56+
$parentData = parent::templateData($arguments);
57+
58+
$data = [
59+
'isUnique' => $arguments->getOption('unique'),
60+
];
61+
62+
return array_merge($parentData, $data);
63+
}
64+
5065
/**
5166
* Gets the option parser instance and configures it.
5267
*
@@ -61,6 +76,11 @@ public function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionPar
6176
->setDescription('Bake a queue job class.')
6277
->addArgument('name', [
6378
'help' => 'The name of the queue job class to create.',
79+
])
80+
->addOption('unique', [
81+
'help' => 'Whether there should be only one instance of a job on the queue at a time.',
82+
'boolean' => true,
83+
'default' => false,
6484
]);
6585
}
6686
}

src/Command/WorkerCommand.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Cake\Log\Log;
2626
use Cake\Queue\Consumption\LimitAttemptsExtension;
2727
use Cake\Queue\Consumption\LimitConsumedMessagesExtension;
28+
use Cake\Queue\Consumption\RemoveUniqueJobIdFromCacheExtension;
2829
use Cake\Queue\Listener\FailedJobsListener;
2930
use Cake\Queue\Queue\Processor;
3031
use Cake\Queue\QueueManager;
@@ -132,6 +133,7 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
132133
$extensions = [
133134
new LoggerExtension($logger),
134135
$limitAttempsExtension,
136+
new RemoveUniqueJobIdFromCacheExtension('Cake/Queue.queueUnique'),
135137
];
136138

137139
if (!is_null($args->getOption('max-jobs'))) {
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Cake\Queue\Consumption;
5+
6+
use Cake\Cache\Cache;
7+
use Cake\Queue\Job\Message;
8+
use Cake\Queue\QueueManager;
9+
use Enqueue\Consumption\Context\MessageResult;
10+
use Enqueue\Consumption\MessageResultExtensionInterface;
11+
12+
class RemoveUniqueJobIdFromCacheExtension implements MessageResultExtensionInterface
13+
{
14+
/**
15+
* Cache engine name.
16+
*
17+
* @var string
18+
*/
19+
protected $cache;
20+
21+
/**
22+
* @param string $cache Cache engine name.
23+
* @return void
24+
*/
25+
public function __construct(string $cache)
26+
{
27+
$this->cache = $cache;
28+
}
29+
30+
/**
31+
* @param \Enqueue\Consumption\Context\MessageResult $context The result of the message after it was processed.
32+
* @return void
33+
*/
34+
public function onResult(MessageResult $context): void
35+
{
36+
$message = $context->getMessage();
37+
38+
$jobMessage = new Message($message, $context->getContext());
39+
40+
[$class, $method] = $jobMessage->getTarget();
41+
42+
/** @psalm-suppress InvalidPropertyFetch */
43+
if (empty($class::$shouldBeUnique)) {
44+
return;
45+
}
46+
47+
$data = $jobMessage->getArgument();
48+
49+
$uniqueId = QueueManager::getUniqueId($class, $method, $data);
50+
51+
Cache::delete($uniqueId, $this->cache);
52+
}
53+
}

src/Job/Message.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public function getCallable()
117117
/**
118118
* Get the target class and method.
119119
*
120-
* @return array
120+
* @return array{string, string}
121+
* @psalm-return array{class-string, string}
121122
*/
122123
public function getTarget(): array
123124
{
@@ -165,6 +166,7 @@ public function getMaxAttempts(): ?int
165166

166167
$class = $target[0];
167168

169+
/** @psalm-suppress InvalidPropertyFetch */
168170
return $class::$maxAttempts ?? null;
169171
}
170172

src/Listener/FailedJobsListener.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
namespace Cake\Queue\Listener;
1818

19-
use Cake\Event\EventInterface;
2019
use Cake\Event\EventListenerInterface;
2120
use Cake\ORM\Exception\PersistenceFailedException;
2221
use Cake\ORM\Locator\LocatorAwareTrait;
@@ -39,10 +38,10 @@ public function implementedEvents(): array
3938
}
4039

4140
/**
42-
* @param \Cake\Event\EventInterface $event EventInterface
41+
* @param object $event EventInterface.
4342
* @return void
4443
*/
45-
public function storeFailedJob(EventInterface $event): void
44+
public function storeFailedJob($event): void
4645
{
4746
/** @var \Cake\Queue\Job\Message $jobMessage */
4847
$jobMessage = $event->getSubject();

src/QueueManager.php

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
namespace Cake\Queue;
1818

1919
use BadMethodCallException;
20+
use Cake\Cache\Cache;
2021
use Cake\Core\App;
2122
use Cake\Log\Log;
2223
use Enqueue\Client\Message as ClientMessage;
@@ -119,6 +120,16 @@ public static function setConfig($key, $config = null): void
119120
}
120121
}
121122

123+
if (!empty($config['uniqueCache'])) {
124+
$cacheDefaults = [
125+
'duration' => '+24 hours',
126+
];
127+
128+
$cacheConfig = array_merge($cacheDefaults, $config['uniqueCache']);
129+
130+
Cache::setConfig('Cake/Queue.queueUnique', $cacheConfig);
131+
}
132+
122133
/** @psalm-suppress InvalidPropertyAssignmentValue */
123134
static::$_config[$key] = $config;
124135
}
@@ -209,7 +220,33 @@ public static function push($className, array $data = [], array $options = []):
209220

210221
$name = $options['config'] ?? 'default';
211222

212-
$config = static::getConfig($name);
223+
$config = static::getConfig($name) + [
224+
'logger' => null,
225+
];
226+
227+
$logger = $config['logger'] ? Log::engine($config['logger']) : null;
228+
229+
/** @psalm-suppress InvalidPropertyFetch */
230+
if (!empty($class::$shouldBeUnique)) {
231+
if (!Cache::getConfig('Cake/Queue.queueUnique')) {
232+
throw new InvalidArgumentException(
233+
"$class::\$shouldBeUnique is set to `true` but `uniqueCache` configuration is missing."
234+
);
235+
}
236+
237+
$uniqueId = static::getUniqueId($class, $method, $data);
238+
239+
if (Cache::read($uniqueId, 'Cake/Queue.queueUnique')) {
240+
if ($logger) {
241+
$logger->debug(
242+
"An identical instance of $class already exists on the queue. This push will be ignored."
243+
);
244+
}
245+
246+
return;
247+
}
248+
}
249+
213250
$queue = $options['queue'] ?? $config['queue'] ?? 'default';
214251

215252
$message = new ClientMessage([
@@ -237,5 +274,31 @@ public static function push($className, array $data = [], array $options = []):
237274

238275
$client = static::engine($name);
239276
$client->sendEvent($queue, $message);
277+
278+
/** @psalm-suppress InvalidPropertyFetch */
279+
if (!empty($class::$shouldBeUnique)) {
280+
$uniqueId = static::getUniqueId($class, $method, $data);
281+
282+
Cache::add($uniqueId, true, 'Cake/Queue.queueUnique');
283+
}
284+
}
285+
286+
/**
287+
* @param string $class Class name
288+
* @param string $method Method name
289+
* @param array $data Message data
290+
* @return string
291+
*/
292+
public static function getUniqueId(string $class, string $method, array $data): string
293+
{
294+
sort($data);
295+
296+
$hashInput = implode([
297+
$class,
298+
$method,
299+
json_encode($data),
300+
]);
301+
302+
return hash('md5', $hashInput);
240303
}
241304
}

templates/bake/job.twig

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ class {{ name }}Job implements JobInterface
3434
*/
3535
public static $maxAttempts = 3;
3636
37+
{% if isUnique %}
38+
/**
39+
* Whether there should be only one instance of a job on the queue at a time. (optional property)
40+
*
41+
* @var bool
42+
*/
43+
public static $shouldBeUnique = true;
44+
45+
{% endif %}
3746
/**
3847
* Executes logic for {{ name }}Job
3948
*

tests/TestCase/Consumption/LimitAttemptsExtensionTest.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static function dropConfigs()
4040

4141
public function testMessageShouldBeRequeuedIfMaxAttemptsIsNotSet()
4242
{
43-
$consume = $this->setupQeueue();
43+
$consume = $this->setupQueue();
4444

4545
QueueManager::push(RequeueJob::class);
4646

@@ -52,7 +52,7 @@ public function testMessageShouldBeRequeuedIfMaxAttemptsIsNotSet()
5252

5353
public function testFailedEventIsFiredWhenMaxAttemptsIsExceeded()
5454
{
55-
$consume = $this->setupQeueue();
55+
$consume = $this->setupQueue();
5656

5757
QueueManager::push(MaxAttemptsIsThreeJob::class, []);
5858

@@ -63,7 +63,7 @@ public function testFailedEventIsFiredWhenMaxAttemptsIsExceeded()
6363

6464
public function testMessageShouldBeRequeuedUntilMaxAttemptsIsReached()
6565
{
66-
$consume = $this->setupQeueue();
66+
$consume = $this->setupQueue();
6767

6868
QueueManager::push(MaxAttemptsIsThreeJob::class, []);
6969

@@ -74,7 +74,7 @@ public function testMessageShouldBeRequeuedUntilMaxAttemptsIsReached()
7474

7575
public function testMessageShouldBeRequeuedIfGlobalMaxAttemptsIsNotSet()
7676
{
77-
$consume = $this->setupQeueue();
77+
$consume = $this->setupQueue();
7878

7979
QueueManager::push(RequeueJob::class);
8080

@@ -86,7 +86,7 @@ public function testMessageShouldBeRequeuedIfGlobalMaxAttemptsIsNotSet()
8686

8787
public function testMessageShouldBeRequeuedUntilGlobalMaxAttemptsIsReached()
8888
{
89-
$consume = $this->setupQeueue([3]);
89+
$consume = $this->setupQueue([3]);
9090

9191
QueueManager::push(MaxAttemptsIsThreeJob::class, ['succeedAt' => 10]);
9292

@@ -95,7 +95,7 @@ public function testMessageShouldBeRequeuedUntilGlobalMaxAttemptsIsReached()
9595
$this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 3);
9696
}
9797

98-
protected function setupQeueue($extensionArgs = [])
98+
protected function setupQueue($extensionArgs = [])
9999
{
100100
Log::setConfig('debug', [
101101
'className' => 'Array',

0 commit comments

Comments
 (0)