Skip to content
19 changes: 15 additions & 4 deletions aioredisqueue/lua/requeue.lua
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
-- KEYS[1] ack table
-- KEYS[2] main queue
-- ARGV[1] requeue jobs less than this numeric value
-- KEYS[3] last requeue timestamp
-- ARGV[1] current timestamp
-- ARGV[2] requeue interval
-- ARGV[3] requeue jobs less than this numeric value

local last_requeue = tonumber(redis.call('get', KEYS[3]))
local now = tonumber(ARGV[1])
if last_requeue ~= nil and now - last_requeue <= tonumber(ARGV[2]) then
return {'error'}
end

local job_id, v, r
local results_hdel = {}
local results_lpush = {}
local results_keys = {}
local min_val = tonumber(ARGV[1])
local min_val = tonumber(ARGV[3])
local kvs = redis.call('HGETALL', KEYS[1])

for i = 1, #kvs, 2 do
job_id = kvs[i]
v = tonumber(kvs[i + 1])
if v < min_val then
if min_val == -1 or v < min_val then
r = redis.call('hdel', KEYS[1], job_id)
table.insert(results_hdel, r)
if r then
Expand All @@ -26,4 +35,6 @@ for i = 1, #kvs, 2 do
end
end

return {results_hdel, results_lpush, results_keys}
redis.call('set', KEYS[3], tostring(now))

return {'ok', results_hdel, results_lpush, results_keys}
24 changes: 20 additions & 4 deletions aioredisqueue/queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import timeit

from . import task, load_scripts, exceptions

Expand All @@ -10,9 +11,11 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *,
fetching_fifo_key=None,
payloads_hash_key=None,
ack_hash_key=None,
last_requeue_key=None,
task_class=task.Task,
lua_sha=None,
loop=None):
loop=None,
requeue_interval=10000):

if main_queue_key is None:
main_queue_key = key_prefix + 'queue'
Expand All @@ -26,6 +29,9 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *,
if ack_hash_key is None:
ack_hash_key = key_prefix + 'ack'

if last_requeue_key is None:
last_requeue_key = key_prefix + 'last_requeue'

if loop is None:
loop = asyncio.get_event_loop()

Expand All @@ -34,13 +40,17 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *,
'fifo': fetching_fifo_key,
'payload': payloads_hash_key,
'ack': ack_hash_key,
'last_requeue': last_requeue_key,
}

self._redis = redis
self._loop = loop
self._task_class = task_class
self._lua_sha = lua_sha if lua_sha is not None else {}
self._locks = {}
self._requeue_interval = requeue_interval

self._loop.create_task(self._requeue_periodically())

async def _load_scripts(self, primary):

Expand Down Expand Up @@ -160,12 +170,18 @@ async def _fail(self, task_id):
args=[task_id],
)

async def _requeue(self, before=None):
async def _requeue(self, now, before=-1):
if 'requeue' not in self._lua_sha:
await self._load_scripts('requeue')

return await self._redis.evalsha(
self._lua_sha['requeue'],
keys=[self._keys['ack'], self._keys['queue']],
args=[before],
keys=[self._keys['ack'], self._keys['queue'], self._keys['last_requeue']],
args=[now, self._requeue_interval, before],
)

async def _requeue_periodically(self):
while not self._redis.closed:
await asyncio.sleep(self._requeue_interval / 1000)
now = int(timeit.default_timer() * 1000)
await self._requeue(now, now - self._requeue_interval)
63 changes: 63 additions & 0 deletions test/test_requeueing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import aioredis
import aioredisqueue
import asyncio
import pytest
import timeit

@pytest.mark.asyncio
async def test_requeue_method():
r = await aioredis.create_redis(('localhost', 6379), db=0)
q = aioredisqueue.queue.Queue(r)

await q.put(b'payload')
await q.get()
main_queue_len = await r.llen(q._keys['queue'])
ack_len = await r.hlen(q._keys['ack'])
assert ack_len >= 1
await r.delete(q._keys['last_requeue'])
now = int(timeit.default_timer() * 1000)
results = await q._requeue(now)
assert len(results[1]) == ack_len
assert len(results[3]) == ack_len
assert results[2][-1] == main_queue_len + ack_len
assert int(await r.get(q._keys['last_requeue'])) == now

r.close()
await r.wait_closed()


@pytest.mark.asyncio
async def test_too_early():
r = await aioredis.create_redis(('localhost', 6379), db=0)
q = aioredisqueue.queue.Queue(r)

now = int(timeit.default_timer() * 1000)
await r.set(q._keys['last_requeue'], now)
results = await q._requeue(now)
assert results[0] == b'error'

r.close()
await r.wait_closed()


@pytest.mark.asyncio
async def test_in_background():
r = await aioredis.create_redis(('localhost', 6379), db=0)
requeue_interval = 100
eps = 0.01
q = aioredisqueue.queue.Queue(r, requeue_interval=requeue_interval)

await r.delete(q._keys['last_requeue'])

await q.put(b'payload')
await asyncio.sleep(eps)
task = await q.get()

await asyncio.sleep(requeue_interval / 1000)
assert await r.hget(q._keys['ack'], task.id) is not None

await asyncio.sleep(requeue_interval / 1000)
assert await r.hget(q._keys['ack'], task.id) is None

r.close()
await r.wait_closed()