Skip to content

Commit 2057235

Browse files
committed
新增支持 pop() 超时时间,在超时前自动重试
1 parent 99100ac commit 2057235

File tree

4 files changed

+126
-55
lines changed

4 files changed

+126
-55
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
'co' => 1,
6464
// 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
6565
'process' => 1,
66-
// 消费循环尝试 pop 的时间间隔,单位:秒
66+
// 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
6767
'timespan' => 0.1,
6868
// 进程分组名称
6969
'processGroup' => 'a',
@@ -73,8 +73,12 @@
7373
'consumer' => 'AConsumer',
7474
// 驱动类所需要的参数数组
7575
'config' => [
76+
// redis 连接池明
7677
'poolName' => 'redis',
78+
// redis 键前缀
7779
'prefix' => 'imi:queue:test:',
80+
// 消费循环尝试 pop 的时间间隔,单位:秒(手动调用pop()方法有效)
81+
'timespan' => 0.1,
7882
]
7983
],
8084
],

src/Driver/IQueueDriver.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ public function push(IMessage $message, float $delay = 0, array $options = []):
2929
/**
3030
* 从队列弹出一个消息
3131
*
32+
* @param float $timeout
3233
* @return \Imi\Queue\Contract\IMessage|null
3334
*/
34-
public function pop(): ?IMessage;
35+
public function pop(float $timeout = 0): ?IMessage;
3536

3637
/**
3738
* 删除一个消息

src/Driver/RedisQueueDriver.php

Lines changed: 81 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ class RedisQueueDriver implements IQueueDriver
4141
*/
4242
protected $name;
4343

44+
/**
45+
* 循环尝试 pop 的时间间隔,单位:秒
46+
*
47+
* @var float
48+
*/
49+
protected $timespan = 0.03;
50+
4451
public function __construct(string $name, array $config = [])
4552
{
4653
$this->name = $name;
@@ -162,65 +169,87 @@ public function push(IMessage $message, float $delay = 0, array $options = []):
162169

163170
/**
164171
* 从队列弹出一个消息
165-
*
172+
*
173+
* @param float $timeout 超时时间,单位:秒。值是-1时立即返回结果
166174
* @return \Imi\Queue\Contract\IMessage|null
167175
*/
168-
public function pop(): ?IMessage
176+
public function pop(float $timeout = -1): ?IMessage
169177
{
170-
$this->parseDelayMessages();
171-
$this->parseTimeoutMessages();
172-
$redis = RedisManager::getInstance($this->poolName);
173-
$result = $redis->evalEx(<<<LUA
174-
-- 从列表弹出
175-
local messageId = redis.call('lpop', KEYS[1])
176-
if false == messageId then
177-
return -1
178-
end
179-
-- 获取消息内容
180-
local hashResult = redis.call('hgetall', KEYS[3] .. messageId)
181-
local message = {}
182-
for i=1,#hashResult,2 do
183-
message[hashResult[i]] = hashResult[i + 1]
184-
end
185-
-- 加入工作队列
186-
local score = tonumber(message.workingTimeout)
187-
if nil == score or score <= 0 then
188-
score = -1
189-
end
190-
redis.call('zadd', KEYS[2], ARGV[1] + score, messageId)
191-
return hashResult
192-
LUA
193-
, [
194-
$this->getQueueKey(QueueType::READY),
195-
$this->getQueueKey(QueueType::WORKING),
196-
$this->getMessageKeyPrefix(),
197-
microtime(true),
198-
], 3);
199-
200-
if(-1 === $result)
201-
{
202-
return null;
203-
}
204-
if(false === $result)
205-
{
206-
if('' === ($error = $redis->getLastError()))
178+
$time = $useTime = 0;
179+
do {
180+
if($timeout > 0)
207181
{
208-
throw new QueueException('Queue pop failed');
182+
if($time)
183+
{
184+
$leftTime = $timeout - $useTime;
185+
if($leftTime > $this->timespan)
186+
{
187+
usleep($this->timespan * 1000000);
188+
}
189+
}
190+
else
191+
{
192+
$time = microtime(true);
193+
}
209194
}
210-
else
195+
$this->parseDelayMessages();
196+
$this->parseTimeoutMessages();
197+
$redis = RedisManager::getInstance($this->poolName);
198+
$result = $redis->evalEx(<<<LUA
199+
-- 从列表弹出
200+
local messageId = redis.call('lpop', KEYS[1])
201+
if false == messageId then
202+
return -1
203+
end
204+
-- 获取消息内容
205+
local hashResult = redis.call('hgetall', KEYS[3] .. messageId)
206+
local message = {}
207+
for i=1,#hashResult,2 do
208+
message[hashResult[i]] = hashResult[i + 1]
209+
end
210+
-- 加入工作队列
211+
local score = tonumber(message.workingTimeout)
212+
if nil == score or score <= 0 then
213+
score = -1
214+
end
215+
redis.call('zadd', KEYS[2], ARGV[1] + score, messageId)
216+
return hashResult
217+
LUA
218+
, [
219+
$this->getQueueKey(QueueType::READY),
220+
$this->getQueueKey(QueueType::WORKING),
221+
$this->getMessageKeyPrefix(),
222+
microtime(true),
223+
], 3);
224+
if($result > 0)
211225
{
212-
throw new QueueException('Queue pop failed, ' . $error);
226+
$data = [];
227+
$length = count($result);
228+
for($i = 0; $i < $length; $i += 2)
229+
{
230+
$data[$result[$i]] = $result[$i + 1];
231+
}
232+
$message = new Message;
233+
$message->loadFromArray($data);
234+
return $message;
213235
}
214-
}
215-
$data = [];
216-
$length = count($result);
217-
for($i = 0; $i < $length; $i += 2)
218-
{
219-
$data[$result[$i]] = $result[$i + 1];
220-
}
221-
$message = new Message;
222-
$message->loadFromArray($data);
223-
return $message;
236+
if(false === $result)
237+
{
238+
if('' === ($error = $redis->getLastError()))
239+
{
240+
throw new QueueException('Queue pop failed');
241+
}
242+
else
243+
{
244+
throw new QueueException('Queue pop failed, ' . $error);
245+
}
246+
}
247+
if($timeout < 0)
248+
{
249+
return null;
250+
}
251+
} while(($useTime = (microtime(true) - $time)) < $timeout);
252+
return null;
224253
}
225254

226255
/**

tests/Queue/BaseQueueTest.php

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
<?php
22
namespace Imi\Queue\Test\Queue;
33

4+
use Swoole\Coroutine;
45
use Imi\Queue\Model\Message;
5-
use Imi\Queue\Driver\IQueueDriver;
66
use Swoole\Coroutine\Channel;
7+
use Imi\Queue\Driver\IQueueDriver;
78

89
abstract class BaseQueueTest extends BaseTest
910
{
1011
protected abstract function getDriver(): IQueueDriver;
1112

13+
public function testClear()
14+
{
15+
$this->getDriver()->clear();
16+
$this->assertTrue(true);
17+
}
18+
1219
public function testPush()
1320
{
1421
$driver = $this->getDriver();
@@ -24,6 +31,35 @@ public function testPop()
2431
$message = $this->getDriver()->pop();
2532
$this->assertInstanceOf(\Imi\Queue\Contract\IMessage::class, $message);
2633
$this->assertNotEmpty($message->getMessageId());
34+
$this->assertEquals('a', $message->getMessage());
35+
}
36+
37+
public function testPopTimeout()
38+
{
39+
$message = $totalTime = null;
40+
$channel = new Channel(1);
41+
go(function() use(&$message, &$totalTime, $channel){
42+
go(function() use($channel){
43+
Coroutine::sleep(1);
44+
$message = new Message;
45+
$message->setMessage('a');
46+
$messageId = $this->getDriver()->push($message);
47+
$this->assertNotEmpty($messageId);
48+
$channel->push(1);
49+
});
50+
$time = microtime(true);
51+
$message = $this->getDriver()->pop(3);
52+
$totalTime = microtime(true) - $time;
53+
$channel->push(1);
54+
});
55+
for($i = 0; $i < 2; ++$i)
56+
{
57+
$channel->pop(3);
58+
}
59+
$this->assertEquals(1, (int)$totalTime);
60+
$this->assertInstanceOf(\Imi\Queue\Contract\IMessage::class, $message);
61+
$this->assertNotEmpty($message->getMessageId());
62+
$this->assertEquals('a', $message->getMessage());
2763
}
2864

2965
public function testPushDelay()
@@ -48,6 +84,7 @@ public function testPushDelay()
4884
$this->assertEquals(3, (int)(microtime(true) - $time));
4985
$this->assertInstanceOf(\Imi\Queue\Contract\IMessage::class, $message);
5086
$this->assertNotEmpty($message->getMessageId());
87+
$this->assertEquals('b', $message->getMessage());
5188
}
5289

5390
public function testDelete()

0 commit comments

Comments
 (0)