Skip to content

Commit 2ce69ce

Browse files
committed
支持失败并重回队列操作
1 parent 47103e2 commit 2ce69ce

File tree

4 files changed

+33
-6
lines changed

4 files changed

+33
-6
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ if(null !== $message)
147147

148148
// 将消息标记为失败
149149
$queue->fail($message);
150+
151+
// 将消息标记为失败,并重回队列
152+
$queue->fail($message, true);
150153
}
151154
```
152155

src/Driver/IQueueDriver.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ public function success(IMessage $message);
6161
* 将消息标记为失败
6262
*
6363
* @param \Imi\Queue\Contract\IMessage $message
64+
* @param bool $requeue
6465
* @return void
6566
*/
66-
public function fail(IMessage $message);
67+
public function fail(IMessage $message, bool $requeue = false);
6768

6869
/**
6970
* 获取队列状态

src/Driver/RedisQueueDriver.php

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,21 +328,35 @@ public function success(IMessage $message)
328328
* 将消息标记为失败
329329
*
330330
* @param \Imi\Queue\Contract\IMessage $message
331+
* @param bool $requeue
331332
* @return void
332333
*/
333-
public function fail(IMessage $message)
334+
public function fail(IMessage $message, bool $requeue = false)
334335
{
335336
$redis = RedisManager::getInstance($this->poolName);
337+
if($requeue)
338+
{
339+
$operation = <<<LUA
340+
-- 加入队列
341+
redis.call('rpush', KEYS[2], ARGV[1]);
342+
LUA;
343+
}
344+
else
345+
{
346+
$operation = <<<LUA
347+
-- 加入失败队列
348+
redis.call('rpush', KEYS[2], ARGV[1])
349+
LUA;
350+
}
336351
$result = $redis->evalEx(<<<LUA
337352
-- 从工作队列删除
338353
redis.call('zrem', KEYS[1], ARGV[1])
339-
-- 加入失败队列
340-
redis.call('rpush', KEYS[2], ARGV[1])
354+
{$operation}
341355
return true
342356
LUA
343357
, [
344358
$this->getQueueKey(QueueType::WORKING),
345-
$this->getQueueKey(QueueType::FAIL),
359+
$requeue ? $this->getQueueKey(QueueType::READY) : $this->getQueueKey(QueueType::FAIL),
346360
$message->getMessageId(),
347361
], 2);
348362

tests/Queue/BaseQueueTest.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,18 @@ public function testRestoreFailMessages()
8181
$message = $this->getDriver()->pop();
8282
$this->assertNotEmpty($message->getMessageId());
8383

84-
$driver->fail($message, 'gg');
84+
$driver->fail($message);
8585

8686
$this->assertEquals(1, $driver->restoreFailMessages());
87+
88+
// requeue
89+
$message = $this->getDriver()->pop();
90+
$this->assertNotEmpty($message->getMessageId());
91+
92+
$driver->fail($message, true);
93+
94+
$this->assertEquals(1, $driver->status()->getReady());
95+
$this->assertEquals(0, $driver->restoreFailMessages());
8796
}
8897

8998
public function testRestoreTimeoutMessages()

0 commit comments

Comments
 (0)