2626
2727from  .util  import  debug , info , Finalize , register_after_fork , is_exiting 
2828
29- _queue_alive  =  0 
30- _queue_shutdown  =  1 
31- _queue_shutdown_immediate  =  2 
32- 
3329# 
3430# Queue type using a pipe, buffer and thread 
3531# 
@@ -52,7 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
5248        # For use by concurrent.futures 
5349        self ._ignore_epipe  =  False 
5450        self ._reset ()
55-         self ._shutdown_state  =  ctx .Value ('i ' , _queue_alive )
51+         self ._is_shutdown  =  ctx .Value ('B ' , False ,  lock = self . _rlock )
5652
5753        if  sys .platform  !=  'win32' :
5854            register_after_fork (self , Queue ._after_fork )
@@ -61,12 +57,12 @@ def __getstate__(self):
6157        context .assert_spawning (self )
6258        return  (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6359                self ._rlock , self ._wlock , self ._sem , self ._opid ,
64-                 self ._shutdown_state )
60+                 self ._is_shutdown )
6561
6662    def  __setstate__ (self , state ):
6763        (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
6864         self ._rlock , self ._wlock , self ._sem , self ._opid ,
69-          self ._shutdown_state ) =  state 
65+          self ._is_shutdown ) =  state 
7066        self ._reset ()
7167
7268    def  _after_fork (self ):
@@ -88,32 +84,19 @@ def _reset(self, after_fork=False):
8884        self ._recv_bytes  =  self ._reader .recv_bytes 
8985        self ._poll  =  self ._reader .poll 
9086
91-     def  _is_alive (self ):
92-         return  self ._shutdown_state .value  ==  _queue_alive 
93- 
94-     def  _is_shutdown (self ):
95-         return  self ._shutdown_state .value  ==  _queue_shutdown 
96- 
97-     def  _is_shutdown_immediate (self ):
98-         return  self ._shutdown_state .value  ==  _queue_shutdown_immediate 
99- 
100-     def  _set_shutdown (self ):
101-         self ._shutdown_state .value  =  _queue_shutdown 
102- 
103-     def  _set_shutdown_immediate (self ):
104-         self ._shutdown_state .value  =  _queue_shutdown_immediate 
105- 
10687    def  put (self , obj , block = True , timeout = None ):
10788        if  self ._closed :
10889            raise  ValueError (f"Queue { self !r}  )
109-         if  not   self ._is_alive () :
90+         if  self ._is_shutdown . value :
11091            raise  ShutDown 
11192        if  not  self ._sem .acquire (block , timeout ):
112-             if  not   self ._is_alive () :
93+             if  self ._is_shutdown . value :
11394                raise  ShutDown 
11495            raise  Full 
11596
11697        with  self ._notempty :
98+             if  self ._is_shutdown .value :
99+                 raise  ShutDown 
117100            if  self ._thread  is  None :
118101                self ._start_thread ()
119102            self ._buffer .append (obj )
@@ -124,36 +107,29 @@ def get(self, block=True, timeout=None):
124107            raise  ValueError (f"Queue { self !r}  )
125108        if  block  and  timeout  is  None :
126109            with  self ._rlock :
127-                 # checks shutdown state 
128-                 if  (self ._is_shutdown_immediate ()
129-                     or  (self ._is_shutdown () and  self .empty ())):
110+                 if  self ._is_shutdown .value  and  self .empty ():
130111                    raise  ShutDown 
131112                res  =  self ._recv_bytes ()
132113            self ._sem .release ()
133114        else :
134115            if  block :
135116                deadline  =  time .monotonic () +  timeout 
136117            if  not  self ._rlock .acquire (block , timeout ):
137-                 if  (self ._is_shutdown_immediate ()
138-                     or  (self ._is_shutdown () and  self .empty ())):
118+                 if  self ._is_shutdown .value  and  self .empty ():
139119                    raise  ShutDown 
140120                raise  Empty 
141121            try :
142122                if  block :
143123                    timeout  =  deadline  -  time .monotonic ()
144124                    if  not  self ._poll (timeout ):
145-                         if  not   self ._is_alive () :
125+                         if  self ._is_shutdown . value :
146126                            raise  ShutDown 
147127                        raise  Empty 
148128                elif  not  self ._poll ():
149-                     if  not   self ._is_alive () :
129+                     if  self ._is_shutdown . value :
150130                        raise  ShutDown 
151131                    raise  Empty 
152132
153-                 # here queue is not empty 
154-                 if  self ._is_shutdown_immediate ():
155-                     raise  ShutDown 
156-                 # here shutdown state queue is alive or shutdown 
157133                res  =  self ._recv_bytes ()
158134                self ._sem .release ()
159135            finally :
@@ -178,18 +154,21 @@ def get_nowait(self):
178154    def  put_nowait (self , obj ):
179155        return  self .put (obj , False )
180156
157+     def  _clear (self ):
158+         with  self ._rlock :
159+             while  self ._poll ():
160+                 self ._recv_bytes ()
161+ 
181162    def  shutdown (self , immediate = False ):
182163        if  self ._closed :
183164            raise  ValueError (f"Queue { self !r}  )
184-         with  self ._shutdown_state .get_lock ():
185-             if  self ._is_shutdown_immediate ():
186-                 return 
165+         with  self ._is_shutdown .get_lock ():
166+             self ._is_shutdown .value  =  True 
187167            if  immediate :
188-                 self ._set_shutdown_immediate ()
168+                 self ._clear ()
189169                with  self ._notempty :
190170                    self ._notempty .notify_all ()
191-             else :
192-                 self ._set_shutdown ()
171+             self ._sem .release (self .qsize ())
193172
194173    def  close (self ):
195174        self ._closed  =  True 
@@ -384,14 +363,16 @@ def __setstate__(self, state):
384363    def  put (self , obj , block = True , timeout = None ):
385364        if  self ._closed :
386365            raise  ValueError (f"Queue { self !r}  )
387-         if  not   self ._is_alive () :
366+         if  self ._is_shutdown . value :
388367            raise  ShutDown 
389368        if  not  self ._sem .acquire (block , timeout ):
390-             if  not   self ._is_alive () :
369+             if  self ._is_shutdown . value :
391370                raise  ShutDown 
392371            raise  Full 
393372
394373        with  self ._notempty , self ._cond :
374+             if  self ._is_shutdown .value :
375+                 raise  ShutDown 
395376            if  self ._thread  is  None :
396377                self ._start_thread ()
397378            self ._buffer .append (obj )
@@ -400,27 +381,22 @@ def put(self, obj, block=True, timeout=None):
400381
401382    def  task_done (self ):
402383        with  self ._cond :
403-             if  self ._is_shutdown_immediate ():
404-                 raise  ShutDown 
405384            if  not  self ._unfinished_tasks .acquire (False ):
406385                raise  ValueError ('task_done() called too many times' )
407386            if  self ._unfinished_tasks ._semlock ._is_zero ():
408387                self ._cond .notify_all ()
409388
410389    def  join (self ):
411390        with  self ._cond :
412-             if  self ._is_shutdown_immediate ():
413-                 raise  ShutDown 
414391            if  not  self ._unfinished_tasks ._semlock ._is_zero ():
415392                self ._cond .wait ()
416-                 if  self ._is_shutdown_immediate ():
417-                     raise  ShutDown 
418393
419-     def  shutdown (self , immediate = False ):
420-         with  self ._cond :
421-             is_alive  =  self ._is_alive ()
422-             super ().shutdown (immediate )
423-             if  is_alive :
394+     def  _clear (self ):
395+         with  self ._rlock :
396+             while  self ._poll ():
397+                 self ._recv_bytes ()
398+                 self ._unfinished_tasks .acquire (block = False )
399+             with  self ._cond :
424400                self ._cond .notify_all ()
425401
426402# 
0 commit comments