1- __all__ = ('Queue' , 'PriorityQueue' , 'LifoQueue' , 'QueueFull' , 'QueueEmpty' )
1+ __all__ = (
2+ 'Queue' ,
3+ 'PriorityQueue' ,
4+ 'LifoQueue' ,
5+ 'QueueFull' ,
6+ 'QueueEmpty' ,
7+ 'QueueShutDown' ,
8+ )
29
310import collections
411import heapq
@@ -18,6 +25,11 @@ class QueueFull(Exception):
1825 pass
1926
2027
28+ class QueueShutDown (Exception ):
29+ """Raised when putting on to or getting from a shut-down Queue."""
30+ pass
31+
32+
2133class Queue (mixins ._LoopBoundMixin ):
2234 """A queue, useful for coordinating producer and consumer coroutines.
2335
@@ -41,6 +53,7 @@ def __init__(self, maxsize=0):
4153 self ._finished = locks .Event ()
4254 self ._finished .set ()
4355 self ._init (maxsize )
56+ self ._is_shutdown = False
4457
4558 # These three are overridable in subclasses.
4659
@@ -81,6 +94,8 @@ def _format(self):
8194 result += f' _putters[{ len (self ._putters )} ]'
8295 if self ._unfinished_tasks :
8396 result += f' tasks={ self ._unfinished_tasks } '
97+ if self ._is_shutdown :
98+ result += ' shutdown'
8499 return result
85100
86101 def qsize (self ):
@@ -112,8 +127,12 @@ async def put(self, item):
112127
113128 Put an item into the queue. If the queue is full, wait until a free
114129 slot is available before adding item.
130+
131+ Raises QueueShutDown if the queue has been shut down.
115132 """
116133 while self .full ():
134+ if self ._is_shutdown :
135+ raise QueueShutDown
117136 putter = self ._get_loop ().create_future ()
118137 self ._putters .append (putter )
119138 try :
@@ -125,7 +144,7 @@ async def put(self, item):
125144 self ._putters .remove (putter )
126145 except ValueError :
127146 # The putter could be removed from self._putters by a
128- # previous get_nowait call.
147+ # previous get_nowait call or a shutdown call .
129148 pass
130149 if not self .full () and not putter .cancelled ():
131150 # We were woken up by get_nowait(), but can't take
@@ -138,7 +157,11 @@ def put_nowait(self, item):
138157 """Put an item into the queue without blocking.
139158
140159 If no free slot is immediately available, raise QueueFull.
160+
161+ Raises QueueShutDown if the queue has been shut down.
141162 """
163+ if self ._is_shutdown :
164+ raise QueueShutDown
142165 if self .full ():
143166 raise QueueFull
144167 self ._put (item )
@@ -150,8 +173,13 @@ async def get(self):
150173 """Remove and return an item from the queue.
151174
152175 If queue is empty, wait until an item is available.
176+
177+ Raises QueueShutDown if the queue has been shut down and is empty, or
178+ if the queue has been shut down immediately.
153179 """
154180 while self .empty ():
181+ if self ._is_shutdown and self .empty ():
182+ raise QueueShutDown
155183 getter = self ._get_loop ().create_future ()
156184 self ._getters .append (getter )
157185 try :
@@ -163,7 +191,7 @@ async def get(self):
163191 self ._getters .remove (getter )
164192 except ValueError :
165193 # The getter could be removed from self._getters by a
166- # previous put_nowait call.
194+ # previous put_nowait call, or a shutdown call .
167195 pass
168196 if not self .empty () and not getter .cancelled ():
169197 # We were woken up by put_nowait(), but can't take
@@ -176,8 +204,13 @@ def get_nowait(self):
176204 """Remove and return an item from the queue.
177205
178206 Return an item if one is immediately available, else raise QueueEmpty.
207+
208+ Raises QueueShutDown if the queue has been shut down and is empty, or
209+ if the queue has been shut down immediately.
179210 """
180211 if self .empty ():
212+ if self ._is_shutdown :
213+ raise QueueShutDown
181214 raise QueueEmpty
182215 item = self ._get ()
183216 self ._wakeup_next (self ._putters )
@@ -194,6 +227,9 @@ def task_done(self):
194227 been processed (meaning that a task_done() call was received for every
195228 item that had been put() into the queue).
196229
230+ shutdown(immediate=True) calls task_done() for each remaining item in
231+ the queue.
232+
197233 Raises ValueError if called more times than there were items placed in
198234 the queue.
199235 """
@@ -214,6 +250,32 @@ async def join(self):
214250 if self ._unfinished_tasks > 0 :
215251 await self ._finished .wait ()
216252
253+ def shutdown (self , immediate = False ):
254+ """Shut-down the queue, making queue gets and puts raise QueueShutDown.
255+
256+ By default, gets will only raise once the queue is empty. Set
257+ 'immediate' to True to make gets raise immediately instead.
258+
259+ All blocked callers of put() will be unblocked, and also get()
260+ and join() if 'immediate'.
261+ """
262+ self ._is_shutdown = True
263+ if immediate :
264+ while not self .empty ():
265+ self ._get ()
266+ if self ._unfinished_tasks > 0 :
267+ self ._unfinished_tasks -= 1
268+ if self ._unfinished_tasks == 0 :
269+ self ._finished .set ()
270+ while self ._getters :
271+ getter = self ._getters .popleft ()
272+ if not getter .done ():
273+ getter .set_result (None )
274+ while self ._putters :
275+ putter = self ._putters .popleft ()
276+ if not putter .done ():
277+ putter .set_result (None )
278+
217279
218280class PriorityQueue (Queue ):
219281 """A subclass of Queue; retrieves entries in priority order (lowest first).
0 commit comments