Skip to content

Commit 41bf8c1

Browse files
committed
Basic support for blocking list pop
1 parent 64cd187 commit 41bf8c1

File tree

5 files changed

+38
-29
lines changed

5 files changed

+38
-29
lines changed

lib/Resque.php

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class Resque
1414
{
1515
const VERSION = '1.0';
1616

17+
const DEFAULT_INTERVAL = 5;
18+
1719
/**
1820
* @var Resque_Redis Instance of Resque_Redis that talks to redis.
1921
*/
@@ -115,14 +117,19 @@ public static function push($queue, $item)
115117
* @param string $queue The name of the queue to fetch an item from.
116118
* @return array Decoded item from the queue.
117119
*/
118-
public static function pop($queue)
120+
public static function pop($queue, $interval = null)
119121
{
120-
$item = self::redis()->lpop('queue:' . $queue);
122+
if($interval == null) {
123+
$item = self::redis()->lpop('queue:' . $queue);
124+
} else {
125+
$item = self::redis()->blpop('queue:' . $queue, $interval ?: Resque::DEFAULT_INTERVAL);
126+
}
127+
121128
if(!$item) {
122129
return;
123130
}
124131

125-
return json_decode($item, true);
132+
return json_decode($interval == 0 ? $item : $item[1], true);
126133
}
127134

128135
/**
@@ -168,10 +175,10 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals
168175
* @param string $queue Queue to fetch next available job from.
169176
* @return Resque_Job Instance of Resque_Job to be processed, false if none or error.
170177
*/
171-
public static function reserve($queue)
178+
public static function reserve($queue, $interval = null)
172179
{
173180
require_once dirname(__FILE__) . '/Resque/Job.php';
174-
return Resque_Job::reserve($queue);
181+
return Resque_Job::reserve($queue, $interval);
175182
}
176183

177184
/**

lib/Resque/Job.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ public static function create($queue, $class, $args = null, $monitor = false)
8383
* @param string $queue The name of the queue to check for a job in.
8484
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
8585
*/
86-
public static function reserve($queue)
86+
public static function reserve($queue, $interval = null)
8787
{
88-
$payload = Resque::pop($queue);
88+
$payload = Resque::pop($queue, $interval);
8989
if(!is_array($payload)) {
9090
return false;
9191
}
@@ -156,7 +156,7 @@ public function getInstance()
156156
);
157157
}
158158

159-
$this->instance = new $this->payload['class']();
159+
$this->instance = new $this->payload['class'];
160160
$this->instance->job = $this;
161161
$this->instance->args = $this->getArguments();
162162
$this->instance->queue = $this->queue;

lib/Resque/Redis.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Resque_Redis extends Redisent
5050
'lset',
5151
'lrem',
5252
'lpop',
53+
'blpop',
5354
'rpop',
5455
'sadd',
5556
'srem',

lib/Resque/RedisCluster.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Resque_RedisCluster extends RedisentCluster
5050
'lset',
5151
'lrem',
5252
'lpop',
53+
'blpop',
5354
'rpop',
5455
'sadd',
5556
'srem',

lib/Resque/Worker.php

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public function __construct($queues)
153153
*
154154
* @param int $interval How often to check for new jobs across the queues.
155155
*/
156-
public function work($interval = 5)
156+
public function work($interval = Resque::DEFAULT_INTERVAL)
157157
{
158158
$this->updateProcLine('Starting');
159159
$this->startup();
@@ -166,25 +166,25 @@ public function work($interval = 5)
166166
// Attempt to find and reserve a job
167167
$job = false;
168168
if(!$this->paused) {
169-
$job = $this->reserve();
169+
$job = $this->reserve($interval);
170170
}
171171

172-
if(!$job) {
173-
// For an interval of 0, break now - helps with unit testing etc
174-
if($interval == 0) {
175-
break;
176-
}
177-
// If no job was found, we sleep for $interval before continuing and checking again
178-
$this->log('Sleeping for ' . $interval, true);
179-
if($this->paused) {
180-
$this->updateProcLine('Paused');
181-
}
182-
else {
183-
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
184-
}
185-
usleep($interval * 1000000);
186-
continue;
187-
}
172+
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
173+
174+
if(!$job) {
175+
// For an interval of 0, break now - helps with unit testing etc
176+
if($interval == 0) {
177+
break;
178+
}
179+
180+
// If no job was found, we sleep for $interval before continuing and checking again
181+
if($this->paused) {
182+
$this->updateProcLine('Paused');
183+
usleep($interval * 1000000); //it's paused, so don't hog redis with requests.
184+
}
185+
186+
continue;
187+
}
188188

189189
$this->log('got ' . $job);
190190
Resque_Event::trigger('beforeFork', $job);
@@ -252,15 +252,15 @@ public function perform(Resque_Job $job)
252252
*
253253
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
254254
*/
255-
public function reserve()
255+
public function reserve($interval = null)
256256
{
257257
$queues = $this->queues();
258258
if(!is_array($queues)) {
259259
return;
260260
}
261261
foreach($queues as $queue) {
262-
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
263-
$job = Resque_Job::reserve($queue);
262+
$this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE);
263+
$job = Resque_Job::reserve($queue, $interval);
264264
if($job) {
265265
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
266266
return $job;

0 commit comments

Comments
 (0)