Skip to content

Commit 618554a

Browse files
authored
模型支持动态指定表名和连接池名 (#190)
1 parent c22bdfc commit 618554a

File tree

1 file changed

+132
-142
lines changed

1 file changed

+132
-142
lines changed

src/Driver/RedisQueueDriver.php

Lines changed: 132 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -79,30 +79,29 @@ public function push(IMessage $message, float $delay = 0, array $options = []):
7979
$args[] = $v;
8080
}
8181
$result = $redis->evalEx(<<<'LUA'
82-
local queueKey = KEYS[1]
83-
local messageKeyPrefix = KEYS[2]
84-
local messageIdKey = KEYS[3]
85-
local delayTo = ARGV[1]
86-
local date = ARGV[2]
87-
-- 创建消息id
88-
local messageId = redis.call('hIncrby', messageIdKey, date, 1);
89-
if messageId > 0 then
90-
messageId = date .. messageId
91-
else
92-
return false
93-
end
94-
-- 创建消息
95-
local messageKey = messageKeyPrefix .. messageId;
96-
local ARGVLength = table.getn(ARGV)
97-
for i=3,ARGVLength,2 do
98-
redis.call('hset', messageKey, ARGV[i], ARGV[i + 1])
99-
end
100-
redis.call('hset', messageKey, 'messageId', messageId)
101-
-- 加入延时队列
102-
redis.call('zadd', queueKey, delayTo, messageId);
103-
return messageId
104-
LUA
105-
, $args, 3);
82+
local queueKey = KEYS[1]
83+
local messageKeyPrefix = KEYS[2]
84+
local messageIdKey = KEYS[3]
85+
local delayTo = ARGV[1]
86+
local date = ARGV[2]
87+
-- 创建消息id
88+
local messageId = redis.call('hIncrby', messageIdKey, date, 1);
89+
if messageId > 0 then
90+
messageId = date .. messageId
91+
else
92+
return false
93+
end
94+
-- 创建消息
95+
local messageKey = messageKeyPrefix .. messageId;
96+
local ARGVLength = table.getn(ARGV)
97+
for i=3,ARGVLength,2 do
98+
redis.call('hset', messageKey, ARGV[i], ARGV[i + 1])
99+
end
100+
redis.call('hset', messageKey, 'messageId', messageId)
101+
-- 加入延时队列
102+
redis.call('zadd', queueKey, delayTo, messageId);
103+
return messageId
104+
LUA, $args, 3);
106105
}
107106
else
108107
{
@@ -118,29 +117,28 @@ public function push(IMessage $message, float $delay = 0, array $options = []):
118117
$args[] = $v;
119118
}
120119
$result = $redis->evalEx(<<<'LUA'
121-
local queueKey = KEYS[1]
122-
local messageKeyPrefix = KEYS[2]
123-
local messageIdKey = KEYS[3]
124-
local date = ARGV[1]
125-
-- 创建消息id
126-
local messageId = redis.call('hIncrby', messageIdKey, date, 1);
127-
if messageId > 0 then
128-
messageId = date .. messageId
129-
else
130-
return false
131-
end
132-
-- 创建消息
133-
local messageKey = messageKeyPrefix .. messageId;
134-
local ARGVLength = table.getn(ARGV)
135-
for i=2,ARGVLength,2 do
136-
redis.call('hset', messageKey, ARGV[i], ARGV[i + 1])
137-
end
138-
redis.call('hset', messageKey, 'messageId', messageId)
139-
-- 加入队列
140-
redis.call('rpush', queueKey, messageId);
141-
return messageId
142-
LUA
143-
, $args, 3);
120+
local queueKey = KEYS[1]
121+
local messageKeyPrefix = KEYS[2]
122+
local messageIdKey = KEYS[3]
123+
local date = ARGV[1]
124+
-- 创建消息id
125+
local messageId = redis.call('hIncrby', messageIdKey, date, 1);
126+
if messageId > 0 then
127+
messageId = date .. messageId
128+
else
129+
return false
130+
end
131+
-- 创建消息
132+
local messageKey = messageKeyPrefix .. messageId;
133+
local ARGVLength = table.getn(ARGV)
134+
for i=2,ARGVLength,2 do
135+
redis.call('hset', messageKey, ARGV[i], ARGV[i + 1])
136+
end
137+
redis.call('hset', messageKey, 'messageId', messageId)
138+
-- 加入队列
139+
redis.call('rpush', queueKey, messageId);
140+
return messageId
141+
LUA, $args, 3);
144142
}
145143
if (false === $result)
146144
{
@@ -184,26 +182,25 @@ public function pop(float $timeout = 0): ?IMessage
184182
$this->parseTimeoutMessages();
185183
$redis = RedisManager::getInstance($this->poolName);
186184
$result = $redis->evalEx(<<<'LUA'
187-
-- 从列表弹出
188-
local messageId = redis.call('lpop', KEYS[1])
189-
if false == messageId then
190-
return -1
191-
end
192-
-- 获取消息内容
193-
local hashResult = redis.call('hgetall', KEYS[3] .. messageId)
194-
local message = {}
195-
for i=1,#hashResult,2 do
196-
message[hashResult[i]] = hashResult[i + 1]
197-
end
198-
-- 加入工作队列
199-
local score = tonumber(message.workingTimeout)
200-
if nil == score or score <= 0 then
201-
score = -1
202-
end
203-
redis.call('zadd', KEYS[2], ARGV[1] + score, messageId)
204-
return hashResult
205-
LUA
206-
, [
185+
-- 从列表弹出
186+
local messageId = redis.call('lpop', KEYS[1])
187+
if false == messageId then
188+
return -1
189+
end
190+
-- 获取消息内容
191+
local hashResult = redis.call('hgetall', KEYS[3] .. messageId)
192+
local message = {}
193+
for i=1,#hashResult,2 do
194+
message[hashResult[i]] = hashResult[i + 1]
195+
end
196+
-- 加入工作队列
197+
local score = tonumber(message.workingTimeout)
198+
if nil == score or score <= 0 then
199+
score = -1
200+
end
201+
redis.call('zadd', KEYS[2], ARGV[1] + score, messageId)
202+
return hashResult
203+
LUA, [
207204
$this->getQueueKey(QueueType::READY),
208205
$this->getQueueKey(QueueType::WORKING),
209206
$this->getMessageKeyPrefix(),
@@ -250,18 +247,17 @@ public function delete(IMessage $message): bool
250247
{
251248
$redis = RedisManager::getInstance($this->poolName);
252249
$result = $redis->evalEx(<<<'LUA'
253-
local messageId = ARGV[1]
254-
-- 删除消息
255-
redis.call('del', KEYS[3] .. messageId)
256-
-- 从队列删除
257-
if redis.call('lrem', KEYS[1], 1, messageId) <= 0 then
258-
if redis.call('zrem', KEYS[2], messageId) <= 0 then
259-
return false
260-
end
261-
end
262-
return true
263-
LUA
264-
, [
250+
local messageId = ARGV[1]
251+
-- 删除消息
252+
redis.call('del', KEYS[3] .. messageId)
253+
-- 从队列删除
254+
if redis.call('lrem', KEYS[1], 1, messageId) <= 0 then
255+
if redis.call('zrem', KEYS[2], messageId) <= 0 then
256+
return false
257+
end
258+
end
259+
return true
260+
LUA, [
265261
$this->getQueueKey(QueueType::READY),
266262
$this->getQueueKey(QueueType::DELAY),
267263
$this->getMessageKeyPrefix(),
@@ -311,15 +307,14 @@ public function success(IMessage $message): int
311307
{
312308
$redis = RedisManager::getInstance($this->poolName);
313309
$result = $redis->evalEx(<<<'LUA'
314-
-- 从工作队列删除
315-
redis.call('zrem', KEYS[1], ARGV[1])
316-
-- 从超时队列删除
317-
redis.call('del', KEYS[3])
318-
-- 删除消息
319-
redis.call('del', KEYS[2] .. ARGV[1])
320-
return true
321-
LUA
322-
, [
310+
-- 从工作队列删除
311+
redis.call('zrem', KEYS[1], ARGV[1])
312+
-- 从超时队列删除
313+
redis.call('del', KEYS[3])
314+
-- 删除消息
315+
redis.call('del', KEYS[2] .. ARGV[1])
316+
return true
317+
LUA, [
323318
$this->getQueueKey(QueueType::WORKING),
324319
$this->getMessageKeyPrefix(),
325320
$this->getQueueKey(QueueType::TIMEOUT),
@@ -350,24 +345,23 @@ public function fail(IMessage $message, bool $requeue = false): int
350345
if ($requeue)
351346
{
352347
$operation = <<<'LUA'
353-
-- 加入队列
354-
redis.call('rpush', KEYS[2], ARGV[1]);
355-
LUA;
348+
-- 加入队列
349+
redis.call('rpush', KEYS[2], ARGV[1]);
350+
LUA;
356351
}
357352
else
358353
{
359354
$operation = <<<'LUA'
360-
-- 加入失败队列
361-
redis.call('rpush', KEYS[2], ARGV[1])
362-
LUA;
355+
-- 加入失败队列
356+
redis.call('rpush', KEYS[2], ARGV[1])
357+
LUA;
363358
}
364359
$result = $redis->evalEx(<<<LUA
365-
-- 从工作队列删除
366-
redis.call('zrem', KEYS[1], ARGV[1])
367-
{$operation}
368-
return true
369-
LUA
370-
, [
360+
-- 从工作队列删除
361+
redis.call('zrem', KEYS[1], ARGV[1])
362+
{$operation}
363+
return true
364+
LUA, [
371365
$this->getQueueKey(QueueType::WORKING),
372366
$requeue ? $this->getQueueKey(QueueType::READY) : $this->getQueueKey(QueueType::FAIL),
373367
$message->getMessageId(),
@@ -422,14 +416,13 @@ public function restoreFailMessages(): int
422416
{
423417
$redis = RedisManager::getInstance($this->poolName);
424418
$result = $redis->evalEx(<<<'LUA'
425-
local result = 0
426-
while(redis.call('Rpoplpush', KEYS[2], KEYS[1]))
427-
do
428-
result = result + 1
429-
end
430-
return result
431-
LUA
432-
, [
419+
local result = 0
420+
while(redis.call('Rpoplpush', KEYS[2], KEYS[1]))
421+
do
422+
result = result + 1
423+
end
424+
return result
425+
LUA, [
433426
$this->getQueueKey(QueueType::READY),
434427
$this->getQueueKey(QueueType::FAIL),
435428
], 2);
@@ -456,14 +449,13 @@ public function restoreTimeoutMessages(): int
456449
{
457450
$redis = RedisManager::getInstance($this->poolName);
458451
$result = $redis->evalEx(<<<'LUA'
459-
local result = 0
460-
while(redis.call('Rpoplpush', KEYS[2], KEYS[1]))
461-
do
462-
result = result + 1
463-
end
464-
return result
465-
LUA
466-
, [
452+
local result = 0
453+
while(redis.call('Rpoplpush', KEYS[2], KEYS[1]))
454+
do
455+
result = result + 1
456+
end
457+
return result
458+
LUA, [
467459
$this->getQueueKey(QueueType::READY),
468460
$this->getQueueKey(QueueType::TIMEOUT),
469461
], 2);
@@ -492,19 +484,18 @@ protected function parseDelayMessages(int $count = 100): int
492484
{
493485
$redis = RedisManager::getInstance($this->poolName);
494486
$result = $redis->evalEx(<<<'LUA'
495-
-- 查询消息ID
496-
local messageIds = redis.call('zrevrangebyscore', KEYS[2], ARGV[1], 0, 'limit', 0, ARGV[2])
497-
local messageIdCount = table.getn(messageIds)
498-
if 0 == messageIdCount then
499-
return 0
500-
end
501-
-- 加入队列
502-
redis.call('rpush', KEYS[1], unpack(messageIds))
503-
-- 从延时队列删除
504-
redis.call('zrem', KEYS[2], unpack(messageIds))
505-
return messageIdCount
506-
LUA
507-
, [
487+
-- 查询消息ID
488+
local messageIds = redis.call('zrevrangebyscore', KEYS[2], ARGV[1], 0, 'limit', 0, ARGV[2])
489+
local messageIdCount = table.getn(messageIds)
490+
if 0 == messageIdCount then
491+
return 0
492+
end
493+
-- 加入队列
494+
redis.call('rpush', KEYS[1], unpack(messageIds))
495+
-- 从延时队列删除
496+
redis.call('zrem', KEYS[2], unpack(messageIds))
497+
return messageIdCount
498+
LUA, [
508499
$this->getQueueKey(QueueType::READY),
509500
$this->getQueueKey(QueueType::DELAY),
510501
microtime(true),
@@ -535,19 +526,18 @@ protected function parseTimeoutMessages(int $count = 100): int
535526
{
536527
$redis = RedisManager::getInstance($this->poolName);
537528
$result = $redis->evalEx(<<<'LUA'
538-
-- 查询消息ID
539-
local messageIds = redis.call('zrevrangebyscore', KEYS[1], ARGV[1], 0, 'limit', 0, ARGV[2])
540-
local messageIdCount = table.getn(messageIds)
541-
if 0 == messageIdCount then
542-
return 0
543-
end
544-
-- 加入超时队列
545-
redis.call('rpush', KEYS[2], unpack(messageIds))
546-
-- 从工作队列删除
547-
redis.call('zrem', KEYS[1], unpack(messageIds))
548-
return messageIdCount
549-
LUA
550-
, [
529+
-- 查询消息ID
530+
local messageIds = redis.call('zrevrangebyscore', KEYS[1], ARGV[1], 0, 'limit', 0, ARGV[2])
531+
local messageIdCount = table.getn(messageIds)
532+
if 0 == messageIdCount then
533+
return 0
534+
end
535+
-- 加入超时队列
536+
redis.call('rpush', KEYS[2], unpack(messageIds))
537+
-- 从工作队列删除
538+
redis.call('zrem', KEYS[1], unpack(messageIds))
539+
return messageIdCount
540+
LUA, [
551541
$this->getQueueKey(QueueType::WORKING),
552542
$this->getQueueKey(QueueType::TIMEOUT),
553543
microtime(true),

0 commit comments

Comments
 (0)