From 69da24d0d8c59c54739c413d73d3dca670439fa3 Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Sun, 21 Apr 2019 22:55:46 +0300 Subject: [PATCH 01/11] add requeueing in background --- aioredisqueue/lua/requeue.lua | 19 ++++++++--- aioredisqueue/queue.py | 24 ++++++++++--- test/test_requeueing.py | 63 +++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 8 deletions(-) create mode 100644 test/test_requeueing.py diff --git a/aioredisqueue/lua/requeue.lua b/aioredisqueue/lua/requeue.lua index 610089c..2b5163e 100644 --- a/aioredisqueue/lua/requeue.lua +++ b/aioredisqueue/lua/requeue.lua @@ -1,18 +1,27 @@ -- KEYS[1] ack table -- KEYS[2] main queue --- ARGV[1] requeue jobs less than this numeric value +-- KEYS[3] last requeue timestamp +-- ARGV[1] current timestamp +-- ARGV[2] requeue interval +-- ARGV[3] requeue jobs less than this numeric value + +local last_requeue = tonumber(redis.call('get', KEYS[3])) +local now = tonumber(ARGV[1]) +if last_requeue ~= nil and now - last_requeue <= tonumber(ARGV[2]) then + return {'error'} +end local job_id, v, r local results_hdel = {} local results_lpush = {} local results_keys = {} -local min_val = tonumber(ARGV[1]) +local min_val = tonumber(ARGV[3]) local kvs = redis.call('HGETALL', KEYS[1]) for i = 1, #kvs, 2 do job_id = kvs[i] v = tonumber(kvs[i + 1]) - if v < min_val then + if min_val == -1 or v < min_val then r = redis.call('hdel', KEYS[1], job_id) table.insert(results_hdel, r) if r then @@ -26,4 +35,6 @@ for i = 1, #kvs, 2 do end end -return {results_hdel, results_lpush, results_keys} +redis.call('set', KEYS[3], tostring(now)) + +return {'ok', results_hdel, results_lpush, results_keys} diff --git a/aioredisqueue/queue.py b/aioredisqueue/queue.py index 6f63f6f..7db1ad5 100644 --- a/aioredisqueue/queue.py +++ b/aioredisqueue/queue.py @@ -1,4 +1,5 @@ import asyncio +import timeit from . import task, load_scripts, exceptions @@ -10,9 +11,11 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, fetching_fifo_key=None, payloads_hash_key=None, ack_hash_key=None, + last_requeue_key=None, task_class=task.Task, lua_sha=None, - loop=None): + loop=None, + requeue_interval=10000): if main_queue_key is None: main_queue_key = key_prefix + 'queue' @@ -26,6 +29,9 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, if ack_hash_key is None: ack_hash_key = key_prefix + 'ack' + if last_requeue_key is None: + last_requeue_key = key_prefix + 'last_requeue' + if loop is None: loop = asyncio.get_event_loop() @@ -34,6 +40,7 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, 'fifo': fetching_fifo_key, 'payload': payloads_hash_key, 'ack': ack_hash_key, + 'last_requeue': last_requeue_key, } self._redis = redis @@ -41,6 +48,9 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, self._task_class = task_class self._lua_sha = lua_sha if lua_sha is not None else {} self._locks = {} + self._requeue_interval = requeue_interval + + self._loop.create_task(self._requeue_periodically()) async def _load_scripts(self, primary): @@ -160,12 +170,18 @@ async def _fail(self, task_id): args=[task_id], ) - async def _requeue(self, before=None): + async def _requeue(self, now, before=-1): if 'requeue' not in self._lua_sha: await self._load_scripts('requeue') return await self._redis.evalsha( self._lua_sha['requeue'], - keys=[self._keys['ack'], self._keys['queue']], - args=[before], + keys=[self._keys['ack'], self._keys['queue'], self._keys['last_requeue']], + args=[now, self._requeue_interval, before], ) + + async def _requeue_periodically(self): + while not self._redis.closed: + await asyncio.sleep(self._requeue_interval / 1000) + now = int(timeit.default_timer() * 1000) + await self._requeue(now, now - self._requeue_interval) diff --git a/test/test_requeueing.py b/test/test_requeueing.py new file mode 100644 index 0000000..63d145e --- /dev/null +++ b/test/test_requeueing.py @@ -0,0 +1,63 @@ +import aioredis +import aioredisqueue +import asyncio +import pytest +import timeit + +@pytest.mark.asyncio +async def test_requeue_method(): + r = await aioredis.create_redis(('localhost', 6379), db=0) + q = aioredisqueue.queue.Queue(r) + + await q.put(b'payload') + await q.get() + main_queue_len = await r.llen(q._keys['queue']) + ack_len = await r.hlen(q._keys['ack']) + assert ack_len >= 1 + await r.delete(q._keys['last_requeue']) + now = int(timeit.default_timer() * 1000) + results = await q._requeue(now) + assert len(results[1]) == ack_len + assert len(results[3]) == ack_len + assert results[2][-1] == main_queue_len + ack_len + assert int(await r.get(q._keys['last_requeue'])) == now + + r.close() + await r.wait_closed() + + +@pytest.mark.asyncio +async def test_too_early(): + r = await aioredis.create_redis(('localhost', 6379), db=0) + q = aioredisqueue.queue.Queue(r) + + now = int(timeit.default_timer() * 1000) + await r.set(q._keys['last_requeue'], now) + results = await q._requeue(now) + assert results[0] == b'error' + + r.close() + await r.wait_closed() + + +@pytest.mark.asyncio +async def test_in_background(): + r = await aioredis.create_redis(('localhost', 6379), db=0) + requeue_interval = 100 + eps = 0.01 + q = aioredisqueue.queue.Queue(r, requeue_interval=requeue_interval) + + await r.delete(q._keys['last_requeue']) + + await q.put(b'payload') + await asyncio.sleep(eps) + task = await q.get() + + await asyncio.sleep(requeue_interval / 1000) + assert await r.hget(q._keys['ack'], task.id) is not None + + await asyncio.sleep(requeue_interval / 1000) + assert await r.hget(q._keys['ack'], task.id) is None + + r.close() + await r.wait_closed() From b128d26a026747264dd730d91eed96993b9611db Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Fri, 26 Apr 2019 18:01:10 +0300 Subject: [PATCH 02/11] check last requeue timestamp separately, make requeue method public, remove now argument --- aioredisqueue/load_scripts.py | 6 +++++- aioredisqueue/lua/requeue.lua | 15 ++------------- aioredisqueue/queue.py | 23 ++++++++++++++++++----- test/test_requeueing.py | 5 +++-- 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/aioredisqueue/load_scripts.py b/aioredisqueue/load_scripts.py index 3007045..899924c 100644 --- a/aioredisqueue/load_scripts.py +++ b/aioredisqueue/load_scripts.py @@ -4,7 +4,7 @@ __all__ = ('load_put', 'load_ack', 'load_fail', 'load_requeue', - 'load_get_nowait', 'load_get_nowait_l') + 'load_get_nowait', 'load_get_nowait_l', 'load_check_requeue_timestamp') def _load(name): @@ -39,3 +39,7 @@ def load_get_nowait(): def load_get_nowait_l(): return _load('get_nowait_l') + + +def load_check_requeue_timestamp(): + return _load('check_requeue_timestamp') diff --git a/aioredisqueue/lua/requeue.lua b/aioredisqueue/lua/requeue.lua index 2b5163e..a4902cd 100644 --- a/aioredisqueue/lua/requeue.lua +++ b/aioredisqueue/lua/requeue.lua @@ -1,21 +1,12 @@ -- KEYS[1] ack table -- KEYS[2] main queue --- KEYS[3] last requeue timestamp --- ARGV[1] current timestamp --- ARGV[2] requeue interval --- ARGV[3] requeue jobs less than this numeric value - -local last_requeue = tonumber(redis.call('get', KEYS[3])) -local now = tonumber(ARGV[1]) -if last_requeue ~= nil and now - last_requeue <= tonumber(ARGV[2]) then - return {'error'} -end +-- ARGV[1] requeue jobs less than this numeric value local job_id, v, r local results_hdel = {} local results_lpush = {} local results_keys = {} -local min_val = tonumber(ARGV[3]) +local min_val = tonumber(ARGV[1]) local kvs = redis.call('HGETALL', KEYS[1]) for i = 1, #kvs, 2 do @@ -35,6 +26,4 @@ for i = 1, #kvs, 2 do end end -redis.call('set', KEYS[3], tostring(now)) - return {'ok', results_hdel, results_lpush, results_keys} diff --git a/aioredisqueue/queue.py b/aioredisqueue/queue.py index 7db1ad5..96c1d1f 100644 --- a/aioredisqueue/queue.py +++ b/aioredisqueue/queue.py @@ -170,18 +170,31 @@ async def _fail(self, task_id): args=[task_id], ) - async def _requeue(self, now, before=-1): + async def requeue(self, before=-1): if 'requeue' not in self._lua_sha: await self._load_scripts('requeue') + if 'check_requeue_timestamp' not in self._lua_sha: + await self._load_scripts('check_requeue_timestamp') + + now = int(timeit.default_timer() * 1000) + results = await self._redis.evalsha( + self._lua_sha['check_requeue_timestamp'], + keys=[self._keys['last_requeue']], + args=[now, self._requeue_interval], + ) + if results[0] == b'error': + return results + return await self._redis.evalsha( self._lua_sha['requeue'], - keys=[self._keys['ack'], self._keys['queue'], self._keys['last_requeue']], - args=[now, self._requeue_interval, before], + keys=[self._keys['ack'], self._keys['queue']], + args=[before], ) async def _requeue_periodically(self): + before = int(timeit.default_timer() * 1000) while not self._redis.closed: await asyncio.sleep(self._requeue_interval / 1000) - now = int(timeit.default_timer() * 1000) - await self._requeue(now, now - self._requeue_interval) + await self.requeue(before) + before += self._requeue_interval diff --git a/test/test_requeueing.py b/test/test_requeueing.py index 63d145e..992652d 100644 --- a/test/test_requeueing.py +++ b/test/test_requeueing.py @@ -16,7 +16,8 @@ async def test_requeue_method(): assert ack_len >= 1 await r.delete(q._keys['last_requeue']) now = int(timeit.default_timer() * 1000) - results = await q._requeue(now) + results = await q.requeue() + assert results[0] == b'ok' assert len(results[1]) == ack_len assert len(results[3]) == ack_len assert results[2][-1] == main_queue_len + ack_len @@ -33,7 +34,7 @@ async def test_too_early(): now = int(timeit.default_timer() * 1000) await r.set(q._keys['last_requeue'], now) - results = await q._requeue(now) + results = await q.requeue() assert results[0] == b'error' r.close() From e2d444c143df0a898ef0733c59d91b7da311f818 Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Fri, 26 Apr 2019 18:03:51 +0300 Subject: [PATCH 03/11] list loop argument last --- aioredisqueue/queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aioredisqueue/queue.py b/aioredisqueue/queue.py index 96c1d1f..b7f8b97 100644 --- a/aioredisqueue/queue.py +++ b/aioredisqueue/queue.py @@ -14,8 +14,8 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, last_requeue_key=None, task_class=task.Task, lua_sha=None, - loop=None, - requeue_interval=10000): + requeue_interval=10000, + loop=None): if main_queue_key is None: main_queue_key = key_prefix + 'queue' From b7035ba875a360ba35ebb5316e1b8f96b07e76ae Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Fri, 26 Apr 2019 20:15:23 +0300 Subject: [PATCH 04/11] add no auto requeueing option --- aioredisqueue/queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aioredisqueue/queue.py b/aioredisqueue/queue.py index b7f8b97..caacfd7 100644 --- a/aioredisqueue/queue.py +++ b/aioredisqueue/queue.py @@ -50,7 +50,8 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, self._locks = {} self._requeue_interval = requeue_interval - self._loop.create_task(self._requeue_periodically()) + if self._requeue_interval != 0: + self._loop.create_task(self._requeue_periodically()) async def _load_scripts(self, primary): From 2a9bdb4718ea2fda1756e35279ea38d9e3b08369 Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Fri, 26 Apr 2019 20:44:33 +0300 Subject: [PATCH 05/11] add lua script for requeue timestamp --- aioredisqueue/lua/check_requeue_timestamp.lua | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 aioredisqueue/lua/check_requeue_timestamp.lua diff --git a/aioredisqueue/lua/check_requeue_timestamp.lua b/aioredisqueue/lua/check_requeue_timestamp.lua new file mode 100644 index 0000000..c8aa3a3 --- /dev/null +++ b/aioredisqueue/lua/check_requeue_timestamp.lua @@ -0,0 +1,11 @@ +-- KEYS[1] last requeue timestamp +-- ARGV[1] current timestamp +-- ARGV[2] requeue interval + +local last_requeue = tonumber(redis.call('get', KEYS[1])) +if last_requeue ~= nil and tonumber(ARGV[1]) - last_requeue <= tonumber(ARGV[2]) then + return {'error'} +else + redis.call('set', KEYS[1], ARGV[1]) + return {'ok'} +end From 069d9f9222bad5d0e65f1bc654c7ebc44999d532 Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Wed, 15 May 2019 20:05:57 +0300 Subject: [PATCH 06/11] change stop condition for auto requeueing, add a way to stop manually --- aioredisqueue/queue.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/aioredisqueue/queue.py b/aioredisqueue/queue.py index caacfd7..6a52111 100644 --- a/aioredisqueue/queue.py +++ b/aioredisqueue/queue.py @@ -49,6 +49,7 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, self._lua_sha = lua_sha if lua_sha is not None else {} self._locks = {} self._requeue_interval = requeue_interval + self._requeueing_is_stopped = self._requeue_interval == 0 if self._requeue_interval != 0: self._loop.create_task(self._requeue_periodically()) @@ -193,9 +194,13 @@ async def requeue(self, before=-1): args=[before], ) + def stop_requeueing(self): + self._requeueing_is_stopped = True + async def _requeue_periodically(self): before = int(timeit.default_timer() * 1000) - while not self._redis.closed: + while (not self._redis.closed and self._loop.is_running() + and not self._requeueing_is_stopped): await asyncio.sleep(self._requeue_interval / 1000) await self.requeue(before) before += self._requeue_interval From 061f998fb950061346db0943c943dcb13edcbb5e Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Wed, 15 May 2019 21:48:47 +0300 Subject: [PATCH 07/11] change import order and variable names --- test/test_requeueing.py | 58 ++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/test/test_requeueing.py b/test/test_requeueing.py index 992652d..02654f6 100644 --- a/test/test_requeueing.py +++ b/test/test_requeueing.py @@ -1,64 +1,64 @@ -import aioredis -import aioredisqueue +import timeit import asyncio import pytest -import timeit +import aioredis +import aioredisqueue @pytest.mark.asyncio async def test_requeue_method(): - r = await aioredis.create_redis(('localhost', 6379), db=0) - q = aioredisqueue.queue.Queue(r) + redis = await aioredis.create_redis(('localhost', 6379), db=0) + queue = aioredisqueue.queue.Queue(redis) - await q.put(b'payload') - await q.get() - main_queue_len = await r.llen(q._keys['queue']) - ack_len = await r.hlen(q._keys['ack']) + await queue.put(b'payload') + await queue.get() + main_queue_len = await redis.llen(queue._keys['queue']) + ack_len = await redis.hlen(queue._keys['ack']) assert ack_len >= 1 - await r.delete(q._keys['last_requeue']) + await redis.delete(queue._keys['last_requeue']) now = int(timeit.default_timer() * 1000) - results = await q.requeue() + results = await queue.requeue() assert results[0] == b'ok' assert len(results[1]) == ack_len assert len(results[3]) == ack_len assert results[2][-1] == main_queue_len + ack_len - assert int(await r.get(q._keys['last_requeue'])) == now + assert int(await redis.get(queue._keys['last_requeue'])) == now - r.close() - await r.wait_closed() + redis.close() + await redis.wait_closed() @pytest.mark.asyncio async def test_too_early(): - r = await aioredis.create_redis(('localhost', 6379), db=0) - q = aioredisqueue.queue.Queue(r) + redis = await aioredis.create_redis(('localhost', 6379), db=0) + queue = aioredisqueue.queue.Queue(redis) now = int(timeit.default_timer() * 1000) - await r.set(q._keys['last_requeue'], now) - results = await q.requeue() + await redis.set(queue._keys['last_requeue'], now) + results = await queue.requeue() assert results[0] == b'error' - r.close() - await r.wait_closed() + redis.close() + await redis.wait_closed() @pytest.mark.asyncio async def test_in_background(): - r = await aioredis.create_redis(('localhost', 6379), db=0) + redis = await aioredis.create_redis(('localhost', 6379), db=0) requeue_interval = 100 eps = 0.01 - q = aioredisqueue.queue.Queue(r, requeue_interval=requeue_interval) + queue = aioredisqueue.queue.Queue(redis, requeue_interval=requeue_interval) - await r.delete(q._keys['last_requeue']) + await redis.delete(queue._keys['last_requeue']) - await q.put(b'payload') + await queue.put(b'payload') await asyncio.sleep(eps) - task = await q.get() + task = await queue.get() await asyncio.sleep(requeue_interval / 1000) - assert await r.hget(q._keys['ack'], task.id) is not None + assert await redis.hget(queue._keys['ack'], task.id) is not None await asyncio.sleep(requeue_interval / 1000) - assert await r.hget(q._keys['ack'], task.id) is None + assert await redis.hget(queue._keys['ack'], task.id) is None - r.close() - await r.wait_closed() + redis.close() + await redis.wait_closed() From f3a371437dc47731774c694b210aaef217ef04aa Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Wed, 15 May 2019 22:28:37 +0300 Subject: [PATCH 08/11] change stop semantics --- aioredisqueue/exceptions.py | 4 ++++ aioredisqueue/queue.py | 28 ++++++++++++++++++++++++---- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/aioredisqueue/exceptions.py b/aioredisqueue/exceptions.py index 1dd16ad..4c96294 100644 --- a/aioredisqueue/exceptions.py +++ b/aioredisqueue/exceptions.py @@ -4,3 +4,7 @@ class Full(Exception): class Empty(Exception): pass + + +class Stopped(Exception): + pass \ No newline at end of file diff --git a/aioredisqueue/queue.py b/aioredisqueue/queue.py index 6a52111..8a86350 100644 --- a/aioredisqueue/queue.py +++ b/aioredisqueue/queue.py @@ -49,7 +49,7 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, self._lua_sha = lua_sha if lua_sha is not None else {} self._locks = {} self._requeue_interval = requeue_interval - self._requeueing_is_stopped = self._requeue_interval == 0 + self._is_stopped = False if self._requeue_interval != 0: self._loop.create_task(self._requeue_periodically()) @@ -94,6 +94,9 @@ async def _put_lua(self, task_id, task_payload): ) def put(self, task, method='lua'): + if self._is_stopped: + raise exceptions.Stopped + task_id = self._task_class.generate_id() task_payload = self._task_class.format_payload(task) @@ -119,6 +122,9 @@ async def _get_nowait(self, ack_info, script='get_nowait_l', key='fifo'): raise exceptions.Empty(result[0]) async def get_nowait(self): + if self._is_stopped: + raise exceptions.Stopped + ack_info = self._task_class.generate_ack_info() try: @@ -129,6 +135,9 @@ async def get_nowait(self): return await self._get_nowait(ack_info, 'get_nowait', 'queue') async def get(self, retry_interval=1): + if self._is_stopped: + raise exceptions.Stopped + while self._loop.is_running(): await self._redis.brpoplpush(self._keys['queue'], self._keys['fifo'], @@ -157,12 +166,18 @@ async def _ack_lua(self, task_id): ) def _ack(self, task_id, method='multi'): + if self._is_stopped: + raise exceptions.Stopped + if method == 'multi': return self._ack_pipe(task_id) elif method == 'lua': return self._ack_lua(task_id) async def _fail(self, task_id): + if self._is_stopped: + raise exceptions.Stopped + if 'fail' not in self._lua_sha: await self._load_scripts('fail') @@ -173,6 +188,9 @@ async def _fail(self, task_id): ) async def requeue(self, before=-1): + if self._is_stopped: + raise exceptions.Stopped + if 'requeue' not in self._lua_sha: await self._load_scripts('requeue') @@ -194,13 +212,15 @@ async def requeue(self, before=-1): args=[before], ) - def stop_requeueing(self): - self._requeueing_is_stopped = True + def stop(self): + if self._is_stopped: + raise exceptions.Stopped + self._is_stopped = True async def _requeue_periodically(self): before = int(timeit.default_timer() * 1000) while (not self._redis.closed and self._loop.is_running() - and not self._requeueing_is_stopped): + and not self._is_stopped): await asyncio.sleep(self._requeue_interval / 1000) await self.requeue(before) before += self._requeue_interval From 74a7d48cadac36183236b8497f51920b77675d2f Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Wed, 15 May 2019 22:59:14 +0300 Subject: [PATCH 09/11] cancel requeue_regularly task in stop method, stop queue in the end of tests --- aioredisqueue/queue.py | 11 +++++++++-- test/test_requeueing.py | 3 +++ test/test_task_usage.py | 13 +++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/aioredisqueue/queue.py b/aioredisqueue/queue.py index 8a86350..1821c8c 100644 --- a/aioredisqueue/queue.py +++ b/aioredisqueue/queue.py @@ -52,7 +52,10 @@ def __init__(self, redis, key_prefix='aioredisqueue_', *, self._is_stopped = False if self._requeue_interval != 0: - self._loop.create_task(self._requeue_periodically()) + self._regular_requeue_task = \ + self._loop.create_task(self._requeue_regularly()) + else: + self._regular_requeue_task = None async def _load_scripts(self, primary): @@ -215,9 +218,13 @@ async def requeue(self, before=-1): def stop(self): if self._is_stopped: raise exceptions.Stopped + + if self._regular_requeue_task is not None: + self._regular_requeue_task.cancel() + self._is_stopped = True - async def _requeue_periodically(self): + async def _requeue_regularly(self): before = int(timeit.default_timer() * 1000) while (not self._redis.closed and self._loop.is_running() and not self._is_stopped): diff --git a/test/test_requeueing.py b/test/test_requeueing.py index 02654f6..ae42571 100644 --- a/test/test_requeueing.py +++ b/test/test_requeueing.py @@ -23,6 +23,7 @@ async def test_requeue_method(): assert results[2][-1] == main_queue_len + ack_len assert int(await redis.get(queue._keys['last_requeue'])) == now + queue.stop() redis.close() await redis.wait_closed() @@ -37,6 +38,7 @@ async def test_too_early(): results = await queue.requeue() assert results[0] == b'error' + queue.stop() redis.close() await redis.wait_closed() @@ -60,5 +62,6 @@ async def test_in_background(): await asyncio.sleep(requeue_interval / 1000) assert await redis.hget(queue._keys['ack'], task.id) is None + queue.stop() redis.close() await redis.wait_closed() diff --git a/test/test_task_usage.py b/test/test_task_usage.py index 8e82abc..ab3aecf 100644 --- a/test/test_task_usage.py +++ b/test/test_task_usage.py @@ -14,6 +14,10 @@ async def test_basic_put_get_and_get(): result = await queue.get() assert result.payload == message + queue.stop() + r.close() + await r.wait_closed() + async def get_ack(queue): task = await queue.get() @@ -43,6 +47,11 @@ async def test_ack(): await get_ack(queue) + queue.stop() + r.close() + await r.wait_closed() + + @pytest.mark.asyncio async def test_fail_ack(): r = await aioredis.create_redis(('localhost', 6379), db=0) @@ -51,3 +60,7 @@ async def test_fail_ack(): message = b'payload' await queue.put(message) await get_fail_ack(queue) + + queue.stop() + r.close() + await r.wait_closed() From 862d41fc67ef378a7a2e7f14169485e73bc96276 Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Wed, 15 May 2019 23:00:50 +0300 Subject: [PATCH 10/11] change import order --- test/test_task_usage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_task_usage.py b/test/test_task_usage.py index ab3aecf..e7f80b5 100644 --- a/test/test_task_usage.py +++ b/test/test_task_usage.py @@ -1,5 +1,5 @@ -import pytest import asyncio +import pytest import aioredis import aioredisqueue From d3734e0fe317cdee3f2b6ab24115e65aa4a4048e Mon Sep 17 00:00:00 2001 From: "mesher.x" Date: Wed, 15 May 2019 23:04:39 +0300 Subject: [PATCH 11/11] remove periodical requeueing from new features section --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 45c6d7f..9e54cbb 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,5 @@ performed during each such call. be renamed into `put_nowait`. - `get_multi` and `put_multi` methods, allowing getting and putting multiple items from queue with one call -- method for periodical requeueing of not acknowledged tasks - keeping track of times a task was requeued, dropping too old tasks or tasks with too many retries.