-
-
Notifications
You must be signed in to change notification settings - Fork 33.3k
gh-96471: Add asyncio queue shutdown #104228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
440a702
fb458db
e5951ac
a72aedd
d5e925d
f3517fb
bd2a7c3
e9ac8de
1e7813a
1275bb6
eec29bb
2c6156f
17f1f32
a233830
420a247
25ad2ac
f3321b4
6d9edd6
1135d85
ddc6ad6
2fa1bd9
aef4063
d49c6dd
5a435a6
c8db40e
ca01ee1
b02c4dd
8deca77
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,6 +62,9 @@ Queue | |
| Remove and return an item from the queue. If queue is empty, | ||
| wait until an item is available. | ||
|
|
||
| Raises :exc:`QueueShutDown` if the queue has been shut down and | ||
| is empty, or if the queue has been shut down immediately. | ||
|
|
||
| .. method:: get_nowait() | ||
|
|
||
| Return an item if one is immediately available, else raise | ||
|
|
@@ -77,11 +80,16 @@ Queue | |
| work on it is complete. When the count of unfinished tasks drops | ||
| to zero, :meth:`join` unblocks. | ||
|
|
||
| Raises :exc:`QueueShutDown` if the queue has been shut down | ||
| immediately. | ||
|
|
||
| .. coroutinemethod:: put(item) | ||
|
|
||
| Put an item into the queue. If the queue is full, wait until a | ||
| free slot is available before adding the item. | ||
|
|
||
| Raises :exc:`QueueShutDown` if the queue has been shut down. | ||
|
|
||
| .. method:: put_nowait(item) | ||
|
|
||
| Put an item into the queue without blocking. | ||
|
|
@@ -92,6 +100,21 @@ Queue | |
|
|
||
| Return the number of items in the queue. | ||
|
|
||
| .. method:: shutdown(immediate=False) | ||
|
|
||
| Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` | ||
| raise :exc:`QueueShutDown`. | ||
|
|
||
| By default, :meth:`~Queue.get` on a shut down queue will only raise once | ||
| the queue is empty. Set *immediate* to true to make gets raise | ||
| immediately instead. | ||
|
|
||
| All blocked callers of :meth:`~Queue.put` will be unblocked. If | ||
| *immediate* is true, also unblock callers of :meth:`~Queue.get` and | ||
| :meth:`~Queue.join`. | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry but I have a doubt, shouldn't this documentation block be rather: In event of change, the docstring of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Also, I think the threading queue docs are the same. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It's very precise, better.
Yes, I commented here so as not to forget (see #117532 (comment)). English is your native language, I think it'is best if you update documentations and docstrings. Update: but I can create the follow-up PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've made a PR: #117621 |
||
| .. versionadded:: 3.13 | ||
|
|
||
| .. method:: task_done() | ||
|
|
||
| Indicate that a formerly enqueued task is complete. | ||
|
|
@@ -108,6 +131,9 @@ Queue | |
| Raises :exc:`ValueError` if called more times than there were | ||
| items placed in the queue. | ||
|
|
||
| Raises :exc:`QueueShutDown` if the queue has been shut down | ||
| immediately. | ||
|
|
||
|
|
||
| Priority Queue | ||
| ============== | ||
|
|
@@ -145,6 +171,14 @@ Exceptions | |
| on a queue that has reached its *maxsize*. | ||
|
|
||
|
|
||
| .. exception:: QueueShutDown | ||
|
|
||
| Exception raised when getting an item from or putting an item onto a | ||
| queue which has been shut down. | ||
|
|
||
| .. versionadded:: 3.13 | ||
|
|
||
|
|
||
| Examples | ||
| ======== | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,14 @@ | ||
| __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') | ||
| __all__ = ( | ||
| 'Queue', | ||
| 'PriorityQueue', | ||
| 'LifoQueue', | ||
| 'QueueFull', | ||
| 'QueueEmpty', | ||
| 'QueueShutDown', | ||
| ) | ||
|
|
||
| import collections | ||
| import enum | ||
| import heapq | ||
| from types import GenericAlias | ||
|
|
||
|
|
@@ -18,6 +26,17 @@ class QueueFull(Exception): | |
| pass | ||
|
|
||
|
|
||
| class QueueShutDown(Exception): | ||
| """Raised when putting on to or getting from a shut-down Queue.""" | ||
| pass | ||
|
|
||
|
|
||
| class _QueueState(enum.Enum): | ||
| ALIVE = "alive" | ||
| SHUTDOWN = "shutdown" | ||
| SHUTDOWN_IMMEDIATE = "shutdown-immediate" | ||
|
|
||
|
|
||
| class Queue(mixins._LoopBoundMixin): | ||
| """A queue, useful for coordinating producer and consumer coroutines. | ||
|
|
||
|
|
@@ -41,6 +60,7 @@ def __init__(self, maxsize=0): | |
| self._finished = locks.Event() | ||
| self._finished.set() | ||
| self._init(maxsize) | ||
| self._shutdown_state = _QueueState.ALIVE | ||
|
|
||
| # These three are overridable in subclasses. | ||
|
|
||
|
|
@@ -81,6 +101,8 @@ def _format(self): | |
| result += f' _putters[{len(self._putters)}]' | ||
| if self._unfinished_tasks: | ||
| result += f' tasks={self._unfinished_tasks}' | ||
| if not self._is_alive(): | ||
| result += f' state={self._shutdown_state.value}' | ||
| return result | ||
|
|
||
| def qsize(self): | ||
|
|
@@ -112,7 +134,11 @@ async def put(self, item): | |
|
|
||
| Put an item into the queue. If the queue is full, wait until a free | ||
| slot is available before adding item. | ||
|
|
||
| Raises QueueShutDown if the queue has been shut down. | ||
| """ | ||
| if not self._is_alive(): | ||
| raise QueueShutDown | ||
| while self.full(): | ||
| putter = self._get_loop().create_future() | ||
| self._putters.append(putter) | ||
|
|
@@ -125,20 +151,26 @@ async def put(self, item): | |
| self._putters.remove(putter) | ||
| except ValueError: | ||
| # The putter could be removed from self._putters by a | ||
| # previous get_nowait call. | ||
| # previous get_nowait call or a shutdown call. | ||
| pass | ||
| if not self.full() and not putter.cancelled(): | ||
| # We were woken up by get_nowait(), but can't take | ||
| # the call. Wake up the next in line. | ||
| self._wakeup_next(self._putters) | ||
| raise | ||
| if not self._is_alive(): | ||
| raise QueueShutDown | ||
| return self.put_nowait(item) | ||
|
|
||
| def put_nowait(self, item): | ||
| """Put an item into the queue without blocking. | ||
|
|
||
| If no free slot is immediately available, raise QueueFull. | ||
|
|
||
| Raises QueueShutDown if the queue has been shut down. | ||
| """ | ||
| if not self._is_alive(): | ||
| raise QueueShutDown | ||
| if self.full(): | ||
| raise QueueFull | ||
| self._put(item) | ||
|
|
@@ -150,8 +182,15 @@ async def get(self): | |
| """Remove and return an item from the queue. | ||
|
|
||
| If queue is empty, wait until an item is available. | ||
|
|
||
| Raises QueueShutDown if the queue has been shut down and is empty, or | ||
| if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| while self.empty(): | ||
| if self._is_shutdown(): | ||
| raise QueueShutDown | ||
| getter = self._get_loop().create_future() | ||
| self._getters.append(getter) | ||
| try: | ||
|
|
@@ -163,21 +202,31 @@ async def get(self): | |
| self._getters.remove(getter) | ||
| except ValueError: | ||
| # The getter could be removed from self._getters by a | ||
| # previous put_nowait call. | ||
| # previous put_nowait call, | ||
| # or a shutdown call. | ||
| pass | ||
| if not self.empty() and not getter.cancelled(): | ||
| # We were woken up by put_nowait(), but can't take | ||
| # the call. Wake up the next in line. | ||
| self._wakeup_next(self._getters) | ||
| raise | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| return self.get_nowait() | ||
|
|
||
| def get_nowait(self): | ||
| """Remove and return an item from the queue. | ||
|
|
||
| Return an item if one is immediately available, else raise QueueEmpty. | ||
|
|
||
| Raises QueueShutDown if the queue has been shut down and is empty, or | ||
| if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| if self.empty(): | ||
EpicWink marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if self._is_shutdown(): | ||
| raise QueueShutDown | ||
| raise QueueEmpty | ||
| item = self._get() | ||
| self._wakeup_next(self._putters) | ||
|
|
@@ -196,7 +245,11 @@ def task_done(self): | |
|
|
||
| Raises ValueError if called more times than there were items placed in | ||
| the queue. | ||
|
|
||
| Raises QueueShutDown if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| if self._unfinished_tasks <= 0: | ||
| raise ValueError('task_done() called too many times') | ||
| self._unfinished_tasks -= 1 | ||
|
|
@@ -210,9 +263,57 @@ async def join(self): | |
| queue. The count goes down whenever a consumer calls task_done() to | ||
| indicate that the item was retrieved and all work on it is complete. | ||
| When the count of unfinished tasks drops to zero, join() unblocks. | ||
|
|
||
| Raises QueueShutDown if the queue has been shut down immediately. | ||
| """ | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
| if self._unfinished_tasks > 0: | ||
| await self._finished.wait() | ||
| if self._is_shutdown_immediate(): | ||
| raise QueueShutDown | ||
|
|
||
| def shutdown(self, immediate=False): | ||
| """Shut-down the queue, making queue gets and puts raise. | ||
|
|
||
| By default, gets will only raise once the queue is empty. Set | ||
| 'immediate' to True to make gets raise immediately instead. | ||
|
|
||
| All blocked callers of put() will be unblocked, and also get() | ||
| and join() if 'immediate'. The QueueShutDown exception is raised. | ||
| """ | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring to modify depending of agree/disagree on my first remark about `blocked callers'. |
||
| if self._is_shutdown_immediate(): | ||
| return | ||
| # here _shutdown_state is ALIVE or SHUTDOWN | ||
| if immediate: | ||
| self._set_shutdown_immediate() | ||
| while self._getters: | ||
| getter = self._getters.popleft() | ||
| if not getter.done(): | ||
| getter.set_result(None) | ||
| # Release all 'blocked' tasks/coros in `join()` | ||
| self._finished.set() | ||
| else: | ||
| self._set_shutdown() | ||
| while self._putters: | ||
| putter = self._putters.popleft() | ||
| if not putter.done(): | ||
| putter.set_result(None) | ||
|
|
||
| def _is_alive(self): | ||
| return self._shutdown_state is _QueueState.ALIVE | ||
|
|
||
| def _is_shutdown(self): | ||
| return self._shutdown_state is _QueueState.SHUTDOWN | ||
|
|
||
| def _is_shutdown_immediate(self): | ||
| return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE | ||
|
|
||
| def _set_shutdown(self): | ||
| self._shutdown_state = _QueueState.SHUTDOWN | ||
|
|
||
| def _set_shutdown_immediate(self): | ||
| self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE | ||
|
|
||
|
|
||
| class PriorityQueue(Queue): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.