Skip to content

Commit 1d24d00

Browse files
committed
Merge pull request #223 from theaxel/master
Fix dequeueing with args
2 parents c335bc3 + 6cda08d commit 1d24d00

File tree

2 files changed

+55
-31
lines changed

2 files changed

+55
-31
lines changed

lib/Resque.php

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -253,41 +253,42 @@ public static function queues()
253253
*/
254254
private static function removeItems($queue, $items = Array())
255255
{
256-
$counter = 0;
257-
$originalQueue = 'queue:'. $queue;
258-
$tempQueue = $originalQueue. ':temp:'. time();
259-
$requeueQueue = $tempQueue. ':requeue';
260-
261-
// move each item from original queue to temp queue and process it
262-
$finished = false;
263-
while(!$finished) {
264-
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
265-
266-
if(!empty($string)) {
267-
if(self::matchItem($string, $items)) {
268-
$counter++;
269-
} else {
270-
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
271-
}
272-
} else {
273-
$finished = true;
256+
$counter = 0;
257+
$originalQueue = 'queue:'. $queue;
258+
$tempQueue = $originalQueue. ':temp:'. time();
259+
$requeueQueue = $tempQueue. ':requeue';
260+
261+
// move each item from original queue to temp queue and process it
262+
$finished = false;
263+
while (!$finished) {
264+
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
265+
266+
if (!empty($string)) {
267+
if(self::matchItem($string, $items)) {
268+
self::redis()->rpop($tempQueue);
269+
$counter++;
270+
} else {
271+
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
272+
}
273+
} else {
274+
$finished = true;
275+
}
274276
}
275-
}
276277

277-
// move back from temp queue to original queue
278-
$finished = false;
279-
while(!$finished) {
280-
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
281-
if (empty($string)) {
282-
$finished = true;
278+
// move back from temp queue to original queue
279+
$finished = false;
280+
while (!$finished) {
281+
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
282+
if (empty($string)) {
283+
$finished = true;
284+
}
283285
}
284-
}
285-
286-
// remove temp queue and requeue queue
287-
self::redis()->del($requeueQueue);
288-
self::redis()->del($tempQueue);
289286

290-
return $counter;
287+
// remove temp queue and requeue queue
288+
self::redis()->del($requeueQueue);
289+
self::redis()->del($tempQueue);
290+
291+
return $counter;
291292
}
292293

293294
/**

test/Resque/Tests/JobTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,29 @@ public function testDequeueItemWithArg()
298298
$this->assertEquals(Resque::dequeue($queue, $test), 1);
299299
#$this->assertEquals(Resque::size($queue), 1);
300300
}
301+
302+
public function testDequeueSeveralItemsWithArgs()
303+
{
304+
// GIVEN
305+
$queue = 'jobs';
306+
$args = array('foo' => 1, 'bar' => 10);
307+
$removeArgs = array('foo' => 1, 'bar' => 2);
308+
Resque::enqueue($queue, 'Test_Job_Dequeue9', $args);
309+
Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs);
310+
Resque::enqueue($queue, 'Test_Job_Dequeue9', $removeArgs);
311+
$this->assertEquals(Resque::size($queue), 3);
312+
313+
// WHEN
314+
$test = array('Test_Job_Dequeue9' => $removeArgs);
315+
$removedItems = Resque::dequeue($queue, $test);
316+
317+
// THEN
318+
$this->assertEquals($removedItems, 2);
319+
$this->assertEquals(Resque::size($queue), 1);
320+
$item = Resque::pop($queue);
321+
$this->assertInternalType('array', $item['args']);
322+
$this->assertEquals(10, $item['args'][0]['bar'], 'Wrong items were dequeued from queue!');
323+
}
301324

302325
public function testDequeueItemWithUnorderedArg()
303326
{

0 commit comments

Comments
 (0)