1- __all__  =  ('Queue' , 'PriorityQueue' , 'LifoQueue' , 'QueueFull' , 'QueueEmpty' )
1+ __all__  =  (
2+     'Queue' ,
3+     'PriorityQueue' ,
4+     'LifoQueue' ,
5+     'QueueFull' ,
6+     'QueueEmpty' ,
7+     'QueueShutDown' ,
8+ )
29
310import  collections 
411import  heapq 
@@ -18,6 +25,11 @@ class QueueFull(Exception):
1825    pass 
1926
2027
28+ class  QueueShutDown (Exception ):
29+     """Raised when putting on to or getting from a shut-down Queue.""" 
30+     pass 
31+ 
32+ 
2133class  Queue (mixins ._LoopBoundMixin ):
2234    """A queue, useful for coordinating producer and consumer coroutines. 
2335
@@ -41,6 +53,7 @@ def __init__(self, maxsize=0):
4153        self ._finished  =  locks .Event ()
4254        self ._finished .set ()
4355        self ._init (maxsize )
56+         self ._is_shutdown  =  False 
4457
4558    # These three are overridable in subclasses. 
4659
@@ -81,6 +94,8 @@ def _format(self):
8194            result  +=  f' _putters[{ len (self ._putters )}  
8295        if  self ._unfinished_tasks :
8396            result  +=  f' tasks={ self ._unfinished_tasks }  
97+         if  self ._is_shutdown :
98+             result  +=  ' shutdown' 
8499        return  result 
85100
86101    def  qsize (self ):
@@ -112,8 +127,12 @@ async def put(self, item):
112127
113128        Put an item into the queue. If the queue is full, wait until a free 
114129        slot is available before adding item. 
130+ 
131+         Raises QueueShutDown if the queue has been shut down. 
115132        """ 
116133        while  self .full ():
134+             if  self ._is_shutdown :
135+                 raise  QueueShutDown 
117136            putter  =  self ._get_loop ().create_future ()
118137            self ._putters .append (putter )
119138            try :
@@ -125,7 +144,7 @@ async def put(self, item):
125144                    self ._putters .remove (putter )
126145                except  ValueError :
127146                    # The putter could be removed from self._putters by a 
128-                     # previous get_nowait call. 
147+                     # previous get_nowait call or a shutdown call . 
129148                    pass 
130149                if  not  self .full () and  not  putter .cancelled ():
131150                    # We were woken up by get_nowait(), but can't take 
@@ -138,7 +157,11 @@ def put_nowait(self, item):
138157        """Put an item into the queue without blocking. 
139158
140159        If no free slot is immediately available, raise QueueFull. 
160+ 
161+         Raises QueueShutDown if the queue has been shut down. 
141162        """ 
163+         if  self ._is_shutdown :
164+             raise  QueueShutDown 
142165        if  self .full ():
143166            raise  QueueFull 
144167        self ._put (item )
@@ -150,8 +173,13 @@ async def get(self):
150173        """Remove and return an item from the queue. 
151174
152175        If queue is empty, wait until an item is available. 
176+ 
177+         Raises QueueShutDown if the queue has been shut down and is empty, or 
178+         if the queue has been shut down immediately. 
153179        """ 
154180        while  self .empty ():
181+             if  self ._is_shutdown  and  self .empty ():
182+                 raise  QueueShutDown 
155183            getter  =  self ._get_loop ().create_future ()
156184            self ._getters .append (getter )
157185            try :
@@ -163,7 +191,7 @@ async def get(self):
163191                    self ._getters .remove (getter )
164192                except  ValueError :
165193                    # The getter could be removed from self._getters by a 
166-                     # previous put_nowait call. 
194+                     # previous put_nowait call, or a shutdown call . 
167195                    pass 
168196                if  not  self .empty () and  not  getter .cancelled ():
169197                    # We were woken up by put_nowait(), but can't take 
@@ -176,8 +204,13 @@ def get_nowait(self):
176204        """Remove and return an item from the queue. 
177205
178206        Return an item if one is immediately available, else raise QueueEmpty. 
207+ 
208+         Raises QueueShutDown if the queue has been shut down and is empty, or 
209+         if the queue has been shut down immediately. 
179210        """ 
180211        if  self .empty ():
212+             if  self ._is_shutdown :
213+                 raise  QueueShutDown 
181214            raise  QueueEmpty 
182215        item  =  self ._get ()
183216        self ._wakeup_next (self ._putters )
@@ -194,6 +227,9 @@ def task_done(self):
194227        been processed (meaning that a task_done() call was received for every 
195228        item that had been put() into the queue). 
196229
230+         shutdown(immediate=True) calls task_done() for each remaining item in 
231+         the queue. 
232+ 
197233        Raises ValueError if called more times than there were items placed in 
198234        the queue. 
199235        """ 
@@ -214,6 +250,32 @@ async def join(self):
214250        if  self ._unfinished_tasks  >  0 :
215251            await  self ._finished .wait ()
216252
253+     def  shutdown (self , immediate = False ):
254+         """Shut-down the queue, making queue gets and puts raise QueueShutDown. 
255+ 
256+         By default, gets will only raise once the queue is empty. Set 
257+         'immediate' to True to make gets raise immediately instead. 
258+ 
259+         All blocked callers of put() will be unblocked, and also get() 
260+         and join() if 'immediate'. 
261+         """ 
262+         self ._is_shutdown  =  True 
263+         if  immediate :
264+             while  not  self .empty ():
265+                 self ._get ()
266+                 if  self ._unfinished_tasks  >  0 :
267+                     self ._unfinished_tasks  -=  1 
268+             if  self ._unfinished_tasks  ==  0 :
269+                 self ._finished .set ()
270+         while  self ._getters :
271+             getter  =  self ._getters .popleft ()
272+             if  not  getter .done ():
273+                 getter .set_result (None )
274+         while  self ._putters :
275+             putter  =  self ._putters .popleft ()
276+             if  not  putter .done ():
277+                 putter .set_result (None )
278+ 
217279
218280class  PriorityQueue (Queue ):
219281    """A subclass of Queue; retrieves entries in priority order (lowest first). 
0 commit comments