@@ -25,6 +25,10 @@ class Full(Exception):
2525 pass
2626
2727
28+ class ShutDown (Exception ):
29+ '''Raised when put/get with shut-down queue.'''
30+
31+
2832class Queue :
2933 '''Create a queue object with a given maximum size.
3034
@@ -54,6 +58,9 @@ def __init__(self, maxsize=0):
5458 self .all_tasks_done = threading .Condition (self .mutex )
5559 self .unfinished_tasks = 0
5660
61+ # Queue shutdown state
62+ self .is_shutdown = False
63+
5764 def task_done (self ):
5865 '''Indicate that a formerly enqueued task is complete.
5966
@@ -67,6 +74,8 @@ def task_done(self):
6774
6875 Raises a ValueError if called more times than there were items
6976 placed in the queue.
77+
78+ Raises ShutDown if the queue has been shut down immediately.
7079 '''
7180 with self .all_tasks_done :
7281 unfinished = self .unfinished_tasks - 1
@@ -84,6 +93,8 @@ def join(self):
8493 to indicate the item was retrieved and all work on it is complete.
8594
8695 When the count of unfinished tasks drops to zero, join() unblocks.
96+
97+ Raises ShutDown if the queue has been shut down immediately.
8798 '''
8899 with self .all_tasks_done :
89100 while self .unfinished_tasks :
@@ -129,15 +140,21 @@ def put(self, item, block=True, timeout=None):
129140 Otherwise ('block' is false), put an item on the queue if a free slot
130141 is immediately available, else raise the Full exception ('timeout'
131142 is ignored in that case).
143+
144+ Raises ShutDown if the queue has been shut down.
132145 '''
133146 with self .not_full :
147+ if self .is_shutdown :
148+ raise ShutDown
134149 if self .maxsize > 0 :
135150 if not block :
136151 if self ._qsize () >= self .maxsize :
137152 raise Full
138153 elif timeout is None :
139154 while self ._qsize () >= self .maxsize :
140155 self .not_full .wait ()
156+ if self .is_shutdown :
157+ raise ShutDown
141158 elif timeout < 0 :
142159 raise ValueError ("'timeout' must be a non-negative number" )
143160 else :
@@ -147,6 +164,8 @@ def put(self, item, block=True, timeout=None):
147164 if remaining <= 0.0 :
148165 raise Full
149166 self .not_full .wait (remaining )
167+ if self .is_shutdown :
168+ raise ShutDown
150169 self ._put (item )
151170 self .unfinished_tasks += 1
152171 self .not_empty .notify ()
@@ -161,14 +180,21 @@ def get(self, block=True, timeout=None):
161180 Otherwise ('block' is false), return an item if one is immediately
162181 available, else raise the Empty exception ('timeout' is ignored
163182 in that case).
183+
184+ Raises ShutDown if the queue has been shut down and is empty,
185+ or if the queue has been shut down immediately.
164186 '''
165187 with self .not_empty :
188+ if self .is_shutdown and not self ._qsize ():
189+ raise ShutDown
166190 if not block :
167191 if not self ._qsize ():
168192 raise Empty
169193 elif timeout is None :
170194 while not self ._qsize ():
171195 self .not_empty .wait ()
196+ if self .is_shutdown and not self ._qsize ():
197+ raise ShutDown
172198 elif timeout < 0 :
173199 raise ValueError ("'timeout' must be a non-negative number" )
174200 else :
@@ -178,6 +204,8 @@ def get(self, block=True, timeout=None):
178204 if remaining <= 0.0 :
179205 raise Empty
180206 self .not_empty .wait (remaining )
207+ if self .is_shutdown and not self ._qsize ():
208+ raise ShutDown
181209 item = self ._get ()
182210 self .not_full .notify ()
183211 return item
@@ -198,6 +226,28 @@ def get_nowait(self):
198226 '''
199227 return self .get (block = False )
200228
229+ def shutdown (self , immediate = False ):
230+ '''Shut-down the queue, making queue gets and puts raise.
231+
232+ By default, gets will only raise once the queue is empty. Set
233+ 'immediate' to True to make gets raise immediately instead.
234+
235+ All blocked callers of put() will be unblocked, and also get()
236+ and join() if 'immediate'. The ShutDown exception is raised.
237+ '''
238+ with self .mutex :
239+ self .is_shutdown = True
240+ if immediate :
241+ n_items = self ._qsize ()
242+ while self ._qsize ():
243+ self ._get ()
244+ if self .unfinished_tasks > 0 :
245+ self .unfinished_tasks -= 1
246+ self .not_empty .notify_all ()
247+ # release all blocked threads in `join()`
248+ self .all_tasks_done .notify_all ()
249+ self .not_full .notify_all ()
250+
201251 # Override these methods to implement other queue organizations
202252 # (e.g. stack or priority queue).
203253 # These will only be called with appropriate locks held
0 commit comments