Skip to content

Commit dcbb8f2

Browse files
committed
Add QueueOrderReserver.
This reserver checks queues in the order they were specified. As long as jobs exist on higher priority queues, they will be reserved before moving to the next lowest priority queue. This implements the current default job reservation behaviour.
1 parent 701128e commit dcbb8f2

File tree

7 files changed

+271
-30
lines changed

7 files changed

+271
-30
lines changed

composer.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,10 @@
3737
"psr-0": {
3838
"Resque": "lib"
3939
}
40+
},
41+
"autoload-dev": {
42+
"psr-0": {
43+
"Resque": "test/"
44+
}
4045
}
4146
}

composer.lock

Lines changed: 33 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Resque\Reserver;
4+
5+
use Resque_Job;
6+
use Psr\Log\LoggerInterface;
7+
use Resque;
8+
9+
abstract class AbstractReserver implements ReserverInterface
10+
{
11+
/** @var array */
12+
private $queues;
13+
14+
/** @var LoggerInterface */
15+
protected $logger;
16+
17+
/**
18+
* @param LoggerInterface $logger
19+
* @param array $queues The queues to reserve from. If this contains '*', then the queues are retrieved dynamically
20+
* from redis on each call to reserve().
21+
*/
22+
public function __construct(LoggerInterface $logger, array $queues)
23+
{
24+
$this->logger = $logger;
25+
$this->queues = $queues;
26+
}
27+
28+
/**
29+
* {@inheritDoc}
30+
*/
31+
public function getQueues()
32+
{
33+
if (in_array('*', $this->queues)) {
34+
$queues = Resque::queues();
35+
sort($queues);
36+
return $queues;
37+
}
38+
39+
return $this->queues;
40+
}
41+
42+
/**
43+
* {@inheritDoc}
44+
*/
45+
public function waitAfterReservationAttempt()
46+
{
47+
return true;
48+
}
49+
50+
/**
51+
* {@inheritDoc}
52+
*/
53+
public function getName()
54+
{
55+
$name = get_class($this);
56+
$name = str_replace(__NAMESPACE__, '', $name);
57+
return trim($name, '\\');
58+
}
59+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace Resque\Reserver;
4+
5+
use Resque_Job;
6+
7+
/**
8+
* QueueOrderReserver reserves jobs in the order that the queues is given. As long as jobs exist in a higher priority
9+
* queue, they will continue to be reserved before moving to the next lowest priority queue.
10+
*
11+
* For example: given queues A, B and C, all the jobs from queue A will be processed before moving onto queue B and
12+
* then after that queue C.
13+
*
14+
* This is the default reserver.
15+
*/
16+
class QueueOrderReserver extends AbstractReserver implements ReserverInterface
17+
{
18+
/**
19+
* {@inheritDoc}
20+
*/
21+
public function reserve()
22+
{
23+
foreach ($this->getQueues() as $queue) {
24+
$this->logger->debug("[{reserver}] Checking queue '{queue}' for jobs", array(
25+
'queue' => $queue,
26+
'reserver' => $this->getName(),
27+
));
28+
29+
$job = Resque_Job::reserve($queue);
30+
if ($job) {
31+
$this->logger->info("[{reserver}] Found job on queue '{queue}'", array(
32+
'queue' => $queue,
33+
'reserver' => $this->getName(),
34+
));
35+
return $job;
36+
}
37+
}
38+
39+
return null;
40+
}
41+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Resque\Tests\Reserver;
4+
5+
use Resque\Reserver\ReserverInterface;
6+
use Resque;
7+
8+
abstract class AbstractReserverTest extends \Resque_Tests_TestCase
9+
{
10+
/** @var string */
11+
protected $reserverName;
12+
13+
/**
14+
* Gets a reserver instance configured with the given queues.
15+
*
16+
* @param array $queues
17+
* @return ReserverInstance
18+
*/
19+
abstract protected function getReserver(array $queues = array());
20+
21+
public function testGetName()
22+
{
23+
$this->assertEquals($this->reserverName, $this->getReserver()->getName());
24+
}
25+
26+
public function testGetQueuesReturnsConfiguredQueues()
27+
{
28+
$queues = array(
29+
'queue_' . rand(1, 100),
30+
'queue_' . rand(101, 200),
31+
'queue_' . rand(201, 300),
32+
);
33+
$this->assertEquals($queues, $this->getReserver($queues)->getQueues());
34+
}
35+
36+
public function testGetQueuesWithAsterixQueueReturnsAllQueuesFromRedisInSortedOrder()
37+
{
38+
$queues = array(
39+
'queue_b',
40+
'queue_c',
41+
'queue_d',
42+
'queue_a',
43+
);
44+
45+
// register queues in redis
46+
foreach ($queues as $queue) {
47+
Resque::redis()->sadd('queues', $queue);
48+
}
49+
50+
$expected = array(
51+
'queue_a',
52+
'queue_b',
53+
'queue_c',
54+
'queue_d',
55+
);
56+
57+
$this->assertEquals($expected, $this->getReserver(array('*'))->getQueues());
58+
}
59+
}

0 commit comments

Comments
 (0)