Skip to content

Commit f4eeb58

Browse files
committed
New TaskQueue tests: bug when adding existing task
1 parent ca59bc8 commit f4eeb58

File tree

3 files changed

+173
-26
lines changed

3 files changed

+173
-26
lines changed

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@
4444
"web3==4.4.1",
4545
],
4646
'test': [
47-
"hypothesis==3.44.26",
47+
"hypothesis==3.69.5",
4848
# pinned to <3.7 until async fixtures work again
4949
# https://github.com/pytest-dev/pytest-asyncio/issues/89
5050
"pytest>=3.6,<3.7",
51-
"pytest-asyncio==0.8.0",
51+
"pytest-asyncio==0.9.0",
5252
"pytest-cov==2.5.1",
5353
"pytest-watch>=4.1.0,<5",
5454
"pytest-xdist==1.18.1",

tests/trinity/utils/test_task_queue.py

Lines changed: 149 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,132 @@
11
import asyncio
2+
from asyncio import (
3+
Event,
4+
)
5+
from contextlib import contextmanager
6+
import functools
27
import pytest
8+
import random
39

10+
from cancel_token import CancelToken, OperationCancelled
411
from eth_utils import ValidationError
12+
from hypothesis import (
13+
example,
14+
given,
15+
strategies as st,
16+
)
517

618
from trinity.utils.datastructures import TaskQueue
719

20+
DEFAULT_TIMEOUT = 0.05
21+
822

9-
async def wait(coro, timeout=0.05):
23+
async def wait(coro, timeout=DEFAULT_TIMEOUT):
1024
return await asyncio.wait_for(coro, timeout=timeout)
1125

1226

27+
@contextmanager
28+
def trap_operation_cancelled():
29+
try:
30+
yield
31+
except OperationCancelled:
32+
pass
33+
34+
35+
def run_in_event_loop(async_func):
36+
@functools.wraps(async_func)
37+
def wrapped(operations, queue_size, add_size, get_size, event_loop):
38+
event_loop.run_until_complete(asyncio.ensure_future(
39+
async_func(operations, queue_size, add_size, get_size, event_loop),
40+
loop=event_loop,
41+
))
42+
return wrapped
43+
44+
45+
@given(
46+
operations=st.lists(
47+
elements=st.tuples(st.integers(min_value=0, max_value=5), st.booleans()),
48+
min_size=10,
49+
max_size=30,
50+
),
51+
queue_size=st.integers(min_value=1, max_value=20),
52+
add_size=st.integers(min_value=1, max_value=20),
53+
get_size=st.integers(min_value=1, max_value=20),
54+
)
55+
@example(
56+
# try having two adders alternate a couple times quickly
57+
operations=[(0, False), (1, False), (0, False), (1, True), (2, False), (2, False), (2, False)],
58+
queue_size=5,
59+
add_size=2,
60+
get_size=5,
61+
)
62+
@run_in_event_loop
63+
async def test_no_asyncio_exception_leaks(operations, queue_size, add_size, get_size, event_loop):
64+
"""
65+
This could be made much more general, at the cost of simplicity.
66+
For now, this mimics real usage enough to hopefully catch the big issues.
67+
68+
Some examples for more generality:
69+
70+
- different get sizes on each call
71+
- complete varying amounts of tasks at each call
72+
"""
73+
74+
async def getter(queue, num_tasks, get_event, complete_event, cancel_token):
75+
with trap_operation_cancelled():
76+
# wait to run the get
77+
await cancel_token.cancellable_wait(get_event.wait())
78+
79+
batch, tasks = await cancel_token.cancellable_wait(
80+
queue.get(num_tasks)
81+
)
82+
get_event.clear()
83+
84+
# wait to run the completion
85+
await cancel_token.cancellable_wait(complete_event.wait())
86+
87+
queue.complete(batch, tasks)
88+
complete_event.clear()
89+
90+
async def adder(queue, add_size, add_event, cancel_token):
91+
with trap_operation_cancelled():
92+
# wait to run the add
93+
await cancel_token.cancellable_wait(add_event.wait())
94+
95+
await cancel_token.cancellable_wait(
96+
queue.add(tuple(random.randint(0, 2 ** 32) for _ in range(add_size)))
97+
)
98+
add_event.clear()
99+
100+
async def operation_order(operations, events, cancel_token):
101+
for operation_id, pause in operations:
102+
events[operation_id].set()
103+
if pause:
104+
await asyncio.sleep(0)
105+
106+
await asyncio.sleep(0)
107+
cancel_token.trigger()
108+
109+
q = TaskQueue(queue_size)
110+
events = tuple(Event() for _ in range(6))
111+
add_event, add2_event, get_event, get2_event, complete_event, complete2_event = events
112+
cancel_token = CancelToken('end test')
113+
114+
done, pending = await asyncio.wait([
115+
getter(q, get_size, get_event, complete_event, cancel_token),
116+
getter(q, get_size, get2_event, complete2_event, cancel_token),
117+
adder(q, add_size, add_event, cancel_token),
118+
adder(q, add_size, add2_event, cancel_token),
119+
operation_order(operations, events, cancel_token),
120+
], return_when=asyncio.FIRST_EXCEPTION)
121+
122+
for task in done:
123+
exc = task.exception()
124+
if exc:
125+
raise exc
126+
127+
assert not pending
128+
129+
13130
@pytest.mark.asyncio
14131
async def test_queue_size_reset_after_complete():
15132
q = TaskQueue(maxsize=2)
@@ -63,7 +180,7 @@ async def test_default_priority_order():
63180

64181
@pytest.mark.asyncio
65182
async def test_custom_priority_order():
66-
q = TaskQueue(maxsize=4, order_fn=lambda x: 0-x)
183+
q = TaskQueue(maxsize=4, order_fn=lambda x: 0 - x)
67184

68185
await wait(q.add((2, 1, 3)))
69186
(batch, tasks) = await wait(q.get())
@@ -108,6 +225,25 @@ async def test_wait_empty_queue():
108225
assert False, "should not return from get() when nothing is available on queue"
109226

110227

228+
@pytest.mark.asyncio
229+
async def test_cannot_complete_batch_with_wrong_task():
230+
q = TaskQueue()
231+
232+
await wait(q.add((1, 2)))
233+
234+
batch, tasks = await wait(q.get())
235+
236+
# cannot complete a valid task with a task it wasn't given
237+
with pytest.raises(ValidationError):
238+
q.complete(batch, (3, 4))
239+
240+
# partially invalid completion calls leave the valid task in an incomplete state
241+
with pytest.raises(ValidationError):
242+
q.complete(batch, (1, 3))
243+
244+
assert 1 in q
245+
246+
111247
@pytest.mark.asyncio
112248
async def test_cannot_complete_batch_unless_pending():
113249
q = TaskQueue()
@@ -156,10 +292,9 @@ async def test_two_pending_adds_one_release():
156292
assert len(tasks) in {0, 1}
157293

158294
if len(tasks) == 1:
159-
batch2, tasks2 = await wait(q.get())
295+
_, tasks2 = await wait(q.get())
160296
all_tasks = tuple(sorted(tasks + tasks2))
161297
elif len(tasks) == 2:
162-
batch2 = None
163298
all_tasks = tasks
164299

165300
assert all_tasks == (0, 3)
@@ -186,12 +321,20 @@ async def test_queue_get_cap(start_tasks, get_max, expected, remainder):
186321
assert tasks == expected
187322

188323
if remainder:
189-
batch2, tasks2 = await wait(q.get())
324+
_, tasks2 = await wait(q.get())
190325
assert tasks2 == remainder
191326
else:
192327
try:
193-
batch2, tasks2 = await wait(q.get())
328+
_, tasks2 = await wait(q.get())
194329
except asyncio.TimeoutError:
195330
pass
196331
else:
197332
assert False, f"No more tasks to get, but got {tasks2!r}"
333+
334+
335+
@pytest.mark.asyncio
336+
async def test_cannot_readd_same_task():
337+
q = TaskQueue()
338+
await q.add((1, 2))
339+
with pytest.raises(ValidationError):
340+
await q.add((2,))

trinity/utils/datastructures.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ async def add(self, tasks: Tuple[TTask, ...]) -> None:
7676
if not isinstance(tasks, tuple):
7777
raise ValidationError(f"must pass a tuple of tasks to add(), but got {tasks!r}")
7878

79+
already_pending = self._tasks.intersection(tasks)
80+
if already_pending:
81+
raise ValidationError(
82+
f"Can't readd a task to queue. {already_pending!r} are already present"
83+
)
84+
7985
# make sure to insert the highest-priority items first, in case queue fills up
8086
remaining = tuple(sorted((self._order_fn(task), task) for task in tasks))
8187

@@ -102,24 +108,15 @@ async def add(self, tasks: Tuple[TTask, ...]) -> None:
102108
# There will always be room in _open_queue until _maxsize is reached
103109
try:
104110
self._open_queue.put_nowait(task)
105-
except QueueFull:
111+
except QueueFull as exc:
106112
task_idx = queueing.index(task)
107-
# TODO remove once this bug is tracked down
108-
import logging; logging.error(
109-
'TaskQueue unsuccessful in adding task %r because qsize=%d, '
110-
'num_tasks=%d, _maxsize=%d, open_slots=%d, num queueing=%d, '
111-
'len(_tasks)=%d, task_idx=%d, queuing=%r',
112-
task,
113-
self._open_queue.qsize(),
114-
num_tasks,
115-
self._maxsize,
116-
open_slots,
117-
len(queueing),
118-
len(self._tasks),
119-
task_idx,
120-
queueing,
113+
qsize = self._open_queue.qsize()
114+
raise QueueFull(
115+
f'TaskQueue unsuccessful in adding task {task[1]!r} because qsize={qsize}, '
116+
f'num_tasks={num_tasks}, maxsize={self._maxsize}, open_slots={open_slots}, '
117+
f'num queueing={len(queueing)}, len(_tasks)={len(self._tasks)}, task_idx='
118+
f'{task_idx}, queueing={queueing}, original msg: {exc}',
121119
)
122-
raise
123120

124121
unranked_queued = tuple(task for _rank, task in queueing)
125122
self._tasks.update(unranked_queued)
@@ -171,9 +168,16 @@ def complete(self, batch_id: int, completed: Tuple[TTask, ...]) -> None:
171168

172169
attempted = self._in_progress.pop(batch_id)
173170

174-
remaining = set(attempted).difference(completed)
171+
unrecognized_tasks = set(completed).difference(attempted)
172+
if unrecognized_tasks:
173+
self._in_progress[batch_id] = attempted
174+
raise ValidationError(
175+
f"cannot complete tasks {unrecognized_tasks!r} in this batch, only {attempted!r}"
176+
)
177+
178+
incomplete = set(attempted).difference(completed)
175179

176-
for task in remaining:
180+
for task in incomplete:
177181
# These tasks are already counted in the total task count, so there will be room
178182
self._open_queue.put_nowait((self._order_fn(task), task))
179183

0 commit comments

Comments
 (0)