Skip to content

Commit 47a75b1

Browse files
committed
Merge branch 'master' into cake5-merge
# Conflicts: # .github/workflows/ci.yml
2 parents eb8f2d6 + 172ec7c commit 47a75b1

File tree

18 files changed

+180
-68
lines changed

18 files changed

+180
-68
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ The plugin consists of a CakePHP shell wrapper and Queueing libraries for the [p
1111

1212
## Installation
1313

14-
You can install this plugin into your CakePHP application using [Composer](http://getcomposer.org).
14+
You can install this plugin into your CakePHP application using [Composer](https://getcomposer.org).
1515

1616
Run the following command
1717
```sh

docs.Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Generate the HTML output.
2-
FROM markstory/cakephp-docs-builder as builder
2+
FROM ghcr.io/cakephp/docs-builder as builder
33

44
COPY docs /data/docs
55

@@ -8,7 +8,7 @@ RUN cd /data/docs-builder && \
88
make website LANGS="en" SOURCE=/data/docs DEST=/data/website/
99

1010
# Build a small nginx container with just the static site in it.
11-
FROM markstory/cakephp-docs-builder:runtime as runtime
11+
FROM ghcr.io/cakephp/docs-builder:runtime as runtime
1212

1313
# Configure search index script
1414
ENV LANGS="en"

docs/en/index.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ A simple job that logs received messages would look like::
118118
use LogTrait;
119119

120120
/**
121-
* The maximum number of times the job may be attempted.
121+
* The maximum number of times the job may be attempted. (optional property)
122122
*
123123
* @var int|null
124124
*/

src/Command/JobCommand.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,11 @@ public function templateData(Arguments $arguments): array
5555
{
5656
$parentData = parent::templateData($arguments);
5757

58+
$maxAttempts = $arguments->getOption('max-attempts');
59+
5860
$data = [
5961
'isUnique' => $arguments->getOption('unique'),
62+
'maxAttempts' => $maxAttempts ? (int)$maxAttempts : null,
6063
];
6164

6265
return array_merge($parentData, $data);
@@ -77,6 +80,10 @@ public function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionPar
7780
->addArgument('name', [
7881
'help' => 'The name of the queue job class to create.',
7982
])
83+
->addOption('max-attempts', [
84+
'help' => 'The maximum number of times the job may be attempted.',
85+
'default' => null,
86+
])
8087
->addOption('unique', [
8188
'help' => 'Whether there should be only one instance of a job on the queue at a time.',
8289
'boolean' => true,

src/Command/WorkerCommand.php

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,12 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
129129

130130
$limitAttempsExtension->getEventManager()->on(new FailedJobsListener());
131131

132+
$configKey = (string)$args->getOption('config');
133+
$config = QueueManager::getConfig($configKey);
134+
132135
$extensions = [
133136
new LoggerExtension($logger),
134137
$limitAttempsExtension,
135-
new RemoveUniqueJobIdFromCacheExtension('Cake/Queue.queueUnique'),
136138
];
137139

138140
if (!is_null($args->getOption('max-jobs'))) {
@@ -145,6 +147,10 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger):
145147
$extensions[] = new LimitConsumptionTimeExtension($endTime);
146148
}
147149

150+
if (isset($config['uniqueCacheKey'])) {
151+
$extensions[] = new RemoveUniqueJobIdFromCacheExtension($config['uniqueCacheKey']);
152+
}
153+
148154
return new ChainExtension($extensions);
149155
}
150156

@@ -171,16 +177,16 @@ protected function getLogger(Arguments $args): LoggerInterface
171177
*/
172178
public function execute(Arguments $args, ConsoleIo $io): int
173179
{
174-
$logger = $this->getLogger($args);
175-
$processor = new Processor($logger, $this->container);
176-
$extension = $this->getQueueExtension($args, $logger);
177-
178180
$config = (string)$args->getOption('config');
179181
if (!Configure::check(sprintf('Queue.%s', $config))) {
180182
$io->error(sprintf('Configuration key "%s" was not found', $config));
181183
$this->abort();
182184
}
183185

186+
$logger = $this->getLogger($args);
187+
$processor = new Processor($logger, $this->container);
188+
$extension = $this->getQueueExtension($args, $logger);
189+
184190
$hasListener = Configure::check(sprintf('Queue.%s.listener', $config));
185191
if ($hasListener) {
186192
$listenerClassName = Configure::read(sprintf('Queue.%s.listener', $config));
@@ -197,7 +203,7 @@ public function execute(Arguments $args, ConsoleIo $io): int
197203
$queue = $args->getOption('queue')
198204
? (string)$args->getOption('queue')
199205
: Configure::read("Queue.{$config}.queue", 'default');
200-
$processorName = $args->getOption('processor') ? (string)$args->getOption('processor') : null;
206+
$processorName = $args->getOption('processor') ? (string)$args->getOption('processor') : 'default';
201207

202208
$client->bindTopic($queue, $processor, $processorName);
203209
$client->consume($extension);

src/Consumption/LimitAttemptsExtension.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,11 @@ public function onResult(MessageResult $context): void
6464
$attemptNumber = $message->getProperty(self::ATTEMPTS_PROPERTY, 0) + 1;
6565

6666
if ($attemptNumber >= $maxAttempts) {
67-
$originalResult = $context->getResult();
68-
6967
$context->changeResult(
7068
Result::reject(sprintf('The maximum number of %d allowed attempts was reached.', $maxAttempts))
7169
);
7270

73-
$exception = $originalResult instanceof Result ? $originalResult->getReason() : null;
71+
$exception = (string)$message->getProperty('jobException');
7472

7573
$this->dispatchEvent(
7674
'Consumption.LimitAttemptsExtension.failed',

src/Queue/Processor.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
use Cake\Queue\Job\Message;
2222
use Enqueue\Consumption\Result;
2323
use Error;
24-
use Exception;
2524
use Interop\Queue\Context;
2625
use Interop\Queue\Message as QueueMessage;
2726
use Interop\Queue\Processor as InteropProcessor;
2827
use Psr\Log\LoggerInterface;
2928
use Psr\Log\NullLogger;
3029
use RuntimeException;
30+
use Throwable;
3131

3232
class Processor implements InteropProcessor
3333
{
@@ -83,18 +83,20 @@ public function process(QueueMessage $message, Context $context): string|object
8383

8484
try {
8585
$response = $this->processMessage($jobMessage);
86-
} catch (Exception $e) {
86+
} catch (Throwable $e) {
87+
$message->setProperty('jobException', $e);
88+
8789
$this->logger->debug(sprintf('Message encountered exception: %s', $e->getMessage()));
8890
$this->dispatchEvent('Processor.message.exception', [
8991
'message' => $jobMessage,
9092
'exception' => $e,
9193
]);
9294

93-
return Result::requeue(sprintf('Exception occurred while processing message: %s', (string)$e));
95+
return Result::requeue('Exception occurred while processing message');
9496
}
9597

9698
if ($response === InteropProcessor::ACK) {
97-
$this->logger->debug('Message processed sucessfully');
99+
$this->logger->debug('Message processed successfully');
98100
$this->dispatchEvent('Processor.message.success', ['message' => $jobMessage]);
99101

100102
return InteropProcessor::ACK;

src/QueueManager.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ public static function setConfig(string|array $key, ?array $config = null): void
8989
}
9090

9191
return;
92+
} elseif (is_array($key)) {
93+
throw new LogicException('If config is not null, key must be a string.');
9294
}
9395

9496
if (isset(static::$_config[$key])) {
@@ -127,7 +129,9 @@ public static function setConfig(string|array $key, ?array $config = null): void
127129

128130
$cacheConfig = array_merge($cacheDefaults, $config['uniqueCache']);
129131

130-
Cache::setConfig('Cake/Queue.queueUnique', $cacheConfig);
132+
$config['uniqueCacheKey'] = "Cake/Queue.queueUnique.{$key}";
133+
134+
Cache::setConfig($config['uniqueCacheKey'], $cacheConfig);
131135
}
132136

133137
/** @psalm-suppress InvalidPropertyAssignmentValue */
@@ -228,15 +232,15 @@ public static function push(string|array $className, array $data = [], array $op
228232

229233
/** @psalm-suppress InvalidPropertyFetch */
230234
if (!empty($class::$shouldBeUnique)) {
231-
if (!Cache::getConfig('Cake/Queue.queueUnique')) {
235+
if (empty($config['uniqueCache'])) {
232236
throw new InvalidArgumentException(
233237
"$class::\$shouldBeUnique is set to `true` but `uniqueCache` configuration is missing."
234238
);
235239
}
236240

237241
$uniqueId = static::getUniqueId($class, $method, $data);
238242

239-
if (Cache::read($uniqueId, 'Cake/Queue.queueUnique')) {
243+
if (Cache::read($uniqueId, $config['uniqueCacheKey'])) {
240244
if ($logger) {
241245
$logger->debug(
242246
"An identical instance of $class already exists on the queue. This push will be ignored."
@@ -279,7 +283,7 @@ public static function push(string|array $className, array $data = [], array $op
279283
if (!empty($class::$shouldBeUnique)) {
280284
$uniqueId = static::getUniqueId($class, $method, $data);
281285

282-
Cache::add($uniqueId, true, 'Cake/Queue.queueUnique');
286+
Cache::add($uniqueId, true, $config['uniqueCacheKey']);
283287
}
284288
}
285289

templates/bake/job.twig

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ use Interop\Queue\Processor;
2727
*/
2828
class {{ name }}Job implements JobInterface
2929
{
30+
{% if maxAttempts %}
3031
/**
3132
* The maximum number of times the job may be attempted.
3233
*
3334
* @var int|null
3435
*/
35-
public static $maxAttempts = 3;
36+
public static $maxAttempts = {{ maxAttempts }};
3637
38+
{% endif %}
3739
{% if isUnique %}
3840
/**
3941
* Whether there should be only one instance of a job on the queue at a time. (optional property)

tests/TestCase/Command/WorkerCommandTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,4 +293,31 @@ public function testQueueProcessesJobWithDIService()
293293

294294
$this->assertDebugLogContains('Debug job was run with service infotext');
295295
}
296+
297+
/**
298+
* Test that queue will process when a unique cache is configured.
299+
*
300+
* @runInSeparateProcess
301+
*/
302+
public function testQueueProcessesWithUniqueCacheConfigured()
303+
{
304+
$config = [
305+
'queue' => 'default',
306+
'url' => 'file:///' . TMP . DS . 'queue',
307+
'receiveTimeout' => 100,
308+
'uniqueCache' => [
309+
'engine' => 'File',
310+
],
311+
];
312+
Configure::write('Queue', ['default' => $config]);
313+
314+
Log::setConfig('debug', [
315+
'className' => 'Array',
316+
'levels' => ['notice', 'info', 'debug'],
317+
]);
318+
319+
$this->exec('queue worker --max-jobs=1 --logger=debug --verbose');
320+
321+
$this->assertDebugLogContains('Consumption has started');
322+
}
296323
}

0 commit comments

Comments
 (0)