Skip to content

Commit 6cda08d

Browse files
committed
#218 Remove item from queue as well when args match
1 parent 226ec33 commit 6cda08d

File tree

1 file changed

+32
-31
lines changed

1 file changed

+32
-31
lines changed

lib/Resque.php

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -253,41 +253,42 @@ public static function queues()
253253
*/
254254
private static function removeItems($queue, $items = Array())
255255
{
256-
$counter = 0;
257-
$originalQueue = 'queue:'. $queue;
258-
$tempQueue = $originalQueue. ':temp:'. time();
259-
$requeueQueue = $tempQueue. ':requeue';
260-
261-
// move each item from original queue to temp queue and process it
262-
$finished = false;
263-
while(!$finished) {
264-
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
265-
266-
if(!empty($string)) {
267-
if(self::matchItem($string, $items)) {
268-
$counter++;
269-
} else {
270-
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
271-
}
272-
} else {
273-
$finished = true;
256+
$counter = 0;
257+
$originalQueue = 'queue:'. $queue;
258+
$tempQueue = $originalQueue. ':temp:'. time();
259+
$requeueQueue = $tempQueue. ':requeue';
260+
261+
// move each item from original queue to temp queue and process it
262+
$finished = false;
263+
while (!$finished) {
264+
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);
265+
266+
if (!empty($string)) {
267+
if(self::matchItem($string, $items)) {
268+
self::redis()->rpop($tempQueue);
269+
$counter++;
270+
} else {
271+
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
272+
}
273+
} else {
274+
$finished = true;
275+
}
274276
}
275-
}
276277

277-
// move back from temp queue to original queue
278-
$finished = false;
279-
while(!$finished) {
280-
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
281-
if (empty($string)) {
282-
$finished = true;
278+
// move back from temp queue to original queue
279+
$finished = false;
280+
while (!$finished) {
281+
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
282+
if (empty($string)) {
283+
$finished = true;
284+
}
283285
}
284-
}
285-
286-
// remove temp queue and requeue queue
287-
self::redis()->del($requeueQueue);
288-
self::redis()->del($tempQueue);
289286

290-
return $counter;
287+
// remove temp queue and requeue queue
288+
self::redis()->del($requeueQueue);
289+
self::redis()->del($tempQueue);
290+
291+
return $counter;
291292
}
292293

293294
/**

0 commit comments

Comments
 (0)