Skip to content

Commit 01ec8f2

Browse files
derekmdfideloper
andauthored
Attempt to gracefully exit queue:worker (#32992)
App::terminating() callbacks don't get called in some cases when the queue:worker is shutdown: * memory limit exceeded * queue:restart has been called * database connection is lost * --stop-when-empty option is enabled * on SIGTERM interruption Instead of immediately calling exit(), bubble up the exit code and return it in WorkCommand@handle(). Async SIGALRM interruptions still calls exit() and won't invoke App::terminating(). Co-authored-by: Chris Fidao <[email protected]>
1 parent 7dc59ce commit 01ec8f2

File tree

4 files changed

+179
-30
lines changed

4 files changed

+179
-30
lines changed

src/Illuminate/Queue/Console/WorkCommand.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public function __construct(Worker $worker, Cache $cache)
7373
/**
7474
* Execute the console command.
7575
*
76-
* @return void
76+
* @return int|null
7777
*/
7878
public function handle()
7979
{
@@ -94,7 +94,7 @@ public function handle()
9494
// connection being run for the queue operation currently being executed.
9595
$queue = $this->getQueue($connection);
9696

97-
$this->runWorker(
97+
return $this->runWorker(
9898
$connection, $queue
9999
);
100100
}
@@ -104,7 +104,7 @@ public function handle()
104104
*
105105
* @param string $connection
106106
* @param string $queue
107-
* @return array
107+
* @return int|null
108108
*/
109109
protected function runWorker($connection, $queue)
110110
{

src/Illuminate/Queue/Worker.php

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ class Worker
1919
{
2020
use DetectsLostConnections;
2121

22+
const EXIT_SUCCESS = 0;
23+
const EXIT_ERROR = 1;
24+
const EXIT_MEMORY_LIMIT = 12;
25+
2226
/**
2327
* The name of the worker.
2428
*
@@ -108,7 +112,7 @@ public function __construct(QueueManager $manager,
108112
* @param string $connectionName
109113
* @param string $queue
110114
* @param \Illuminate\Queue\WorkerOptions $options
111-
* @return void
115+
* @return int
112116
*/
113117
public function daemon($connectionName, $queue, WorkerOptions $options)
114118
{
@@ -123,7 +127,11 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
123127
// if it is we will just pause this worker for a given amount of time and
124128
// make sure we do not need to kill this worker process off completely.
125129
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
126-
$this->pauseWorker($options, $lastRestart);
130+
$status = $this->pauseWorker($options, $lastRestart);
131+
132+
if (! is_null($status)) {
133+
return $this->stop($status);
134+
}
127135

128136
continue;
129137
}
@@ -155,7 +163,11 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
155163
// Finally, we will check to see if we have exceeded our memory limits or if
156164
// the queue should restart based on other indications. If so, we'll stop
157165
// this worker and let whatever is "monitoring" it restart the process.
158-
$this->stopIfNecessary($options, $lastRestart, $job);
166+
$status = $this->stopIfNecessary($options, $lastRestart, $job);
167+
168+
if (! is_null($status)) {
169+
return $this->stop($status);
170+
}
159171
}
160172
}
161173

@@ -178,7 +190,7 @@ protected function registerTimeoutHandler($job, WorkerOptions $options)
178190
);
179191
}
180192

181-
$this->kill(1);
193+
$this->kill(static::EXIT_ERROR);
182194
});
183195

184196
pcntl_alarm(
@@ -228,33 +240,33 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que
228240
*
229241
* @param \Illuminate\Queue\WorkerOptions $options
230242
* @param int $lastRestart
231-
* @return void
243+
* @return int|null
232244
*/
233245
protected function pauseWorker(WorkerOptions $options, $lastRestart)
234246
{
235247
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
236248

237-
$this->stopIfNecessary($options, $lastRestart);
249+
return $this->stopIfNecessary($options, $lastRestart);
238250
}
239251

240252
/**
241-
* Stop the process if necessary.
253+
* Determine the exit code to stop the process if necessary.
242254
*
243255
* @param \Illuminate\Queue\WorkerOptions $options
244256
* @param int $lastRestart
245257
* @param mixed $job
246-
* @return void
258+
* @return int|null
247259
*/
248260
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
249261
{
250262
if ($this->shouldQuit) {
251-
$this->stop();
263+
return static::EXIT_SUCCESS;
252264
} elseif ($this->memoryExceeded($options->memory)) {
253-
$this->stop(12);
265+
return static::EXIT_MEMORY_LIMIT;
254266
} elseif ($this->queueShouldRestart($lastRestart)) {
255-
$this->stop();
267+
return static::EXIT_SUCCESS;
256268
} elseif ($options->stopWhenEmpty && is_null($job)) {
257-
$this->stop();
269+
return static::EXIT_SUCCESS;
258270
}
259271
}
260272

@@ -647,13 +659,13 @@ public function memoryExceeded($memoryLimit)
647659
* Stop listening and bail out of the script.
648660
*
649661
* @param int $status
650-
* @return void
662+
* @return int
651663
*/
652664
public function stop($status = 0)
653665
{
654666
$this->events->dispatch(new WorkerStopping($status));
655667

656-
exit($status);
668+
return $status;
657669
}
658670

659671
/**
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
<?php
2+
3+
namespace Illuminate\Tests\Integration\Queue;
4+
5+
use Illuminate\Bus\Queueable;
6+
use Illuminate\Contracts\Queue\ShouldQueue;
7+
use Illuminate\Database\Schema\Blueprint;
8+
use Illuminate\Foundation\Bus\Dispatchable;
9+
use Orchestra\Testbench\TestCase;
10+
use Queue;
11+
12+
/**
13+
* @group integration
14+
*/
15+
class WorkCommandTest extends TestCase
16+
{
17+
protected function getEnvironmentSetUp($app)
18+
{
19+
$app['config']->set('app.debug', 'true');
20+
21+
$app['config']->set('database.default', 'testbench');
22+
$app['config']->set('database.connections.testbench', [
23+
'driver' => 'sqlite',
24+
'database' => ':memory:',
25+
'prefix' => '',
26+
]);
27+
28+
$app['db']->connection()->getSchemaBuilder()->create('jobs', function (Blueprint $table) {
29+
$table->bigIncrements('id');
30+
$table->string('queue');
31+
$table->longText('payload');
32+
$table->tinyInteger('attempts')->unsigned();
33+
$table->unsignedInteger('reserved_at')->nullable();
34+
$table->unsignedInteger('available_at');
35+
$table->unsignedInteger('created_at');
36+
$table->index(['queue', 'reserved_at']);
37+
});
38+
}
39+
40+
protected function tearDown(): void
41+
{
42+
$this->app['db']->connection()->getSchemaBuilder()->drop('jobs');
43+
44+
parent::tearDown();
45+
46+
FirstJob::$ran = false;
47+
SecondJob::$ran = false;
48+
}
49+
50+
public function testRunningOneJob()
51+
{
52+
Queue::connection('database')->push(new FirstJob);
53+
Queue::connection('database')->push(new SecondJob);
54+
55+
$this->artisan('queue:work', [
56+
'connection' => 'database',
57+
'--once' => true,
58+
'--memory' => 1024,
59+
])->assertExitCode(0);
60+
61+
$this->assertSame(1, Queue::connection('database')->size());
62+
$this->assertTrue(FirstJob::$ran);
63+
$this->assertFalse(SecondJob::$ran);
64+
}
65+
66+
public function testDaemon()
67+
{
68+
Queue::connection('database')->push(new FirstJob);
69+
Queue::connection('database')->push(new SecondJob);
70+
71+
$this->artisan('queue:work', [
72+
'connection' => 'database',
73+
'--daemon' => true,
74+
'--stop-when-empty' => true,
75+
'--memory' => 1024,
76+
])->assertExitCode(0);
77+
78+
$this->assertSame(0, Queue::connection('database')->size());
79+
$this->assertTrue(FirstJob::$ran);
80+
$this->assertTrue(SecondJob::$ran);
81+
}
82+
83+
public function testMemoryExceeded()
84+
{
85+
Queue::connection('database')->push(new FirstJob);
86+
Queue::connection('database')->push(new SecondJob);
87+
88+
$this->artisan('queue:work', [
89+
'connection' => 'database',
90+
'--daemon' => true,
91+
'--stop-when-empty' => true,
92+
'--memory' => 0.1,
93+
])->assertExitCode(12);
94+
95+
// Memory limit isn't checked until after the first job is attempted.
96+
$this->assertSame(1, Queue::connection('database')->size());
97+
$this->assertTrue(FirstJob::$ran);
98+
$this->assertFalse(SecondJob::$ran);
99+
}
100+
}
101+
102+
class FirstJob implements ShouldQueue
103+
{
104+
use Dispatchable, Queueable;
105+
106+
public static $ran = false;
107+
108+
public function handle()
109+
{
110+
static::$ran = true;
111+
}
112+
}
113+
114+
class SecondJob implements ShouldQueue
115+
{
116+
use Dispatchable, Queueable;
117+
118+
public static $ran = false;
119+
120+
public function handle()
121+
{
122+
static::$ran = true;
123+
}
124+
}

tests/Queue/QueueWorkerTest.php

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,21 +59,36 @@ public function testWorkerCanWorkUntilQueueIsEmpty()
5959
$secondJob = new WorkerFakeJob,
6060
]]);
6161

62-
try {
63-
$worker->daemon('default', 'queue', $workerOptions);
62+
$status = $worker->daemon('default', 'queue', $workerOptions);
6463

65-
$this->fail('Expected LoopBreakerException to be thrown.');
66-
} catch (LoopBreakerException $e) {
67-
$this->assertTrue($firstJob->fired);
64+
$this->assertTrue($secondJob->fired);
6865

69-
$this->assertTrue($secondJob->fired);
66+
$this->assertSame(0, $status);
7067

71-
$this->assertSame(0, $worker->stoppedWithStatus);
68+
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->twice();
7269

73-
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->twice();
70+
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->twice();
71+
}
7472

75-
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->twice();
76-
}
73+
public function testWorkerStopsWhenMemoryExceeded()
74+
{
75+
$workerOptions = new WorkerOptions;
76+
77+
$worker = $this->getWorker('default', ['queue' => [
78+
$firstJob = new WorkerFakeJob,
79+
$secondJob = new WorkerFakeJob,
80+
]]);
81+
$worker->stopOnMemoryExceeded = true;
82+
83+
$status = $worker->daemon('default', 'queue', $workerOptions);
84+
85+
$this->assertTrue($firstJob->fired);
86+
$this->assertFalse($secondJob->fired);
87+
$this->assertSame(12, $status);
88+
89+
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->once();
90+
91+
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
7792
}
7893

7994
public function testJobCanBeFiredBasedOnPriority()
@@ -393,9 +408,7 @@ public function sleep($seconds)
393408

394409
public function stop($status = 0)
395410
{
396-
$this->stoppedWithStatus = $status;
397-
398-
throw new LoopBreakerException;
411+
return $status;
399412
}
400413

401414
public function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)

0 commit comments

Comments
 (0)