Skip to content

Commit 7807f9e

Browse files
committed
Add BlockingListPopReserver.
This behaves similarly to QueueOrderReserver, but uses the redis blpop command to do a blocking wait for a job to come onto the specified queues.
1 parent d324639 commit 7807f9e

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Resque\Reserver;
4+
5+
use Resque_Job;
6+
use Psr\Log\LoggerInterface;
7+
8+
/**
9+
* BlockingListPopReserver uses the blocking list pop command in redis (https://redis.io/commands/blpop) to wait for a
10+
* job to become available on any of the given queues.
11+
* This also behaves similarly to QueueOrderReserver in that the queues are checked in the order they are given.
12+
*
13+
* Environment variables:
14+
* - BLPOP_TIMEOUT: The maximum time in seconds that the bplop command should block while waiting for a job.
15+
* upon timeout, the worker will attempt to immediately reserve a job again. If zero is specified, the command will
16+
* block indefinitely. If not specified, the value of the INTERVAL variable will be used which defaults to 5 seconds.
17+
*/
18+
class BlockingListPopReserver extends AbstractReserver implements ReserverInterface
19+
{
20+
/** @var int */
21+
const DEFAULT_TIMEOUT = 5;
22+
23+
/**
24+
* @param LoggerInterface $logger
25+
* @param array $queues The queues to reserve from. If null, then the queues are retrieved dynamically from redis
26+
* on each call to reserve().
27+
* @param int $timeout The number of seconds to wait for a job to be enqueued. A timeout of zero will block
28+
* indefinitely.
29+
*/
30+
public function __construct(LoggerInterface $logger, array $queues, $timeout = self::DEFAULT_TIMEOUT)
31+
{
32+
$this->timeout = $timeout;
33+
parent::__construct($logger, $queues);
34+
}
35+
36+
/**
37+
* {@inheritDoc}
38+
*/
39+
public function reserve()
40+
{
41+
$job = Resque_Job::reserveBlocking($this->getQueues(), $this->timeout);
42+
if ($job) {
43+
$this->logger->info("[{reserver}] Found job on queue '{queue}'", array(
44+
'queue' => $job->queue,
45+
'reserver' => $this->getName(),
46+
));
47+
return $job;
48+
}
49+
return null;
50+
}
51+
52+
/**
53+
* {@inheritDoc}
54+
*/
55+
public function waitAfterReservationAttempt()
56+
{
57+
return false;
58+
}
59+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace Resque\Tests\Reserver;
4+
5+
use Resque\Reserver\BlockingListPopReserver;
6+
use Resque;
7+
8+
/**
9+
* BlockingListPopReserver behaves the same as QueueOrderReserver but with different underlying implementation.
10+
*/
11+
class BlockingListPopReserverTest extends QueueOrderReserverTest
12+
{
13+
protected $reserverName = 'BlockingListPopReserver';
14+
15+
protected function getReserver(array $queues = array(), $timeout = 1)
16+
{
17+
return new BlockingListPopReserver(new \Resque_Log(), $queues, $timeout);
18+
}
19+
20+
public function testWaitAfterReservationAttemptReturnsTrue()
21+
{
22+
$this->assertFalse($this->getReserver()->waitAfterReservationAttempt());
23+
}
24+
25+
public function testReserveCallsBlpopWithTimeout()
26+
{
27+
$timeout = rand(1, 100);
28+
29+
$queues = array(
30+
'high',
31+
'medium',
32+
'low',
33+
);
34+
35+
$redisQueues = array(
36+
'queue:high',
37+
'queue:medium',
38+
'queue:low',
39+
);
40+
41+
$payload = array('class' => 'Test_Job');
42+
$item = array('resque:queue:high', json_encode($payload));
43+
44+
$redis = $this->getMockBuilder('\Resque_Redis')
45+
->disableOriginalConstructor()
46+
->setMethods(['__call'])
47+
->getMock();
48+
49+
$redis
50+
->expects($this->once())
51+
->method('__call')
52+
->with($this->equalTo('blpop'), $this->equalTo(array($redisQueues, $timeout)))
53+
->will($this->returnValue($item));
54+
55+
$originalRedis = Resque::$redis;
56+
57+
Resque::$redis = $redis;
58+
59+
$job = $this->getReserver($queues, $timeout)->reserve();
60+
$this->assertEquals('high', $job->queue);
61+
$this->assertEquals($payload, $job->payload);
62+
63+
Resque::$redis = $originalRedis;
64+
}
65+
}

0 commit comments

Comments
 (0)