@@ -317,97 +317,107 @@ def test_shutdown_all_methods_in_one_thread(self):
317317 def test_shutdown_immediate_all_methods_in_one_thread (self ):
318318 return self ._shutdown_all_methods_in_one_thread (True )
319319
320- def _write_msg_thread (self , q , n , results , delay ,
321- i_when_exec_shutdown ,
322- event_start , event_end ):
323- event_start .wait ()
324- for i in range (1 , n + 1 ):
320+ def _write_msg_thread (self , q , n , results ,
321+ i_when_exec_shutdown , event_shutdown ,
322+ barrier_start ):
323+ # All `write_msg_threads`
324+ # put several items into the queue.
325+ for i in range (0 , i_when_exec_shutdown // 2 ):
326+ q .put ((i , 'LOYD' ))
327+ # Wait for the barrier to be complete.
328+ barrier_start .wait ()
329+
330+ for i in range (i_when_exec_shutdown // 2 , n ):
325331 try :
326332 q .put ((i , "YDLO" ))
327- results .append (True )
328333 except self .queue .ShutDown :
329334 results .append (False )
330- # triggers shutdown of queue
331- if i == i_when_exec_shutdown :
332- event_end .set ()
333- time .sleep (delay )
334- # end of all puts
335- q .join ()
335+ break
336336
337- def _read_msg_thread (self , q , nb , results , delay , event_start ):
338- event_start .wait ()
339- block = True
340- while nb :
341- time .sleep (delay )
337+ # Trigger queue shutdown.
338+ if i == i_when_exec_shutdown :
339+ # Only one thread should call shutdown().
340+ if not event_shutdown .is_set ():
341+ event_shutdown .set ()
342+ results .append (True )
343+
344+ def _read_msg_thread (self , q , results , barrier_start ):
345+ # Get at least one item.
346+ q .get (True )
347+ q .task_done ()
348+ # Wait for the barrier to be complete.
349+ barrier_start .wait ()
350+ while True :
342351 try :
343- # Get at least one message
344- q .get (block )
345- block = False
352+ q .get (False )
346353 q .task_done ()
347- results .append (True )
348- nb -= 1
349354 except self .queue .ShutDown :
350- results .append (False )
351- nb -= 1
355+ results .append (True )
356+ break
352357 except self .queue .Empty :
353358 pass
354- q .join ()
355359
356- def _shutdown_thread (self , q , event_end , immediate ):
360+ def _shutdown_thread (self , q , results , event_end , immediate ):
357361 event_end .wait ()
358362 q .shutdown (immediate )
359- q . join ( )
363+ results . append ( q . qsize () == 0 )
360364
361- def _join_thread (self , q , delay , event_start ):
362- event_start . wait ()
363- time . sleep ( delay )
365+ def _join_thread (self , q , barrier_start ):
366+ # Wait for the barrier to be complete.
367+ barrier_start . wait ( )
364368 q .join ()
365369
366370 def _shutdown_all_methods_in_many_threads (self , immediate ):
371+ # Run a 'multi-producers/consumers queue' use case,
372+ # with enough items into the queue.
373+ # When shutdown, all running threads will be joined.
367374 q = self .type2test ()
368375 ps = []
369- ev_start = threading .Event ()
370- ev_exec_shutdown = threading .Event ()
371376 res_puts = []
372377 res_gets = []
373- delay = 1e-4
374- read_process = 4
375- nb_msgs = read_process * 16
376- nb_msgs_r = nb_msgs // read_process
377- when_exec_shutdown = nb_msgs // 2
378- lprocs = (
379- (self ._write_msg_thread , 1 , (q , nb_msgs , res_puts , delay ,
380- when_exec_shutdown ,
381- ev_start , ev_exec_shutdown )),
382- (self ._read_msg_thread , read_process , (q , nb_msgs_r ,
383- res_gets , delay * 2 ,
384- ev_start )),
385- (self ._join_thread , 2 , (q , delay * 2 , ev_start )),
386- (self ._shutdown_thread , 1 , (q , ev_exec_shutdown , immediate )),
387- )
388- # start all threds
378+ res_shutdown = []
379+ write_threads = 4
380+ read_threads = 6
381+ join_threads = 2
382+ nb_msgs = 1024 * 64
383+ nb_msgs_w = nb_msgs // write_threads
384+ when_exec_shutdown = nb_msgs_w // 2
385+ # Use of a Barrier to ensure that
386+ # - all write threads put all their items into the queue,
387+ # - all read thread get at least one item from the queue,
388+ # and keep on running until shutdown.
389+ # The join thread is started only when shutdown is immediate.
390+ nparties = write_threads + read_threads
391+ if immediate :
392+ nparties += join_threads
393+ barrier_start = threading .Barrier (nparties )
394+ ev_exec_shutdown = threading .Event ()
395+ lprocs = [
396+ (self ._write_msg_thread , write_threads , (q , nb_msgs_w , res_puts ,
397+ when_exec_shutdown , ev_exec_shutdown ,
398+ barrier_start )),
399+ (self ._read_msg_thread , read_threads , (q , res_gets , barrier_start )),
400+ (self ._shutdown_thread , 1 , (q , res_shutdown , ev_exec_shutdown , immediate )),
401+ ]
402+ if immediate :
403+ lprocs .append ((self ._join_thread , join_threads , (q , barrier_start )))
404+ # start all threads.
389405 for func , n , args in lprocs :
390406 for i in range (n ):
391407 ps .append (threading .Thread (target = func , args = args ))
392408 ps [- 1 ].start ()
393- # set event in order to run q.shutdown()
394- ev_start .set ()
395-
396- if not immediate :
397- assert (len (res_gets ) == len (res_puts ))
398- assert (res_gets .count (True ) == res_puts .count (True ))
399- else :
400- assert (len (res_gets ) <= len (res_puts ))
401- assert (res_gets .count (True ) <= res_puts .count (True ))
402-
403- for thread in ps [1 :]:
409+ for thread in ps :
404410 thread .join ()
405411
406- @unittest .skip ("test times out (gh-115258)" )
412+ self .assertTrue (True in res_puts )
413+ self .assertEqual (res_gets .count (True ), read_threads )
414+ if immediate :
415+ self .assertListEqual (res_shutdown , [True ])
416+ self .assertTrue (q .empty ())
417+
407418 def test_shutdown_all_methods_in_many_threads (self ):
408419 return self ._shutdown_all_methods_in_many_threads (False )
409420
410- @unittest .skip ("test times out (gh-115258)" )
411421 def test_shutdown_immediate_all_methods_in_many_threads (self ):
412422 return self ._shutdown_all_methods_in_many_threads (True )
413423
0 commit comments