Skip to content

Commit 48a7feb

Browse files
author
Chaitanya Kuber
committed
2 parents 3b59fa5 + e6464f4 commit 48a7feb

File tree

5 files changed

+70
-21
lines changed

5 files changed

+70
-21
lines changed

CHANGELOG.markdown

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44
* Use `require_once` when including php-resque after the app has been included in the sample resque.php to prevent include conflicts (andrewjshults)
55
* Wrap job arguments in an array to improve compatibility with ruby resque (warezthebeef)
66
* Fix a bug where the worker would spin out of control taking the server with it, if the redis connection was interrupted even briefly. Use SIGPIPE to trap this scenario cleanly. (d11wtq)
7+
* Added support of Redis prefix (namespaces) (hlegius)
8+
* When reserving jobs, check if the payload received from popping a queue is a valid object (fix bug whereby jobs are reserved based on an erroneous payload) (salimane)
9+
* Re-enable autoload for class_exists in Job.php (humancopy)
10+
* Fix lost jobs when there is more than one worker process started by the same parent process (salimane)
11+
* Move include for resque before APP_INCLUDE is loaded in, so that way resque is available for the app
12+
* Avoid working with dirty worker IDs (salimane)
13+
714

815
## 1.1 (2011-03-27) ##
916

lib/Resque.php

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,23 @@ class Resque
1919
*/
2020
public static $redis = null;
2121

22+
/**
23+
* @var mixed Host/port conbination separated by a colon, or a nested
24+
* array of server swith host/port pairs
25+
*/
26+
protected static $redisServer = null;
27+
28+
/**
29+
* @var int ID of Redis database to select.
30+
*/
31+
protected static $redisDatabase = 0;
32+
33+
/**
34+
* @var int PID of current process. Used to detect changes when forking
35+
* and implement "thread" safety to avoid race conditions.
36+
*/
37+
protected static $pid = null;
38+
2239
/**
2340
* Given a host/port combination separated by a colon, set it as
2441
* the redis server that Resque will talk to.
@@ -29,17 +46,9 @@ class Resque
2946
*/
3047
public static function setBackend($server, $database = 0)
3148
{
32-
if(is_array($server)) {
33-
require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
34-
self::$redis = new Resque_RedisCluster($server);
35-
}
36-
else {
37-
list($host, $port) = explode(':', $server);
38-
require_once dirname(__FILE__) . '/Resque/Redis.php';
39-
self::$redis = new Resque_Redis($host, $port);
40-
}
41-
42-
self::redis()->select($database);
49+
self::$redisServer = $server;
50+
self::$redisDatabase = $database;
51+
self::$redis = null;
4352
}
4453

4554
/**
@@ -49,10 +58,40 @@ public static function setBackend($server, $database = 0)
4958
*/
5059
public static function redis()
5160
{
52-
if(is_null(self::$redis)) {
53-
self::setBackend('localhost:6379');
61+
// Detect when the PID of the current process has changed (from a fork, etc)
62+
// and force a reconnect to redis.
63+
$pid = getmypid();
64+
if (self::$pid !== $pid) {
65+
self::$redis = null;
66+
self::$pid = $pid;
67+
}
68+
69+
if(!is_null(self::$redis)) {
70+
return self::$redis;
71+
}
72+
73+
$server = self::$redisServer;
74+
if (empty($server)) {
75+
$server = 'localhost:6379';
76+
}
77+
78+
if(is_array($server)) {
79+
require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
80+
self::$redis = new Resque_RedisCluster($server);
81+
}
82+
else {
83+
if (strpos($server, 'unix:') === false) {
84+
list($host, $port) = explode(':', $server);
85+
}
86+
else {
87+
$host = $server;
88+
$port = null;
89+
}
90+
require_once dirname(__FILE__) . '/Resque/Redis.php';
91+
self::$redis = new Resque_Redis($host, $port);
5492
}
5593

94+
self::$redis->select(self::$redisDatabase);
5695
return self::$redis;
5796
}
5897

lib/Resque/Redis.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class Resque_Redis extends Redisent
3939
'setnx',
4040
'incr',
4141
'incrby',
42-
'decrby',
42+
'decr',
4343
'decrby',
4444
'rpush',
4545
'lpush',

lib/Resque/Worker.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public static function exists($workerId)
9696
*/
9797
public static function find($workerId)
9898
{
99-
if(!self::exists($workerId)) {
99+
if(!self::exists($workerId) || false === strpos($workerId, ":")) {
100100
return false;
101101
}
102102

@@ -449,12 +449,14 @@ public function pruneDeadWorkers()
449449
$workerPids = $this->workerPids();
450450
$workers = self::all();
451451
foreach($workers as $worker) {
452-
list($host, $pid, $queues) = explode(':', (string)$worker, 3);
453-
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
454-
continue;
455-
}
456-
$this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE);
457-
$worker->unregisterWorker();
452+
if (is_object($worker)) {
453+
list($host, $pid, $queues) = explode(':', (string)$worker, 3);
454+
if($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
455+
continue;
456+
}
457+
$this->log('Pruning dead worker: ' . (string)$worker, self::LOG_VERBOSE);
458+
$worker->unregisterWorker();
459+
}
458460
}
459461
}
460462

test/Resque/Tests/bootstrap.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
// Include Resque
2222
require_once RESQUE_LIB . 'Resque.php';
2323
require_once RESQUE_LIB . 'Resque/Worker.php';
24+
require_once RESQUE_LIB . 'Resque/Redis.php';
2425

2526
// Attempt to start our own redis instance for tesitng.
2627
exec('which redis-server', $output, $returnVar);

0 commit comments

Comments
 (0)