Skip to content

Commit 6836c76

Browse files
committed
refactoring to JobStrategy, adding FastCGI option, fixing unit test issues
Merging changes from chrisboulton#81
1 parent 50b987c commit 6836c76

File tree

12 files changed

+467
-69
lines changed

12 files changed

+467
-69
lines changed

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,14 @@ use a custom prefix to separate the Resque data:
293293
$ PREFIX=my-app-name bin/resque
294294
```
295295

296+
### Job Srategys ###
297+
298+
Php-resque implements multiple ways to seperate the worker process
299+
from the job process to improce resilience. Supported platforms
300+
default to the fork strategy, falling back to in-process execution.
301+
Specific strategys can be chosen by supplyingthe `JOB_STRATEGY`
302+
environment variable.
303+
296304
### Forking ###
297305

298306
Similarly to the Ruby versions, supported platforms will immediately
@@ -303,6 +311,29 @@ The difference with php-resque is that if a forked child does not
303311
exit nicely (PHP error or such), php-resque will automatically fail
304312
the job.
305313

314+
$ JOB_STRATEGY=fork php resque.php
315+
316+
#### Fastcgi ####
317+
318+
The fastcgi strategy executes jobs over a fastcgi connection to php-fpm.
319+
It may offer a lower overhead per job in environments with lots of very short
320+
jobs.
321+
322+
$ JOB_STRATEGY=fastcgi php resque.php
323+
324+
Fastcgi accepts two additional parameters. `FASTCGI_LOCATION` sets the
325+
location of the php-fpm server. This can either be a host:port combination
326+
or a path to a unix socket. `FASTCGI_SCRIPT` sets the path to the script used
327+
to receive and run the job in the php-fpm process.
328+
329+
#### In Process ####
330+
331+
For cases when the other two strategys are not available the in-process
332+
strategy will run jobs in the same process as the worker. This is not
333+
recommended as failures in the job may turn into failures in the worker.
334+
335+
$ JOB_STRATEGY=inprocess php resque.php
336+
306337
### Signals ###
307338

308339
Signals also work on supported platforms exactly as in the Ruby

bin/resque

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
<?php
33

44
// Find and initialize Composer
5+
use Resque\JobStrategy\Fastcgi;
6+
use Resque\JobStrategy\Fork;
7+
use Resque\JobStrategy\InProcess;
58
use Resque\Redis;
69
use Resque\Resque;
710
use Resque\Log;
@@ -13,8 +16,6 @@ $files = array(
1316
__DIR__ . '/../../../../autoload.php',
1417
__DIR__ . '/../vendor/autoload.php',
1518
);
16-
17-
$found = false;
1819
foreach ($files as $file) {
1920
if (file_exists($file)) {
2021
require_once $file;
@@ -31,7 +32,7 @@ if (!class_exists('Composer\Autoload\ClassLoader', false)) {
3132
}
3233

3334
$QUEUE = getenv('QUEUE');
34-
if(empty($QUEUE)) {
35+
if (empty($QUEUE)) {
3536
die("Set QUEUE env var containing the list of queues to work.\n");
3637
}
3738

@@ -45,7 +46,7 @@ $REDIS_BACKEND = getenv('REDIS_BACKEND');
4546

4647
// A redis database number
4748
$REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB');
48-
if(!empty($REDIS_BACKEND)) {
49+
if (!empty($REDIS_BACKEND)) {
4950
if (empty($REDIS_BACKEND_DB))
5051
Resque::setBackend($REDIS_BACKEND);
5152
else
@@ -56,15 +57,14 @@ $logLevel = false;
5657
$LOGGING = getenv('LOGGING');
5758
$VERBOSE = getenv('VERBOSE');
5859
$VVERBOSE = getenv('VVERBOSE');
59-
if(!empty($LOGGING) || !empty($VERBOSE)) {
60+
if (!empty($LOGGING) || !empty($VERBOSE)) {
6061
$logLevel = true;
61-
}
62-
else if(!empty($VVERBOSE)) {
62+
} elseif (!empty($VVERBOSE)) {
6363
$logLevel = true;
6464
}
6565

6666
$APP_INCLUDE = getenv('APP_INCLUDE');
67-
if($APP_INCLUDE) {
67+
if ($APP_INCLUDE) {
6868
if(!file_exists($APP_INCLUDE)) {
6969
die('APP_INCLUDE ('.$APP_INCLUDE.") does not exist.\n");
7070
}
@@ -82,34 +82,72 @@ $BLOCKING = getenv('BLOCKING') !== FALSE;
8282

8383
$interval = 5;
8484
$INTERVAL = getenv('INTERVAL');
85-
if(!empty($INTERVAL)) {
85+
if (!empty($INTERVAL)) {
8686
$interval = $INTERVAL;
8787
}
8888

8989
$count = 1;
9090
$COUNT = getenv('COUNT');
91-
if(!empty($COUNT) && $COUNT > 1) {
91+
if (!empty($COUNT) && $COUNT > 1) {
9292
$count = $COUNT;
9393
}
9494

9595
$PREFIX = getenv('PREFIX');
96-
if(!empty($PREFIX)) {
96+
if (!empty($PREFIX)) {
9797
$logger->log(Psr\Log\LogLevel::INFO, 'Prefix set to {prefix}', array('prefix' => $PREFIX));
9898
Redis::prefix($PREFIX);
9999
}
100100

101-
if($count > 1) {
102-
for($i = 0; $i < $count; ++$i) {
101+
$jobStrategy = null;
102+
$JOB_STRATEGY = getenv('JOB_STRATEGY');
103+
switch ($JOB_STRATEGY) {
104+
case 'inprocess':
105+
$jobStrategy = new InProcess;
106+
break;
107+
case 'fork':
108+
$jobStrategy = new Fork;
109+
break;
110+
case 'fastcgi':
111+
$fastcgiLocation = '127.0.0.1:9000';
112+
$FASTCGI_LOCATION = getenv('FASTCGI_LOCATION');
113+
if (!empty($FASTCGI_LOCATION)) {
114+
$fastcgiLocation = $FASTCGI_LOCATION;
115+
}
116+
117+
$fastcgiScript = __DIR__.'/../extras/fastcgi_worker.php';
118+
$FASTCGI_SCRIPT = getenv('FASTCGI_SCRIPT');
119+
if (!empty($FASTCGI_SCRIPT)) {
120+
$fastcgiScript = $FASTCGI_SCRIPT;
121+
}
122+
123+
require_once __DIR__.'/../lib/JobStrategy/Fastcgi.php';
124+
$jobStrategy = new Fastcgi(
125+
$fastcgiLocation,
126+
$fastcgiScript,
127+
array(
128+
'APP_INCLUDE' => $APP_INCLUDE,
129+
'REDIS_BACKEND' => $REDIS_BACKEND,
130+
)
131+
);
132+
break;
133+
}
134+
135+
136+
if ($count > 1) {
137+
for ($i = 0; $i < $count; ++$i) {
103138
$pid = Resque::fork();
104-
if($pid == -1) {
139+
if ($pid == -1) {
105140
$logger->log(Psr\Log\LogLevel::EMERGENCY, 'Could not fork worker {count}', array('count' => $i));
106141
die();
107142
}
108143
// Child, start the worker
109-
else if(!$pid) {
144+
elseif (!$pid) {
110145
$queues = explode(',', $QUEUE);
111146
$worker = new Worker($queues);
112147
$worker->setLogger($logger);
148+
if ($jobStrategy) {
149+
$worker->setJobStrategy($jobStrategy);
150+
}
113151
$logger->log(Psr\Log\LogLevel::NOTICE, 'Starting worker {worker}', array('worker' => $worker));
114152
$worker->work($interval, $BLOCKING);
115153
break;
@@ -121,6 +159,9 @@ else {
121159
$queues = explode(',', $QUEUE);
122160
$worker = new Worker($queues);
123161
$worker->setLogger($logger);
162+
if ($jobStrategy) {
163+
$worker->setJobStrategy($jobStrategy);
164+
}
124165

125166
$PIDFILE = getenv('PIDFILE');
126167
if ($PIDFILE) {

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
},
3030
"suggest": {
3131
"ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.",
32-
"ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available."
32+
"ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available.",
33+
"ebernhardson/fastcgi": "Allows php-resque to execute jobs via php-fpm."
3334
},
3435
"require-dev": {
3536
"phpunit/phpunit": "3.7.*"

extras/fastcgi_worker.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
use Resque\Resque;
4+
5+
if (!isset($_SERVER['RESQUE_JOB'])) {
6+
header('Status: 500 No Job');
7+
return;
8+
}
9+
10+
// Look for parent project's Composer autoloader
11+
$path = __DIR__.'/../../../vendor/autoload.php';
12+
if (!file_exists($path)) {
13+
// Fallback to this project's autoloader
14+
$path = __DIR__.'/../vendor/autoload.php';
15+
}
16+
// Die if Composer hasn't been run yet
17+
require_once $path;
18+
19+
if (isset($_SERVER['REDIS_BACKEND'])) {
20+
Resque::setBackend($_SERVER['REDIS_BACKEND']);
21+
}
22+
23+
try {
24+
if (isset($_SERVER['APP_INCLUDE'])) {
25+
require_once $_SERVER['APP_INCLUDE'];
26+
}
27+
28+
$job = unserialize(urldecode($_SERVER['RESQUE_JOB']));
29+
$job->worker->perform($job);
30+
} catch (\Exception $e) {
31+
if (isset($job)) {
32+
$job->fail($e);
33+
} else {
34+
header('Status: 500');
35+
}
36+
}

lib/Failure.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
<?php
22
namespace Resque;
33

4-
use Exception;
5-
64
/**
75
* Failed Resque job.
86
*
@@ -25,7 +23,7 @@ class Failure
2523
* @param \Resque\Worker $worker Instance of Resque_Worker that was running this job when it failed.
2624
* @param string $queue The name of the queue that this job was fetched from.
2725
*/
28-
public static function create($payload, Exception $exception, Worker $worker, $queue)
26+
public static function create($payload, \Exception $exception, Worker $worker, $queue)
2927
{
3028
$backend = self::getBackend();
3129
new $backend($payload, $exception, $worker, $queue);
@@ -39,7 +37,7 @@ public static function create($payload, Exception $exception, Worker $worker, $q
3937
public static function getBackend()
4038
{
4139
if (self::$backend === null) {
42-
self::$backend = 'Resque\Failure\Resque_Failure_Redis';
40+
self::$backend = 'Resque\Failure\Redis';
4341
}
4442

4543
return self::$backend;
File renamed without changes.

lib/JobStrategy/Fastcgi.php

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
<?php
2+
namespace Resque\JobStrategy;
3+
4+
use Psr\Log\LogLevel;
5+
use Resque\Worker;
6+
use Resque\Job;
7+
use EBernhardson\FastCGI\Client;
8+
use EBernhardson\FastCGI\CommunicationException;
9+
10+
/**
11+
* @package Resque/JobStrategy
12+
* @author Erik Bernhardson <[email protected]>
13+
* @license http://www.opensource.org/licenses/mit-license.php
14+
*/
15+
class Fastcgi implements StrategyInterface
16+
{
17+
/**
18+
* @var bool True when waiting for a response from FastCGI server
19+
*/
20+
private $waiting = false;
21+
22+
/**
23+
* @var array Default environment for FastCGI requests
24+
*/
25+
protected $requestData = array(
26+
'GATEWAY_INTERFACE' => 'FastCGI/1.0',
27+
'REQUEST_METHOD' => 'GET',
28+
'SERVER_SOFTWARE' => 'php-resque-fastcgi/1.3-dev',
29+
'REMOTE_ADDR' => '127.0.0.1',
30+
'REMOTE_PORT' => 8888,
31+
'SERVER_ADDR' => '127.0.0.1',
32+
'SERVER_PORT' => 8888,
33+
'SERVER_PROTOCOL' => 'HTTP/1.1'
34+
);
35+
36+
/** @var string */
37+
private $location;
38+
/** @var Client */
39+
private $fcgi;
40+
/** @var Worker */
41+
private $worker;
42+
43+
/**
44+
* @param string $location When the location contains a `:` it will be considered a host/port pair
45+
* otherwise a unix socket path
46+
* @param string $script Absolute path to the script that will load resque and perform the job
47+
* @param array $environment Additional environment variables available in $_SERVER to the FastCGI script
48+
*/
49+
public function __construct($location, $script, $environment = array())
50+
{
51+
$this->location = $location;
52+
53+
$port = false;
54+
if (false !== strpos($location, ':')) {
55+
list($location, $port) = explode(':', $location, 2);
56+
}
57+
58+
$this->fcgi = new Client($location, $port);
59+
$this->fcgi->setKeepAlive(true);
60+
61+
$this->requestData = $environment + $this->requestData + array(
62+
'SCRIPT_FILENAME' => $script,
63+
'SERVER_NAME' => php_uname('n'),
64+
'RESQUE_DIR' => __DIR__.'/../../../',
65+
);
66+
}
67+
68+
/**
69+
* @param Worker $worker
70+
*/
71+
public function setWorker(Worker $worker)
72+
{
73+
$this->worker = $worker;
74+
}
75+
76+
/**
77+
* Executes the provided job over a FastCGI connection
78+
*
79+
* @param Job $job
80+
*/
81+
public function perform(Job $job)
82+
{
83+
$status = 'Requested fcgi job execution from ' . $this->location . ' at ' . strftime('%F %T');
84+
$this->worker->updateProcLine($status);
85+
$this->worker->logger->log(LogLevel::INFO, $status);
86+
87+
$this->waiting = true;
88+
89+
try {
90+
$this->fcgi->request(array(
91+
'RESQUE_JOB' => urlencode(serialize($job)),
92+
) + $this->requestData, '');
93+
94+
$response = $this->fcgi->response();
95+
$this->waiting = false;
96+
} catch (CommunicationException $e) {
97+
$this->waiting = false;
98+
$job->fail($e);
99+
return;
100+
}
101+
102+
if ($response['statusCode'] !== 200) {
103+
$job->fail(new \Exception(sprintf(
104+
'FastCGI job returned non-200 status code: %s Stdout: %s Stderr: %s',
105+
$response['headers']['status'],
106+
$response['body'],
107+
$response['stderr']
108+
)));
109+
}
110+
}
111+
112+
/**
113+
* Shutdown the worker process.
114+
*/
115+
public function shutdown()
116+
{
117+
if ($this->waiting === false) {
118+
$this->worker->logger->log(LogLevel::INFO, 'No child to kill.');
119+
} else {
120+
$this->worker->logger->log(LogLevel::INFO, 'Closing fcgi connection with job in progress.');
121+
}
122+
$this->fcgi->close();
123+
}
124+
}

0 commit comments

Comments
 (0)