Skip to content

Commit a460da4

Browse files
author
Anton
committed
refactor: better queue
1 parent e8b0c3b commit a460da4

File tree

2 files changed

+45
-63
lines changed

2 files changed

+45
-63
lines changed

taskiq/receiver/receiver.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from taskiq.result import TaskiqResult
1818
from taskiq.semaphore import PrioritySemaphore
1919
from taskiq.state import TaskiqState
20-
from taskiq.utils import DequeQueue, maybe_awaitable
20+
from taskiq.utils import PriorityQueue, maybe_awaitable
2121

2222
logger = getLogger(__name__)
2323
QUEUE_DONE = b"-1"
@@ -71,7 +71,7 @@ def __init__( # noqa: WPS211
7171
+ "can result in undefined behavior",
7272
)
7373
self.sem_prefetch = PrioritySemaphore(max_prefetch)
74-
self.queue: DequeQueue[bytes] = DequeQueue()
74+
self.queue: PriorityQueue[bytes] = PriorityQueue()
7575

7676
self.sem_sleeping: Optional[asyncio.Semaphore] = None
7777
if max_sleeping_tasks is not None and max_sleeping_tasks <= 0:
@@ -270,7 +270,7 @@ async def listen(self) -> None: # pragma: no cover
270270
gr.start_soon(self.prefetcher, self.queue)
271271
gr.start_soon(self.runner, self.queue)
272272

273-
async def prefetcher(self, queue: "asyncio.Queue[Any]") -> None:
273+
async def prefetcher(self, queue: "PriorityQueue[Any]") -> None:
274274
"""
275275
Prefetch tasks data.
276276
@@ -282,14 +282,14 @@ async def prefetcher(self, queue: "asyncio.Queue[Any]") -> None:
282282
try:
283283
await self.sem_prefetch.acquire()
284284
message = await iterator.__anext__() # noqa: WPS609
285-
await queue.put(message)
285+
await queue.put_last(message)
286286

287287
except StopAsyncIteration:
288288
break
289289

290-
await queue.put(QUEUE_DONE)
290+
await queue.put_last(QUEUE_DONE)
291291

292-
async def runner(self, queue: "asyncio.Queue[bytes]") -> None: # noqa: C901, WPS213
292+
async def runner(self, queue: "PriorityQueue[bytes]") -> None: # noqa: C901, WPS213
293293
"""
294294
Run tasks.
295295
@@ -317,7 +317,7 @@ def task_cb(task: "asyncio.Task[Any]") -> None:
317317
await self.sem.acquire()
318318

319319
self.sem_prefetch.release()
320-
message = await queue.get()
320+
_, _, message = await queue.get()
321321
if message is QUEUE_DONE:
322322
if self.sem is not None:
323323
self.sem.release()

taskiq/utils.py

Lines changed: 38 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import inspect
3-
from typing import TYPE_CHECKING, Any, Coroutine, Deque, Generic, TypeVar, Union
3+
from typing import TYPE_CHECKING, Any, Coroutine, Generic, Tuple, TypeVar, Union
44

55
_T = TypeVar("_T") # noqa: WPS111
66

@@ -38,70 +38,52 @@ def remove_suffix(text: str, suffix: str) -> str:
3838
return text
3939

4040

41-
class DequeQueue(
42-
asyncio.Queue, # type: ignore
43-
Generic[_T],
44-
):
45-
"""Deque based Queue."""
46-
47-
if TYPE_CHECKING: # noqa: WPS604
48-
_loop: asyncio.BaseEventLoop
49-
_queue: Deque[_T]
50-
_putters: Deque[Any]
51-
_getters: Deque[Any]
52-
_unfinished_tasks: int
53-
_finished: asyncio.Event
54-
_wakeup_next: Any
41+
class PriorityQueue(asyncio.PriorityQueue, Generic[_T]): # type: ignore
42+
"""PriorityQueue based Queue."""
5543

5644
async def put_first(self, item: _T) -> None:
5745
"""
58-
Wait till queue is not full. Put item in Queue as soon as possible. LIFO style.
46+
Put item in Queue with highest priority.
5947
6048
:param item: value to prepend
61-
:raises BaseException: something goes wrong
62-
:returns: nothing
6349
"""
64-
while self.full():
65-
putter = self._loop.create_future()
66-
self._putters.appendleft(putter)
67-
try:
68-
await putter
69-
except BaseException: # noqa: WPS424
70-
putter.cancel() # Just in case putter is not done yet.
71-
try: # noqa: WPS505
72-
# Clean self._putters from canceled putters.
73-
self._putters.remove(putter)
74-
except ValueError:
75-
# The putter could be removed from self._putters by a
76-
# previous get_nowait call.
77-
pass # noqa: WPS420
78-
if not self.full() and not putter.cancelled():
79-
# We were woken up by get_nowait(), but can't take
80-
# the call. Wake up the next in line.
81-
self._wakeup_next(self._putters)
82-
raise
83-
84-
return self.put_first_nowait(item)
85-
86-
def put_first_nowait(self, item: _T) -> None:
50+
self.counter += 1
51+
await self.put((0, self.counter, item))
52+
53+
async def put_last(self, item: _T) -> None:
8754
"""
88-
Put item in Queue as soon as possible. LIFO style.
55+
Put item in Queue with lowest priority.
8956
90-
:param item: value to prepend
91-
:raises QueueFull: queue is full
57+
:param item: value to append
9258
"""
93-
if self.full():
94-
raise asyncio.QueueFull()
59+
self.counter += 1
60+
await self.put((1, self.counter, item))
9561

96-
self._put_first(item)
97-
self._unfinished_tasks += 1
98-
self._finished.clear()
99-
self._wakeup_next(self._getters)
62+
def _init(self, maxsize: int) -> None:
63+
super()._init(maxsize)
64+
self.counter = 0
10065

101-
def _put_first(self, item: _T) -> None:
102-
"""
103-
Prepend item.
10466

105-
:param item: value to prepend
106-
"""
107-
self._queue.appendleft(item)
67+
if TYPE_CHECKING: # pragma: no cover
68+
69+
class PriorityQueue( # type: ignore # noqa: F811
70+
asyncio.PriorityQueue[Tuple[int, int, _T]],
71+
Generic[_T],
72+
):
73+
"""PriorityQueue based Queue."""
74+
75+
async def put_first(self, item: _T) -> None:
76+
"""
77+
Put item in Queue with highest priority.
78+
79+
:param item: value to prepend
80+
"""
81+
...
82+
83+
async def put_last(self, item: _T) -> None:
84+
"""
85+
Put item in Queue with lowest priority.
86+
87+
:param item: value to append
88+
"""
89+
...

0 commit comments

Comments
 (0)