|
10 | 10 | -- { queue_id, job_id, payload, requeues_remaining } |
11 | 11 |
|
12 | 12 |
|
13 | | -local ready_queue_id_list = redis.call('ZRANGEBYSCORE', KEYS[1] .. ':' .. KEYS[2], 0, ARGV[1]) |
| 13 | +local prefix = KEYS[1] |
| 14 | +local queue_type = KEYS[2] |
| 15 | + |
| 16 | +local current_timestamp = ARGV[1] |
| 17 | +local job_expiry_interval = ARGV[2] |
| 18 | + |
| 19 | + |
| 20 | +local ready_queue_id_list = redis.call('ZRANGEBYSCORE', prefix .. ':' .. queue_type, 0, current_timestamp) |
14 | 21 | if next(ready_queue_id_list) ~= nil then |
15 | 22 | -- there is a queue ready to be dequeued. |
16 | 23 | local ready_queue_id = ready_queue_id_list[1] |
17 | 24 | -- dequeue a job from the job queue. |
18 | | - local job_id = redis.call('LPOP', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id) |
| 25 | + local job_id = redis.call('LPOP', prefix .. ':' .. queue_type .. ':' .. ready_queue_id) |
19 | 26 | -- get the payload for this job |
20 | | - local payload = redis.call('HGET', KEYS[1] .. ':payload', KEYS[2] .. ':' .. ready_queue_id .. ':' .. job_id) |
| 27 | + local payload = redis.call('HGET', prefix .. ':payload', queue_type .. ':' .. ready_queue_id .. ':' .. job_id) |
21 | 28 | -- update the time keeper with the current dequeue time. |
22 | | - local interval = redis.call('HGET', KEYS[1] .. ':interval', KEYS[2] .. ':' .. ready_queue_id) |
23 | | - redis.call('PSETEX', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id .. ':time', ARGV[2], ARGV[1]) |
| 29 | + redis.call('PSETEX', prefix .. ':' .. queue_type .. ':' .. ready_queue_id .. ':time', job_expiry_interval, current_timestamp) |
24 | 30 | -- check if there are any more jobs of this queue in the job queue. |
25 | | - if redis.call('LLEN', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id) == 0 then |
| 31 | + if redis.call('LLEN', prefix .. ':' .. queue_type .. ':' .. ready_queue_id) == 0 then |
26 | 32 | -- there are no more jobs of this queue. remove this queue from the ready sorted set. |
27 | | - redis.call('ZREM', KEYS[1] .. ':' .. KEYS[2], ready_queue_id) |
| 33 | + redis.call('ZREM', prefix .. ':' .. queue_type, ready_queue_id) |
28 | 34 | -- now check if the ready sorted set is empty. |
29 | | - if redis.call('EXISTS', KEYS[1] .. ':' .. KEYS[2]) ~= 1 then |
| 35 | + if redis.call('EXISTS', prefix .. ':' .. queue_type) ~= 1 then |
30 | 36 | -- the ready sorted set is empty. remove this 'queue_type' from |
31 | 37 | -- the metris ready queue type set |
32 | | - redis.call('SREM', KEYS[1] .. ':ready:queue_type', KEYS[2]) |
| 38 | + redis.call('SREM', prefix .. ':ready:queue_type', queue_type) |
33 | 39 | end |
34 | 40 | else |
35 | 41 | -- there are more jobs in the queue. update the next |
36 | 42 | -- dequeue time for this queue in the ready sorted set. |
37 | | - local next_dequeue_time = ARGV[1] + interval |
38 | | - redis.call('ZADD', KEYS[1] .. ':' .. KEYS[2], next_dequeue_time, ready_queue_id) |
| 43 | + local next_dequeue_time = current_timestamp |
| 44 | + local interval = tonumber(redis.call('HGET', prefix .. ':interval', queue_type .. ':' .. ready_queue_id)) |
| 45 | + if interval then |
| 46 | + next_dequeue_time = current_timestamp + interval |
| 47 | + end |
| 48 | + redis.call('ZADD', prefix .. ':' .. queue_type, next_dequeue_time, ready_queue_id) |
39 | 49 | end |
40 | | - local job_expiry_time = ARGV[1] + ARGV[2] |
| 50 | + local job_expiry_time = current_timestamp + job_expiry_interval |
41 | 51 | -- finally, add the job_id and queue_id that was dequeued into the active sorted set. |
42 | | - redis.call('ZADD', KEYS[1] .. ':' .. KEYS[2] .. ':active', job_expiry_time, ready_queue_id .. ':' .. job_id) |
| 52 | + redis.call('ZADD', prefix .. ':' .. queue_type .. ':active', job_expiry_time, ready_queue_id .. ':' .. job_id) |
43 | 53 | -- add the queue_type to metrics active queue type set. |
44 | | - redis.call('SADD', KEYS[1] .. ':active:queue_type', KEYS[2]) |
| 54 | + redis.call('SADD', prefix .. ':active:queue_type', queue_type) |
45 | 55 |
|
46 | 56 | -- get the requeues_remaining for this job |
47 | | - local requeues_remaining = redis.call('HGET', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id .. ':requeues_remaining', job_id) |
| 57 | + local requeues_remaining = redis.call('HGET', prefix .. ':' .. queue_type .. ':' .. ready_queue_id .. ':requeues_remaining', job_id) |
48 | 58 |
|
49 | 59 | -- update the metrics counters |
50 | 60 | -- update global counter. |
51 | | - local timestamp_minute = math.floor(ARGV[1]/60000) * 60000 -- get the epoch for the minute |
| 61 | + local timestamp_minute = math.floor(current_timestamp/60000) * 60000 -- get the epoch for the minute |
52 | 62 | local expiry_time = math.floor((timestamp_minute + 600000) / 1000) -- store the data for 10 minutes. |
53 | | - if redis.call('EXISTS', KEYS[1] .. ':dequeue_counter:' .. timestamp_minute) ~= 1 then |
| 63 | + if redis.call('EXISTS', prefix .. ':dequeue_counter:' .. timestamp_minute) ~= 1 then |
54 | 64 | -- counter does not exists. set the initial value and expiry. |
55 | | - redis.call('SET', KEYS[1] .. ':dequeue_counter:' .. timestamp_minute, 1) |
56 | | - redis.call('EXPIREAT', KEYS[1] .. ':dequeue_counter:' .. timestamp_minute, expiry_time) |
| 65 | + redis.call('SET', prefix .. ':dequeue_counter:' .. timestamp_minute, 1) |
| 66 | + redis.call('EXPIREAT', prefix .. ':dequeue_counter:' .. timestamp_minute, expiry_time) |
57 | 67 | else |
58 | 68 | -- counter already exists. just increment the value. |
59 | | - redis.call('INCR', KEYS[1] .. ':dequeue_counter:' .. timestamp_minute) |
| 69 | + redis.call('INCR', prefix .. ':dequeue_counter:' .. timestamp_minute) |
60 | 70 | end |
61 | 71 |
|
62 | 72 | -- update the current queue counter. |
63 | | - if redis.call('EXISTS', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute) ~= 1 then |
| 73 | + if redis.call('EXISTS', prefix .. ':' .. queue_type .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute) ~= 1 then |
64 | 74 | -- counter does not exists. set the initial value and expiry. |
65 | | - redis.call('SET', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute, 1) |
66 | | - redis.call('EXPIREAT', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute, expiry_time) |
| 75 | + redis.call('SET', prefix .. ':' .. queue_type .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute, 1) |
| 76 | + redis.call('EXPIREAT', prefix .. ':' .. queue_type .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute, expiry_time) |
67 | 77 | else |
68 | 78 | -- counter already exists. just increment the value. |
69 | | - redis.call('INCR', KEYS[1] .. ':' .. KEYS[2] .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute) |
| 79 | + redis.call('INCR', prefix .. ':' .. queue_type .. ':' .. ready_queue_id .. ':dequeue_counter:' .. timestamp_minute) |
70 | 80 | end |
71 | 81 |
|
72 | 82 | return { ready_queue_id, job_id, payload, requeues_remaining } |
|
0 commit comments