Skip to content

Commit f7b0412

Browse files
committed
Extract out the reused queue_get_batch
1 parent b9a6e82 commit f7b0412

File tree

2 files changed

+66
-55
lines changed

2 files changed

+66
-55
lines changed

trinity/utils/datastructures.py

Lines changed: 14 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
identity,
3232
)
3333

34+
from trinity.utils.queues import (
35+
queue_get_batch,
36+
queue_get_nowait,
37+
)
38+
3439
TFunc = TypeVar('TFunc')
3540
TSubtask = TypeVar('TSubtask', bound=Enum)
3641
TTask = TypeVar('TTask')
@@ -211,7 +216,10 @@ def get_nowait(self, max_results: int = None) -> Tuple[int, Tuple[TTask, ...]]:
211216
if self._open_queue.empty():
212217
raise QueueFull("No tasks are available to get")
213218
else:
214-
pending_tasks = self._get_nowait(max_results)
219+
ranked_tasks = queue_get_nowait(self._open_queue, max_results)
220+
221+
# strip out the wrapper used internally for sorting
222+
pending_tasks = tuple(task.original for task in ranked_tasks)
215223

216224
# Generate a pending batch of tasks, so uncompleted tasks can be inferred
217225
next_id = next(self._id_generator)
@@ -226,49 +234,14 @@ async def get(self, max_results: int = None) -> Tuple[int, Tuple[TTask, ...]]:
226234
:param max_results: return up to this many pending tasks. If None, return all pending tasks.
227235
:return: (batch_id, tasks to attempt)
228236
"""
229-
if max_results is not None and max_results < 1:
230-
raise ValidationError("Must request at least one task to process, not {max_results!r}")
231-
232-
# if the queue is empty, wait until at least one item is available
233-
queue = self._open_queue
234-
if queue.empty():
235-
wrapped_first_task = await queue.get()
236-
else:
237-
wrapped_first_task = queue.get_nowait()
238-
first_task = wrapped_first_task.original
239-
240-
# In order to return from get() as soon as possible, never await again.
241-
# Instead, take only the tasks that are already available.
242-
if max_results is None:
243-
remaining_count = None
244-
else:
245-
remaining_count = max_results - 1
246-
remaining_tasks = self._get_nowait(remaining_count)
247-
248-
# Combine the first and remaining tasks
249-
all_tasks = (first_task, ) + remaining_tasks
237+
ranked_tasks = await queue_get_batch(self._open_queue, max_results)
238+
pending_tasks = tuple(task.original for task in ranked_tasks)
250239

251240
# Generate a pending batch of tasks, so uncompleted tasks can be inferred
252241
next_id = next(self._id_generator)
253-
self._in_progress[next_id] = all_tasks
254-
255-
return (next_id, all_tasks)
242+
self._in_progress[next_id] = pending_tasks
256243

257-
def _get_nowait(self, max_results: int = None) -> Tuple[TTask, ...]:
258-
queue = self._open_queue
259-
260-
# How many results do we want?
261-
available = queue.qsize()
262-
if max_results is None:
263-
num_tasks = available
264-
else:
265-
num_tasks = min((available, max_results))
266-
267-
# Combine the remaining tasks with the first task we already pulled.
268-
ranked_tasks = tuple(queue.get_nowait() for _ in range(num_tasks))
269-
270-
# strip out the wrapper used internally for sorting
271-
return tuple(task.original for task in ranked_tasks)
244+
return (next_id, pending_tasks)
272245

273246
def complete(self, batch_id: int, completed: Tuple[TTask, ...]) -> None:
274247
if batch_id not in self._in_progress:
@@ -538,21 +511,7 @@ async def ready_tasks(self) -> Tuple[TTask, ...]:
538511
Return the next batch of tasks that are ready to process. If none are ready,
539512
hang until at least one task becomes ready.
540513
"""
541-
queue = self._ready_tasks
542-
if queue.empty():
543-
first_task = await queue.get()
544-
else:
545-
first_task = queue.get_nowait()
546-
547-
# In order to return from get() as soon as possible, never await again.
548-
# Instead, take only the tasks that are already available.
549-
available = queue.qsize()
550-
551-
available_tasks = tuple(queue.get_nowait() for _ in range(available))
552-
553-
completed = (first_task, ) + available_tasks
554-
555-
return completed
514+
return await queue_get_batch(self._ready_tasks)
556515

557516
def _mark_complete(self, task_id: TTaskID) -> None:
558517
qualified_tasks = tuple([task_id])

trinity/utils/queues.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from asyncio import ( # noqa: F401
2+
Queue,
3+
)
4+
from typing import (
5+
Tuple,
6+
TypeVar,
7+
)
8+
9+
from eth_utils import (
10+
ValidationError,
11+
)
12+
13+
TQueueItem = TypeVar('TQueueItem')
14+
15+
16+
async def queue_get_batch(
17+
queue: 'Queue[TQueueItem]',
18+
max_results: int = None) -> Tuple[TQueueItem, ...]:
19+
"""
20+
Wait until at least one result is available, and return it and any
21+
other results that are immediately available, up to max_results.
22+
"""
23+
if max_results is not None and max_results < 1:
24+
raise ValidationError("Must request at least one item from a queue, not {max_results!r}")
25+
26+
# if the queue is empty, wait until at least one item is available
27+
if queue.empty():
28+
first_item = await queue.get()
29+
else:
30+
first_item = queue.get_nowait()
31+
32+
# In order to return from queue_get_batch() as soon as possible, never await again.
33+
# Instead, take only the items that are already available.
34+
if max_results is None:
35+
remaining_count = None
36+
else:
37+
remaining_count = max_results - 1
38+
remaining_items = queue_get_nowait(queue, remaining_count)
39+
40+
# Combine the first and remaining items
41+
return (first_item, ) + remaining_items
42+
43+
44+
def queue_get_nowait(queue: 'Queue[TQueueItem]', max_results: int = None) -> Tuple[TQueueItem, ...]:
45+
# How many results do we want?
46+
available = queue.qsize()
47+
if max_results is None:
48+
num_items = available
49+
else:
50+
num_items = min((available, max_results))
51+
52+
return tuple(queue.get_nowait() for _ in range(num_items))

0 commit comments

Comments
 (0)