88)
99
1010import collections
11+ import enum
1112import heapq
1213from types import GenericAlias
1314
@@ -30,9 +31,10 @@ class QueueShutDown(Exception):
3031 pass
3132
3233
33- _queue_alive = "alive"
34- _queue_shutdown = "shutdown"
35- _queue_shutdown_immediate = "shutdown-immediate"
34+ class _QueueState (enum .Enum ):
35+ ALIVE = "alive"
36+ SHUTDOWN = "shutdown"
37+ SHUTDOWN_IMMEDIATE = "shutdown-immediate"
3638
3739
3840class Queue (mixins ._LoopBoundMixin ):
@@ -58,7 +60,7 @@ def __init__(self, maxsize=0):
5860 self ._finished = locks .Event ()
5961 self ._finished .set ()
6062 self ._init (maxsize )
61- self .shutdown_state = _queue_alive
63+ self ._shutdown_state = _QueueState . ALIVE
6264
6365 # These three are overridable in subclasses.
6466
@@ -99,6 +101,8 @@ def _format(self):
99101 result += f' _putters[{ len (self ._putters )} ]'
100102 if self ._unfinished_tasks :
101103 result += f' tasks={ self ._unfinished_tasks } '
104+ if not self ._is_alive ():
105+ result += f' state={ self ._shutdown_state .value } '
102106 return result
103107
104108 def qsize (self ):
@@ -130,8 +134,10 @@ async def put(self, item):
130134
131135 Put an item into the queue. If the queue is full, wait until a free
132136 slot is available before adding item.
137+
138+ Raises QueueShutDown if the queue has been shut down.
133139 """
134- if self .shutdown_state != _queue_alive :
140+ if not self ._is_alive () :
135141 raise QueueShutDown
136142 while self .full ():
137143 putter = self ._get_loop ().create_future ()
@@ -145,23 +151,25 @@ async def put(self, item):
145151 self ._putters .remove (putter )
146152 except ValueError :
147153 # The putter could be removed from self._putters by a
148- # previous get_nowait call.
154+ # previous get_nowait call or a shutdown call .
149155 pass
150156 if not self .full () and not putter .cancelled ():
151157 # We were woken up by get_nowait(), but can't take
152158 # the call. Wake up the next in line.
153159 self ._wakeup_next (self ._putters )
154160 raise
155- if self .shutdown_state != _queue_alive :
161+ if not self ._is_alive () :
156162 raise QueueShutDown
157163 return self .put_nowait (item )
158164
159165 def put_nowait (self , item ):
160166 """Put an item into the queue without blocking.
161167
162168 If no free slot is immediately available, raise QueueFull.
169+
170+ Raises QueueShutDown if the queue has been shut down.
163171 """
164- if self .shutdown_state != _queue_alive :
172+ if not self ._is_alive () :
165173 raise QueueShutDown
166174 if self .full ():
167175 raise QueueFull
@@ -174,11 +182,14 @@ async def get(self):
174182 """Remove and return an item from the queue.
175183
176184 If queue is empty, wait until an item is available.
185+
186+ Raises QueueShutDown if the queue has been shut down and is empty, or
187+ if the queue has been shut down immediately.
177188 """
178- if self .shutdown_state == _queue_shutdown_immediate :
189+ if self ._is_shutdown_immediate () :
179190 raise QueueShutDown
180191 while self .empty ():
181- if self .shutdown_state != _queue_alive :
192+ if self ._is_shutdown () :
182193 raise QueueShutDown
183194 getter = self ._get_loop ().create_future ()
184195 self ._getters .append (getter )
@@ -191,28 +202,32 @@ async def get(self):
191202 self ._getters .remove (getter )
192203 except ValueError :
193204 # The getter could be removed from self._getters by a
194- # previous put_nowait call.
205+ # previous put_nowait call,
206+ # or a shutdown call.
195207 pass
196208 if not self .empty () and not getter .cancelled ():
197209 # We were woken up by put_nowait(), but can't take
198210 # the call. Wake up the next in line.
199211 self ._wakeup_next (self ._getters )
200212 raise
201- if self .shutdown_state == _queue_shutdown_immediate :
213+ if self ._is_shutdown_immediate () :
202214 raise QueueShutDown
203215 return self .get_nowait ()
204216
205217 def get_nowait (self ):
206218 """Remove and return an item from the queue.
207219
208220 Return an item if one is immediately available, else raise QueueEmpty.
221+
222+ Raises QueueShutDown if the queue has been shut down and is empty, or
223+ if the queue has been shut down immediately.
209224 """
225+ if self ._is_shutdown_immediate ():
226+ raise QueueShutDown
210227 if self .empty ():
211- if self .shutdown_state != _queue_alive :
228+ if self ._is_shutdown () :
212229 raise QueueShutDown
213230 raise QueueEmpty
214- elif self .shutdown_state == _queue_shutdown_immediate :
215- raise QueueShutDown
216231 item = self ._get ()
217232 self ._wakeup_next (self ._putters )
218233 return item
@@ -230,7 +245,11 @@ def task_done(self):
230245
231246 Raises ValueError if called more times than there were items placed in
232247 the queue.
248+
249+ Raises QueueShutDown if the queue has been shut down immediately.
233250 """
251+ if self ._is_shutdown_immediate ():
252+ raise QueueShutDown
234253 if self ._unfinished_tasks <= 0 :
235254 raise ValueError ('task_done() called too many times' )
236255 self ._unfinished_tasks -= 1
@@ -244,9 +263,15 @@ async def join(self):
244263 queue. The count goes down whenever a consumer calls task_done() to
245264 indicate that the item was retrieved and all work on it is complete.
246265 When the count of unfinished tasks drops to zero, join() unblocks.
266+
267+ Raises QueueShutDown if the queue has been shut down immediately.
247268 """
269+ if self ._is_shutdown_immediate ():
270+ raise QueueShutDown
248271 if self ._unfinished_tasks > 0 :
249272 await self ._finished .wait ()
273+ if self ._is_shutdown_immediate ():
274+ raise QueueShutDown
250275
251276 def shutdown (self , immediate = False ):
252277 """Shut-down the queue, making queue gets and puts raise.
@@ -257,19 +282,40 @@ def shutdown(self, immediate=False):
257282 All blocked callers of put() will be unblocked, and also get()
258283 and join() if 'immediate'. The QueueShutDown exception is raised.
259284 """
285+ if self ._is_shutdown_immediate ():
286+ return
287+ # here _shutdown_state is ALIVE or SHUTDOWN
260288 if immediate :
261- self .shutdown_state = _queue_shutdown_immediate
289+ self ._set_shutdown_immediate ()
262290 while self ._getters :
263291 getter = self ._getters .popleft ()
264292 if not getter .done ():
265293 getter .set_result (None )
294+ # Release all 'blocked' tasks/coros in `join()`
295+ self ._finished .set ()
266296 else :
267- self .shutdown_state = _queue_shutdown
297+ self ._set_shutdown ()
268298 while self ._putters :
269299 putter = self ._putters .popleft ()
270300 if not putter .done ():
271301 putter .set_result (None )
272302
303+ def _is_alive (self ):
304+ return self ._shutdown_state is _QueueState .ALIVE
305+
306+ def _is_shutdown (self ):
307+ return self ._shutdown_state is _QueueState .SHUTDOWN
308+
309+ def _is_shutdown_immediate (self ):
310+ return self ._shutdown_state is _QueueState .SHUTDOWN_IMMEDIATE
311+
312+ def _set_shutdown (self ):
313+ self ._shutdown_state = _QueueState .SHUTDOWN
314+
315+ def _set_shutdown_immediate (self ):
316+ self ._shutdown_state = _QueueState .SHUTDOWN_IMMEDIATE
317+
318+
273319class PriorityQueue (Queue ):
274320 """A subclass of Queue; retrieves entries in priority order (lowest first).
275321
0 commit comments