Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Worker
const EXIT_SUCCESS = 0;
const EXIT_ERROR = 1;
const EXIT_MEMORY_LIMIT = 12;
const EXIT_CACHE_FAILED = 13;

/**
* The name of the worker.
Expand Down Expand Up @@ -92,13 +93,27 @@ class Worker
*/
public $paused = false;

/**
* Indicates if the worker restart cache retrieval fails.
*
* @var bool
*/
public $cacheFailed = false;

/**
* The callbacks used to pop jobs from queues.
*
* @var callable[]
*/
protected static $popCallbacks = [];

/**
* The custom exit code to be used when cache retrieval fails.
*
* @var int|null
*/
public static $cacheFailedExitCode;

/**
* The custom exit code to be used when memory is exceeded.
*
Expand Down Expand Up @@ -283,6 +298,7 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que
{
return ! ((($this->isDownForMaintenance)() && ! $options->force) ||
$this->paused ||
$this->cacheFailed ||
$this->events->until(new Looping($connectionName, $queue)) === false);
}

Expand Down Expand Up @@ -315,6 +331,7 @@ protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $startT
return match (true) {
$this->shouldQuit => static::EXIT_SUCCESS,
$this->memoryExceeded($options->memory) => static::$memoryExceededExitCode ?? static::EXIT_MEMORY_LIMIT,
$this->cacheFailed => static::$cacheFailedExitCode ?? static::EXIT_CACHE_FAILED,
$this->queueShouldRestart($lastRestart) => static::EXIT_SUCCESS,
$options->stopWhenEmpty && is_null($job) => static::EXIT_SUCCESS,
$options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => static::EXIT_SUCCESS,
Expand Down Expand Up @@ -733,7 +750,12 @@ protected function queueShouldRestart($lastRestart)
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
try {
return $this->cache->get('illuminate:queue:restart');
} catch (Throwable $e) {
$this->exceptions->report($e);
$this->cacheFailed = true;
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions tests/Integration/Queue/WorkCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@

namespace Illuminate\Tests\Integration\Queue;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Cache\CacheManager;
use Illuminate\Cache\Repository;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\UniqueConstraintViolationException;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Foundation\Testing\DatabaseMigrations;
use Illuminate\Queue\Worker;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Exceptions;
use Illuminate\Support\Facades\Queue;
use Mockery as m;
use Orchestra\Testbench\Attributes\WithMigration;
use RuntimeException;

Expand Down Expand Up @@ -184,6 +189,37 @@ public function testMemoryExitCode()
Worker::$memoryExceededExitCode = null;
}

public function testCacheErrorExitCode()
{
$this->markTestSkippedWhenUsingQueueDrivers(['redis', 'beanstalkd']);

Worker::$cacheFailedExitCode = 100;

Queue::push(new FirstJob);

Cache::swap($cacheManager = m::mock(CacheManager::class)->makePartial());

$repository = m::mock(Repository::class);

$cacheManager->shouldReceive('driver')
->twice()
->andReturn($repository);

$repository->shouldReceive('get')
->once()
->with('illuminate:queue:restart')
->andThrow(new Exception('Cache read failed'));

$this->artisan('queue:work', [
'--daemon' => true,
])->assertExitCode(100);

$this->assertSame(1, Queue::size());
$this->assertFalse(FirstJob::$ran);

Worker::$cacheFailedExitCode = null;
}

public function testFailedJobListenerOnlyRunsOnce()
{
$this->markTestSkippedWhenUsingQueueDrivers(['redis', 'beanstalkd']);
Expand Down
30 changes: 25 additions & 5 deletions tests/Queue/QueueWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
namespace Illuminate\Tests\Queue;

use Exception;
use Illuminate\Cache\Repository;
use Illuminate\Container\Container;
use Illuminate\Contracts\Cache\Store;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Contracts\Queue\Job as QueueJobContract;
Expand Down Expand Up @@ -387,6 +389,29 @@ public function testWorkerPicksJobUsingCustomCallbacks()
Worker::popUsing('myworker', null);
}

public function testWorkerHandlesCacheFailed()
{
$workerOptions = new WorkerOptions();
$workerOptions->stopWhenEmpty = true;

$mockStore = m::mock(Store::class);

$mockStore->expects('get')
->with('illuminate:queue:restart')
->andThrow(new Exception('Cache read failed'));

$worker = $this->getWorker('default', ['queue' => [
$firstJob = new WorkerFakeJob,
]]);

$worker->setCache(new Repository($mockStore));

$worker->daemon('default', 'queue', $workerOptions);

$this->assertFalse($firstJob->fired);
$this->assertTrue($worker->cacheFailed);
}

public function testWorkerStartingIsDispatched()
{
$workerOptions = new WorkerOptions();
Expand Down Expand Up @@ -457,11 +482,6 @@ public function stop($status = 0, $options = null)
return $status;
}

public function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
return ! ($this->isDownForMaintenance)();
}

public function memoryExceeded($memoryLimit)
{
return $this->stopOnMemoryExceeded;
Expand Down