-
Notifications
You must be signed in to change notification settings - Fork 4
add requeueing in background #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
69da24d
b128d26
e2d444c
b7035ba
2a9bdb4
069d9f9
061f998
f3a3714
74a7d48
862d41f
d3734e0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,3 +4,7 @@ class Full(Exception): | |
|
|
||
| class Empty(Exception): | ||
| pass | ||
|
|
||
|
|
||
| class Stopped(Exception): | ||
| pass | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a newline |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| -- KEYS[1] last requeue timestamp | ||
| -- ARGV[1] current timestamp | ||
| -- ARGV[2] requeue interval | ||
|
|
||
| local last_requeue = tonumber(redis.call('get', KEYS[1])) | ||
| if last_requeue ~= nil and tonumber(ARGV[1]) - last_requeue <= tonumber(ARGV[2]) then | ||
| return {'error'} | ||
| else | ||
| redis.call('set', KEYS[1], ARGV[1]) | ||
| return {'ok'} | ||
| end |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| import asyncio | ||
| import timeit | ||
|
|
||
| from . import task, load_scripts, exceptions | ||
|
|
||
|
|
@@ -10,8 +11,10 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, | |
| fetching_fifo_key=None, | ||
| payloads_hash_key=None, | ||
| ack_hash_key=None, | ||
| last_requeue_key=None, | ||
| task_class=task.Task, | ||
| lua_sha=None, | ||
| requeue_interval=10000, | ||
| loop=None): | ||
|
|
||
| if main_queue_key is None: | ||
|
|
@@ -26,6 +29,9 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, | |
| if ack_hash_key is None: | ||
| ack_hash_key = key_prefix + 'ack' | ||
|
|
||
| if last_requeue_key is None: | ||
| last_requeue_key = key_prefix + 'last_requeue' | ||
|
|
||
| if loop is None: | ||
| loop = asyncio.get_event_loop() | ||
|
|
||
|
|
@@ -34,13 +40,22 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, | |
| 'fifo': fetching_fifo_key, | ||
| 'payload': payloads_hash_key, | ||
| 'ack': ack_hash_key, | ||
| 'last_requeue': last_requeue_key, | ||
| } | ||
|
|
||
| self._redis = redis | ||
| self._loop = loop | ||
| self._task_class = task_class | ||
| self._lua_sha = lua_sha if lua_sha is not None else {} | ||
| self._locks = {} | ||
| self._requeue_interval = requeue_interval | ||
| self._is_stopped = False | ||
|
|
||
| if self._requeue_interval != 0: | ||
| self._regular_requeue_task = \ | ||
| self._loop.create_task(self._requeue_regularly()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we call
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably the easiest way for now is to provide |
||
| else: | ||
| self._regular_requeue_task = None | ||
|
|
||
| async def _load_scripts(self, primary): | ||
|
|
||
|
|
@@ -82,6 +97,9 @@ async def _put_lua(self, task_id, task_payload): | |
| ) | ||
|
|
||
| def put(self, task, method='lua'): | ||
| if self._is_stopped: | ||
| raise exceptions.Stopped | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an error: you are rising an instance of type |
||
|
|
||
| task_id = self._task_class.generate_id() | ||
| task_payload = self._task_class.format_payload(task) | ||
|
|
||
|
|
@@ -107,6 +125,9 @@ async def _get_nowait(self, ack_info, script='get_nowait_l', key='fifo'): | |
| raise exceptions.Empty(result[0]) | ||
|
|
||
| async def get_nowait(self): | ||
| if self._is_stopped: | ||
| raise exceptions.Stopped | ||
|
|
||
| ack_info = self._task_class.generate_ack_info() | ||
|
|
||
| try: | ||
|
|
@@ -117,6 +138,9 @@ async def get_nowait(self): | |
| return await self._get_nowait(ack_info, 'get_nowait', 'queue') | ||
|
|
||
| async def get(self, retry_interval=1): | ||
| if self._is_stopped: | ||
| raise exceptions.Stopped | ||
|
|
||
| while self._loop.is_running(): | ||
| await self._redis.brpoplpush(self._keys['queue'], | ||
| self._keys['fifo'], | ||
|
|
@@ -145,12 +169,18 @@ async def _ack_lua(self, task_id): | |
| ) | ||
|
|
||
| def _ack(self, task_id, method='multi'): | ||
| if self._is_stopped: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this one is excessive |
||
| raise exceptions.Stopped | ||
|
|
||
| if method == 'multi': | ||
| return self._ack_pipe(task_id) | ||
| elif method == 'lua': | ||
| return self._ack_lua(task_id) | ||
|
|
||
| async def _fail(self, task_id): | ||
| if self._is_stopped: | ||
| raise exceptions.Stopped | ||
|
|
||
| if 'fail' not in self._lua_sha: | ||
| await self._load_scripts('fail') | ||
|
|
||
|
|
@@ -160,12 +190,44 @@ async def _fail(self, task_id): | |
| args=[task_id], | ||
| ) | ||
|
|
||
| async def _requeue(self, before=None): | ||
| async def requeue(self, before=-1): | ||
| if self._is_stopped: | ||
| raise exceptions.Stopped | ||
|
|
||
| if 'requeue' not in self._lua_sha: | ||
| await self._load_scripts('requeue') | ||
|
|
||
| if 'check_requeue_timestamp' not in self._lua_sha: | ||
| await self._load_scripts('check_requeue_timestamp') | ||
|
|
||
| now = int(timeit.default_timer() * 1000) | ||
| results = await self._redis.evalsha( | ||
| self._lua_sha['check_requeue_timestamp'], | ||
| keys=[self._keys['last_requeue']], | ||
| args=[now, self._requeue_interval], | ||
| ) | ||
| if results[0] == b'error': | ||
| return results | ||
|
|
||
| return await self._redis.evalsha( | ||
code-of-kpp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._lua_sha['requeue'], | ||
| keys=[self._keys['ack'], self._keys['queue']], | ||
| args=[before], | ||
| ) | ||
|
|
||
| def stop(self): | ||
| if self._is_stopped: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that one too (it is OK to call |
||
| raise exceptions.Stopped | ||
|
|
||
| if self._regular_requeue_task is not None: | ||
| self._regular_requeue_task.cancel() | ||
|
|
||
| self._is_stopped = True | ||
|
|
||
| async def _requeue_regularly(self): | ||
| before = int(timeit.default_timer() * 1000) | ||
| while (not self._redis.closed and self._loop.is_running() | ||
| and not self._is_stopped): | ||
| await asyncio.sleep(self._requeue_interval / 1000) | ||
| await self.requeue(before) | ||
| before += self._requeue_interval | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| import timeit | ||
| import asyncio | ||
| import pytest | ||
| import aioredis | ||
| import aioredisqueue | ||
|
|
||
code-of-kpp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| @pytest.mark.asyncio | ||
| async def test_requeue_method(): | ||
| redis = await aioredis.create_redis(('localhost', 6379), db=0) | ||
| queue = aioredisqueue.queue.Queue(redis) | ||
|
|
||
| await queue.put(b'payload') | ||
| await queue.get() | ||
| main_queue_len = await redis.llen(queue._keys['queue']) | ||
| ack_len = await redis.hlen(queue._keys['ack']) | ||
| assert ack_len >= 1 | ||
| await redis.delete(queue._keys['last_requeue']) | ||
| now = int(timeit.default_timer() * 1000) | ||
| results = await queue.requeue() | ||
| assert results[0] == b'ok' | ||
| assert len(results[1]) == ack_len | ||
| assert len(results[3]) == ack_len | ||
| assert results[2][-1] == main_queue_len + ack_len | ||
| assert int(await redis.get(queue._keys['last_requeue'])) == now | ||
|
|
||
| queue.stop() | ||
| redis.close() | ||
| await redis.wait_closed() | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_too_early(): | ||
| redis = await aioredis.create_redis(('localhost', 6379), db=0) | ||
| queue = aioredisqueue.queue.Queue(redis) | ||
|
|
||
| now = int(timeit.default_timer() * 1000) | ||
| await redis.set(queue._keys['last_requeue'], now) | ||
| results = await queue.requeue() | ||
| assert results[0] == b'error' | ||
|
|
||
| queue.stop() | ||
| redis.close() | ||
| await redis.wait_closed() | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_in_background(): | ||
| redis = await aioredis.create_redis(('localhost', 6379), db=0) | ||
| requeue_interval = 100 | ||
| eps = 0.01 | ||
| queue = aioredisqueue.queue.Queue(redis, requeue_interval=requeue_interval) | ||
|
|
||
| await redis.delete(queue._keys['last_requeue']) | ||
|
|
||
| await queue.put(b'payload') | ||
| await asyncio.sleep(eps) | ||
| task = await queue.get() | ||
|
|
||
| await asyncio.sleep(requeue_interval / 1000) | ||
| assert await redis.hget(queue._keys['ack'], task.id) is not None | ||
|
|
||
| await asyncio.sleep(requeue_interval / 1000) | ||
| assert await redis.hget(queue._keys['ack'], task.id) is None | ||
|
|
||
| queue.stop() | ||
| redis.close() | ||
| await redis.wait_closed() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add at least minimal usage example