Skip to content

Commit a34e816

Browse files
author
Kevin Lopez
committed
fix: rethrow runtime errors during queue dispatch
1 parent 6d54b91 commit a34e816

5 files changed

Lines changed: 186 additions & 38 deletions

File tree

src/Commands/QueueWork.php

Lines changed: 62 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -314,40 +314,19 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
314314
);
315315

316316
try {
317-
// Load payload metadata
318-
$payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []);
319-
320-
// Renew lock if needed
321-
$this->renewLock($payloadMetadata);
322-
323-
$class = $config->resolveJobClass($payload['job']);
324-
$job = new $class($payload['data']);
325-
$job->process();
326-
327-
// Mark as done
328-
service('queue')->done($work);
329-
330-
// Emit job processing completed event
331-
QueueEventManager::jobProcessingCompleted(
332-
handler: service('queue')->name(),
333-
queue: $work->queue,
334-
job: $work,
335-
processingTime: microtime(true) - $startTime,
336-
metadata: [
337-
'worker_id' => $this->workerId,
338-
],
339-
);
340-
341-
CLI::write('The processing of this job was successful', 'green');
342-
343-
// Check chained jobs
344-
$this->processNextJobInChain($payloadMetadata);
345-
} catch (Throwable $err) {
346-
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
347-
// Schedule for later
348-
service('queue')->later($work, $retryAfter ?? $job->getRetryAfter());
349-
} else {
350-
// Mark as failed
317+
try {
318+
// Load payload metadata
319+
$payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []);
320+
321+
// Renew lock if needed
322+
$this->renewLock($payloadMetadata);
323+
324+
$class = $config->resolveJobClass($payload['job']);
325+
$job = new $class($payload['data']);
326+
} catch (Exception $err) {
327+
// Mark dispatch-time exceptions as failed jobs, but allow
328+
// PHP runtime errors to escape so the worker process can be
329+
// recycled with fresh runtime state.
351330
QueueEventManager::jobFailed(
352331
handler: service('queue')->name(),
353332
queue: $work->queue,
@@ -360,11 +339,58 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
360339
);
361340

362341
service('queue')->failed($work, $err, $config->keepFailedJobs);
342+
CLI::write('The processing of this job failed', 'red');
343+
344+
return;
345+
}
346+
347+
try {
348+
$job->process();
349+
350+
// Mark as done
351+
service('queue')->done($work);
352+
353+
// Emit job processing completed event
354+
QueueEventManager::jobProcessingCompleted(
355+
handler: service('queue')->name(),
356+
queue: $work->queue,
357+
job: $work,
358+
processingTime: microtime(true) - $startTime,
359+
metadata: [
360+
'worker_id' => $this->workerId,
361+
],
362+
);
363+
364+
CLI::write('The processing of this job was successful', 'green');
365+
366+
// Check chained jobs
367+
$this->processNextJobInChain($payloadMetadata);
368+
} catch (Throwable $err) {
369+
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
370+
// Schedule for later
371+
service('queue')->later($work, $retryAfter ?? $job->getRetryAfter());
372+
} else {
373+
// Mark as failed
374+
QueueEventManager::jobFailed(
375+
handler: service('queue')->name(),
376+
queue: $work->queue,
377+
job: $work,
378+
exception: $err,
379+
processingTime: microtime(true) - $startTime,
380+
metadata: [
381+
'worker_id' => $this->workerId,
382+
],
383+
);
384+
385+
service('queue')->failed($work, $err, $config->keepFailedJobs);
386+
}
387+
CLI::write('The processing of this job failed', 'red');
363388
}
364-
CLI::write('The processing of this job failed', 'red');
365389
} finally {
366390
// Remove lock if needed
367-
$this->clearLock($payloadMetadata);
391+
if ($payloadMetadata instanceof PayloadMetadata) {
392+
$this->clearLock($payloadMetadata);
393+
}
368394

369395
timer()->stop('work');
370396
CLI::write(sprintf('It took: %s sec', timer()->getElapsedTime('work')) . PHP_EOL, 'cyan');

tests/Commands/QueueWorkTest.php

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use CodeIgniter\I18n\Time;
1919
use CodeIgniter\Queue\Models\QueueJobModel;
2020
use CodeIgniter\Test\Filters\CITestStreamFilter;
21+
use Error;
2122
use Tests\Support\CLITestCase;
2223

2324
/**
@@ -86,6 +87,67 @@ public function testRunWithQueueFailed(): void
8687
$this->assertSame('No job available. Stopping.', $this->getLine(7));
8788
}
8889

90+
public function testRunRethrowsRuntimeErrorDuringJobDispatch(): void
91+
{
92+
Time::setTestNow('2023-12-19 14:15:16');
93+
94+
fake(QueueJobModel::class, [
95+
'connection' => 'database',
96+
'queue' => 'test',
97+
'payload' => ['job' => 'constructor-runtime-error', 'data' => ['key' => 'value']],
98+
'priority' => 'default',
99+
'status' => 0,
100+
'attempts' => 0,
101+
'available_at' => 1_702_977_074,
102+
]);
103+
104+
CITestStreamFilter::registration();
105+
CITestStreamFilter::addOutputFilter();
106+
$outputBufferLevel = ob_get_level();
107+
108+
try {
109+
command('queue:work test sleep 1 --stop-when-empty');
110+
111+
$this->fail('Expected dispatch-time runtime errors to bubble out of queue:work.');
112+
} catch (Error $error) {
113+
$this->assertSame('Runtime error during job construction.', $error->getMessage());
114+
} finally {
115+
while (ob_get_level() > $outputBufferLevel) {
116+
ob_end_clean();
117+
}
118+
119+
CITestStreamFilter::removeOutputFilter();
120+
}
121+
}
122+
123+
public function testRunRecordsTypeErrorThrownDuringJobProcessing(): void
124+
{
125+
Time::setTestNow('2023-12-19 14:15:16');
126+
127+
fake(QueueJobModel::class, [
128+
'connection' => 'database',
129+
'queue' => 'test',
130+
'payload' => ['job' => 'process-type-error', 'data' => ['key' => 'value']],
131+
'priority' => 'default',
132+
'status' => 0,
133+
'attempts' => 0,
134+
'available_at' => 1_702_977_074,
135+
]);
136+
137+
CITestStreamFilter::registration();
138+
CITestStreamFilter::addOutputFilter();
139+
140+
$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
141+
$this->parseOutput(CITestStreamFilter::$buffer);
142+
143+
CITestStreamFilter::removeOutputFilter();
144+
145+
$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
146+
$this->assertSame('Starting a new job: process-type-error, with ID: 1', $this->getLine(3));
147+
$this->assertSame('The processing of this job failed', $this->getLine(4));
148+
$this->assertSame('No job available. Stopping.', $this->getLine(7));
149+
}
150+
89151
public function testRunWithQueueSucceed(): void
90152
{
91153
Time::setTestNow('2023-12-19 14:15:16');

tests/_support/Config/Queue.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
use CodeIgniter\Queue\Handlers\PredisHandler;
1919
use CodeIgniter\Queue\Handlers\RabbitMQHandler;
2020
use CodeIgniter\Queue\Handlers\RedisHandler;
21+
use Tests\Support\Jobs\ConstructorError;
2122
use Tests\Support\Jobs\Failure;
23+
use Tests\Support\Jobs\ProcessTypeError;
2224
use Tests\Support\Jobs\Success;
2325

2426
class Queue extends BaseQueue
@@ -112,7 +114,9 @@ class Queue extends BaseQueue
112114
* Your jobs handlers.
113115
*/
114116
public array $jobHandlers = [
115-
'success' => Success::class,
116-
'failure' => Failure::class,
117+
'success' => Success::class,
118+
'failure' => Failure::class,
119+
'constructor-runtime-error' => ConstructorError::class,
120+
'process-type-error' => ProcessTypeError::class,
117121
];
118122
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* This file is part of CodeIgniter Queue.
7+
*
8+
* (c) CodeIgniter Foundation <admin@codeigniter.com>
9+
*
10+
* For the full copyright and license information, please view
11+
* the LICENSE file that was distributed with this source code.
12+
*/
13+
14+
namespace Tests\Support\Jobs;
15+
16+
use CodeIgniter\Queue\BaseJob;
17+
use Error;
18+
19+
class ConstructorError extends BaseJob
20+
{
21+
public function __construct(array $data)
22+
{
23+
parent::__construct($data);
24+
25+
throw new Error('Runtime error during job construction.');
26+
}
27+
28+
public function process(): void
29+
{
30+
}
31+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* This file is part of CodeIgniter Queue.
7+
*
8+
* (c) CodeIgniter Foundation <admin@codeigniter.com>
9+
*
10+
* For the full copyright and license information, please view
11+
* the LICENSE file that was distributed with this source code.
12+
*/
13+
14+
namespace Tests\Support\Jobs;
15+
16+
use CodeIgniter\Queue\BaseJob;
17+
use TypeError;
18+
19+
class ProcessTypeError extends BaseJob
20+
{
21+
public function process(): void
22+
{
23+
throw new TypeError('Runtime type error during job processing.');
24+
}
25+
}

0 commit comments

Comments
 (0)