Skip to content

Commit 1deee54

Browse files
committed
Improved Worker Reliability
Specifically, this improves the multi-worker forking process by having the parent process stick around and monitor the children. If a child process dies, and the supervisor has not been instructed to shut them all down, it will spawn a new child to replace it. It will pass signals down to all children. Currently, it only passes the signals explicitly watched by the Worker class. Children will be told that they have a parent process, and check periodically whether the supervisor is still running. This check is performed at the beginning of reserve() to ensure orphaned workers handle their condition properly. That is, if the supervisor is gone, the child will exit. This is intended to allow users to kill the supervisor and realistically expect that no new jobs will be processed until a new supervisor (or single-worker instance) is started. The other change, not related to worker-supervisor relationships, nevertheless still involves forking. Specifically, when a job process exits, the exit code is already being handled if it is greater than zero. However, if it is zero, the code prior to this commit assumed the exit was performed by PHP Resque itself. Usually, this is correct. The trouble comes when jobs exit without specifying an exit status - PHP defaults these to 0, for whatever reason. In these cases, the job status is not properly updated. Generally speaking, if you're writing jobs yourself - in their entirety - they will responsibly *not* use exit() at any point. But if your jobs use a framework, the chances of an unexpected exit() causing issues go up immensely. So, if the exit code is zero, and the job status hasn't been updated from queued or running, this commit's code causes the status to be updated to completed. This assumes that such jobs are properly written to only use exit status 0 for success, so some users might wish to change it to failed, but that's not best practice in the slightest, so I wouldn't encourage that. Signed-off-by: Daniel Hunsaker <[email protected]>
1 parent 968b7e6 commit 1deee54

File tree

2 files changed

+92
-19
lines changed

2 files changed

+92
-19
lines changed

bin/resque

Lines changed: 73 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -93,29 +93,83 @@ if(!empty($PREFIX)) {
9393
Resque_Redis::prefix($PREFIX);
9494
}
9595

96+
function cleanup_children($signal){
97+
$GLOBALS['send_signal'] = $signal;
98+
}
99+
96100
if($count > 1) {
97-
for($i = 0; $i < $count; ++$i) {
98-
$pid = Resque::fork();
99-
if($pid === false || $pid === -1) {
100-
$logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i));
101-
die();
102-
}
103-
// Child, start the worker
104-
else if(!$pid) {
105-
$queues = explode(',', $QUEUE);
106-
$worker = new Resque_Worker($queues);
107-
$worker->setLogger($logger);
108-
$logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker));
109-
$worker->work($interval, $BLOCKING);
110-
break;
111-
}
112-
}
101+
$children = array();
102+
$GLOBALS['send_signal'] = FALSE;
103+
104+
$die_signals = array(SIGTERM, SIGINT, SIGQUIT);
105+
$all_signals = array_merge($die_signals, array(SIGUSR1, SIGUSR2, SIGCONT, SIGPIPE));
106+
107+
for($i = 0; $i < $count; ++$i) {
108+
$pid = Resque::fork();
109+
if($pid == -1) {
110+
die("Could not fork worker ".$i."\n");
111+
}
112+
// Child, start the worker
113+
elseif(!$pid) {
114+
$queues = explode(',', $QUEUE);
115+
$worker = new Resque_Worker($queues);
116+
$worker->logLevel = $logLevel;
117+
$worker->hasParent = TRUE;
118+
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
119+
$worker->work($interval);
120+
break;
121+
}
122+
else {
123+
$children[$pid] = 1;
124+
while (count($children) == $count){
125+
if (!isset($registered)) {
126+
declare(ticks = 1);
127+
foreach ($all_signals as $signal) {
128+
pcntl_signal($signal, "cleanup_children");
129+
}
130+
131+
$PIDFILE = getenv('PIDFILE');
132+
if ($PIDFILE) {
133+
file_put_contents($PIDFILE, getmypid()) or
134+
die('Could not write PID information to ' . $PIDFILE);
135+
}
136+
137+
$registered = TRUE;
138+
}
139+
140+
if(function_exists('setproctitle')) {
141+
setproctitle('resque-' . Resque::VERSION . ": Monitoring {$count} children: [".implode(',', array_keys($children))."]");
142+
}
143+
144+
$childPID = pcntl_waitpid(-1, $childStatus, WNOHANG);
145+
if ($childPID != 0) {
146+
fwrite(STDOUT, "\033[01;31m*** A child worker died: {$childPID}\033[00m\n");
147+
unset($children[$childPID]);
148+
$i--;
149+
}
150+
usleep(250000);
151+
if ($GLOBALS['send_signal'] !== FALSE){
152+
foreach ($children as $k => $v){
153+
posix_kill($k, $GLOBALS['send_signal']);
154+
if (in_array($GLOBALS['send_signal'], $die_signals)) {
155+
pcntl_waitpid($k, $childStatus);
156+
}
157+
}
158+
if (in_array($GLOBALS['send_signal'], $die_signals)) {
159+
exit;
160+
}
161+
$GLOBALS['send_signal'] = FALSE;
162+
}
163+
}
164+
}
165+
}
113166
}
114167
// Start a single worker
115168
else {
116-
$queues = explode(',', $QUEUE);
117-
$worker = new Resque_Worker($queues);
118-
$worker->setLogger($logger);
169+
$queues = explode(',', $QUEUE);
170+
$worker = new Resque_Worker($queues);
171+
$worker->logLevel = $logLevel;
172+
$worker->hasParent = FALSE;
119173

120174
$PIDFILE = getenv('PIDFILE');
121175
if ($PIDFILE) {

lib/Resque/Worker.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ class Resque_Worker
1616
*/
1717
public $logger;
1818

19+
/**
20+
* @var bool Whether this worker is running in a forked child process.
21+
*/
22+
public $hasParent = false;
23+
1924
/**
2025
* @var array Array of all associated queues for this worker.
2126
*/
@@ -219,6 +224,14 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
219224
'Job exited with exit code ' . $exitStatus
220225
));
221226
}
227+
else
228+
{
229+
if (in_array($job->getStatus(), array(Resque_Job_Status::STATUS_WAITING, Resque_Job_Status::STATUS_RUNNING)))
230+
{
231+
$job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
232+
$this->log('done ' . $job);
233+
}
234+
}
222235
}
223236

224237
$this->child = null;
@@ -256,6 +269,12 @@ public function perform(Resque_Job $job)
256269
*/
257270
public function reserve($blocking = false, $timeout = null)
258271
{
272+
if ($this->hasParent && !posix_kill(posix_getppid(), 0))
273+
{
274+
$this->shutdown();
275+
return false;
276+
}
277+
259278
$queues = $this->queues();
260279
if(!is_array($queues)) {
261280
return;

0 commit comments

Comments
 (0)