Skip to content

Commit 4d55528

Browse files
committed
Basic support for blocking list pop
1 parent 37cdec8 commit 4d55528

File tree

4 files changed

+37
-28
lines changed

4 files changed

+37
-28
lines changed

lib/Resque.php

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ class Resque
1010
{
1111
const VERSION = '1.2';
1212

13+
const DEFAULT_INTERVAL = 5;
14+
1315
/**
1416
* @var Resque_Redis Instance of Resque_Redis that talks to redis.
1517
*/
@@ -112,14 +114,19 @@ public static function push($queue, $item)
112114
* @param string $queue The name of the queue to fetch an item from.
113115
* @return array Decoded item from the queue.
114116
*/
115-
public static function pop($queue)
117+
public static function pop($queue, $interval = null)
116118
{
117-
$item = self::redis()->lpop('queue:' . $queue);
119+
if($interval == null) {
120+
$item = self::redis()->lpop('queue:' . $queue);
121+
} else {
122+
$item = self::redis()->blpop('queue:' . $queue, $interval ?: Resque::DEFAULT_INTERVAL);
123+
}
124+
118125
if(!$item) {
119126
return;
120127
}
121128

122-
return json_decode($item, true);
129+
return json_decode($interval == 0 ? $item : $item[1], true);
123130
}
124131

125132
/**
@@ -164,8 +171,9 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals
164171
* @param string $queue Queue to fetch next available job from.
165172
* @return Resque_Job Instance of Resque_Job to be processed, false if none or error.
166173
*/
167-
public static function reserve($queue)
174+
public static function reserve($queue, $interval = null)
168175
{
176+
require_once dirname(__FILE__) . '/Resque/Job.php';
169177
return Resque_Job::reserve($queue);
170178
}
171179

lib/Resque/Job.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public static function create($queue, $class, $args = null, $monitor = false)
8080
* @param string $queue The name of the queue to check for a job in.
8181
* @return null|object Null when there aren't any waiting jobs, instance of Resque_Job when a job was found.
8282
*/
83-
public static function reserve($queue)
83+
public static function reserve($queue, $interval = null)
8484
{
85-
$payload = Resque::pop($queue);
85+
$payload = Resque::pop($queue, $interval);
8686
if(!is_array($payload)) {
8787
return false;
8888
}
@@ -153,7 +153,7 @@ public function getInstance()
153153
);
154154
}
155155

156-
$this->instance = new $this->payload['class']();
156+
$this->instance = new $this->payload['class'];
157157
$this->instance->job = $this;
158158
$this->instance->args = $this->getArguments();
159159
$this->instance->queue = $this->queue;

lib/Resque/Redis.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class Resque_Redis
4747
'lset',
4848
'lrem',
4949
'lpop',
50+
'blpop',
5051
'rpop',
5152
'sadd',
5253
'srem',

lib/Resque/Worker.php

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public function __construct($queues)
147147
*
148148
* @param int $interval How often to check for new jobs across the queues.
149149
*/
150-
public function work($interval = 5)
150+
public function work($interval = Resque::DEFAULT_INTERVAL)
151151
{
152152
$this->updateProcLine('Starting');
153153
$this->startup();
@@ -160,25 +160,25 @@ public function work($interval = 5)
160160
// Attempt to find and reserve a job
161161
$job = false;
162162
if(!$this->paused) {
163-
$job = $this->reserve();
163+
$job = $this->reserve($interval);
164164
}
165165

166-
if(!$job) {
167-
// For an interval of 0, break now - helps with unit testing etc
168-
if($interval == 0) {
169-
break;
170-
}
171-
// If no job was found, we sleep for $interval before continuing and checking again
172-
$this->log('Sleeping for ' . $interval, self::LOG_VERBOSE);
173-
if($this->paused) {
174-
$this->updateProcLine('Paused');
175-
}
176-
else {
177-
$this->updateProcLine('Waiting for ' . implode(',', $this->queues));
178-
}
179-
usleep($interval * 1000000);
180-
continue;
181-
}
166+
$this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval);
167+
168+
if(!$job) {
169+
// For an interval of 0, break now - helps with unit testing etc
170+
if($interval == 0) {
171+
break;
172+
}
173+
174+
// If no job was found, we sleep for $interval before continuing and checking again
175+
if($this->paused) {
176+
$this->updateProcLine('Paused');
177+
usleep($interval * 1000000); //it's paused, so don't hog redis with requests.
178+
}
179+
180+
continue;
181+
}
182182

183183
$this->log('got ' . $job);
184184
Resque_Event::trigger('beforeFork', $job);
@@ -246,15 +246,15 @@ public function perform(Resque_Job $job)
246246
*
247247
* @return object|boolean Instance of Resque_Job if a job is found, false if not.
248248
*/
249-
public function reserve()
249+
public function reserve($interval = null)
250250
{
251251
$queues = $this->queues();
252252
if(!is_array($queues)) {
253253
return;
254254
}
255255
foreach($queues as $queue) {
256-
$this->log('Checking ' . $queue, self::LOG_VERBOSE);
257-
$job = Resque_Job::reserve($queue);
256+
$this->log('Checking ' . $queue . ' with interval ' . $interval, self::LOG_VERBOSE);
257+
$job = Resque_Job::reserve($queue, $interval);
258258
if($job) {
259259
$this->log('Found job on ' . $queue, self::LOG_VERBOSE);
260260
return $job;

0 commit comments

Comments
 (0)