- 
          
- 
                Notifications
    You must be signed in to change notification settings 
- Fork 33.3k
gh-115258: Fix failed tests on threading queue shutdown #115940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gh-115258: Fix failed tests on threading queue shutdown #115940
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the change! Had some minor comments when reviewing, please see inline
8b24985    to
    f246bf8      
    Compare
  
    | Hmm I might be missing something but it looks like I can't see the latest changes? | 
| 
 Hi, I was on vacation. The changes are scheduled for early next week. | 
| Ah sorry for that! Enjoy your holiday :D | 
f246bf8    to
    fd811b5      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test hangs when adding many time.sleep(random.random() / 10.0) calls between each statement
Diff (click to expand)
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index 9dc7f62999..d223c358e0 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -320,58 +320,90 @@ def test_shutdown_immediate_all_methods_in_one_thread(self):
     def _write_msg_thread(self, q, n, results,
                             i_when_exec_shutdown, event_shutdown,
                             barrier_start):
+        time.sleep(random.random() / 10.0)
         # All `write_msg_threads`
         # put several items into the queue.
         for i in range(0, i_when_exec_shutdown//2):
+            time.sleep(random.random() / 10.0)
             q.put((i, 'LOYD'))
+        time.sleep(random.random() / 10.0)
         # Wait for the barrier to be complete.
         barrier_start.wait()
+        time.sleep(random.random() / 10.0)
 
         for i in range(i, n):
+            time.sleep(random.random() / 10.0)
             try:
                 q.put((i, "YDLO"))
             except self.queue.ShutDown:
+                time.sleep(random.random() / 10.0)
                 results.append(False)
+                time.sleep(random.random() / 10.0)
                 break
+            time.sleep(random.random() / 10.0)
 
             # Trigger queue shutdown.
             if i == i_when_exec_shutdown:
+                time.sleep(random.random() / 10.0)
                 # Only once thread do it.
                 if not event_shutdown.is_set():
+                    time.sleep(random.random() / 10.0)
                     event_shutdown.set()
+                    time.sleep(random.random() / 10.0)
                     results.append(True)
+        time.sleep(random.random() / 10.0)
         q.join()
+        time.sleep(random.random() / 10.0)
 
     def _read_msg_thread(self, q, results, barrier_start):
         # Wait for the barrier to be complete.
+        time.sleep(random.random() / 10.0)
         barrier_start.wait()
+        time.sleep(random.random() / 10.0)
         while True:
+            time.sleep(random.random() / 10.0)
             try:
                 q.get(False)
+                time.sleep(random.random() / 10.0)
                 q.task_done()
             except self.queue.ShutDown:
+                time.sleep(random.random() / 10.0)
                 results.append(True)
+                time.sleep(random.random() / 10.0)
                 break
             except self.queue.Empty:
                 pass
+            time.sleep(random.random() / 10.0)
+        time.sleep(random.random() / 10.0)
         q.join()
+        time.sleep(random.random() / 10.0)
 
     def _shutdown_thread(self, q, results, event_end, immediate):
+        time.sleep(random.random() / 10.0)
         event_end.wait()
+        time.sleep(random.random() / 10.0)
         q.shutdown(immediate)
+        time.sleep(random.random() / 10.0)
         results.append(q.qsize() == 0)
+        time.sleep(random.random() / 10.0)
         q.join()
+        time.sleep(random.random() / 10.0)
 
     def _join_thread(self, q, barrier_start):
+        time.sleep(random.random() / 10.0)
         # Wait for the barrier to be complete.
         barrier_start.wait()
+        time.sleep(random.random() / 10.0)
         q.join()
+        time.sleep(random.random() / 10.0)
 
     def _shutdown_all_methods_in_many_threads(self, immediate):
         # Run a 'multi-producers/consumers queue' use case,
         # with enough items into the queue.
         # When shutdown, all running threads will be concerned.
+        time.sleep(random.random() / 10.0)
         q = self.type2test()
+        time.sleep(random.random() / 10.0)
         ps = []
         res_puts = []
         res_gets = []
@@ -382,11 +414,14 @@ def _shutdown_all_methods_in_many_threads(self, immediate):
         nb_msgs = 1024*64
         nb_msgs_w = nb_msgs // write_threads
         when_exec_shutdown = nb_msgs_w // 2
+        time.sleep(random.random() / 10.0)
         # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads`
         # put their part of items into the queue. And trigger the start of
         # other threads as `_read_msg_thread`and `_join_thread`.
         barrier_start = threading.Barrier(write_threads+read_threads+join_threads)
+        time.sleep(random.random() / 10.0)
         ev_exec_shutdown = threading.Event()
+        time.sleep(random.random() / 10.0)
         lprocs = (
             (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
                                             when_exec_shutdown, ev_exec_shutdown,
@@ -395,19 +430,34 @@ def _shutdown_all_methods_in_many_threads(self, immediate):
             (self._join_thread, join_threads, (q, barrier_start)),
             (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
             )
+        time.sleep(random.random() / 10.0)
         # start all threads.
         for func, n, args in lprocs:
+            time.sleep(random.random() / 10.0)
             for i in range(n):
+                time.sleep(random.random() / 10.0)
                 ps.append(threading.Thread(target=func, args=args))
+                time.sleep(random.random() / 10.0)
                 ps[-1].start()
+                time.sleep(random.random() / 10.0)
+            time.sleep(random.random() / 10.0)
+        time.sleep(random.random() / 10.0)
         for thread in ps:
+            time.sleep(random.random() / 10.0)
             thread.join()
+            time.sleep(random.random() / 10.0)
+        time.sleep(random.random() / 10.0)
 
         self.assertEqual(res_puts.count(True), 1)
+        time.sleep(random.random() / 10.0)
         self.assertLessEqual(res_gets.count(True), read_threads)
+        time.sleep(random.random() / 10.0)
         if immediate:
+            time.sleep(random.random() / 10.0)
             self.assertListEqual(res_shutdown, [True])
+            time.sleep(random.random() / 10.0)
             self.assertTrue(q.empty())
+            time.sleep(random.random() / 10.0)
 
     def test_shutdown_all_methods_in_many_threads(self):
         return self._shutdown_all_methods_in_many_threads(False)Also, perhaps look into threading_helper.join_thread(thread) and threading_helper.start_threads(threads) (from test.support) for better thread management. They should fail the test instead of hanging
        
          
                Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | 
 When I run your modified version of the test, it hangs, but only because  | 
| 
 @YvesDup it wasn't missing, just was outside of the diff context 
 Tests finish also for me when I instead use  FAIL: test_shutdown_all_methods_in_many_threads (test.test_queue.PyPriorityQueueTest.test_shutdown_all_methods_in_many_threads)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "~/src/cpython/Lib/test/test_queue.py", line 463, in test_shutdown_all_methods_in_many_threads
    return self._shutdown_all_methods_in_many_threads(False)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "~/src/cpython/Lib/test/test_queue.py", line 451, in _shutdown_all_methods_in_many_threads
    self.assertEqual(res_puts.count(True), 1)
    ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: 2 != 1 | 
| Thank for adding all these  
 About this message, I have got this failed test once. That means here that there are 2  That said, I am going to: 
 I will submit a new version of this test. | 
of 'test_shutdown_[immediate_]all_methods_in_many_threads' unittests
…_many_threads` methods, with a code refactoring. Add a `results` list to the `_shutdown_thread` method. Add tests. Fix nit.
50bddd3    to
    b9ee958      
    Compare
  
    Start `join_thread` only when shutdown is immediate. Update tests.
| Please don't force-push. It makes reviewing harder. We squash-merge ultimately anyway. Unless you're not ready to have this reviewed, in which case the PR should be in Draft mode. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe I am now following what this does. Let me summarize, please check my understanding.
- We start 4 writer threads, each of which is going to put 16*1024 items into the queue.
- (Actually, one more per thread, probably by mistake.)
- When the first of these threads reaches half that number, it calls shutdown().
- All writer threads also have to reach halfway before passing the barrier.
- There are also 6 reader threads. These each get at least one item, wait for the barrier, and then busy-wait until the queue gets shut down.
- There are two variants, one for shutdown(True), one for shutdown(False).
- For shutdown(immediate=True), we also start a join thread.
- All threads must complete for the test to pass.
I am a little unclear on what the test is trying to prove. It seems to be stress testing concurrent reading and writing of the queue plus shutdown. Is that what it is after?
        
          
                Misc/NEWS.d/next/Tests/2024-02-26-15-04-57.gh-issue-115258.p__Nfv.rst
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | 
 Your understanding is correct. The aim is to have a queue not empty and check that all running threads stop correctly when queue shutdows. There are 3 threads types which used each one of methods of Queue class ( 
 if you or/and @EpicWink think this test case is not really useful, I'll let you decide what to do with it. | 
| 
 Then shouldn't the assert be         self.assertEqual(res_gets.count(True), read_threads)(instead of  | 
| 
 Yes | 
Fix start value of range. Change `self.assertLessEqual` to `self.assertEqual`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, LGTM.
@EpicWink Do you agree that this test is worth having and that this version looks bullet-proof?
| 🤖 New build scheduled with the buildbot fleet by @gvanrossum for commit 349d08a 🤖 If you want to schedule another build, you need to add the 🔨 test-with-buildbots label again. | 
| I've run the modified tests ~50 times, both with and without GIL, both single-process and 15-processes, both with and without the  I agree the purpose of the test is not very indicative of any real-world scenario, but it does seem to test invariants so I didn't push back against its inclusion. However, the NEWS entry still hasn't been removed. | 
| Thanks @EpicWink for the mention in the whatsnew3.13. | 
…on#115940) This reinstates `test_shutdown_immediate_all_methods_in_many_threads` and improves `test_shutdown_all_methods_in_many_threads`.
…on#115940) This reinstates `test_shutdown_immediate_all_methods_in_many_threads` and improves `test_shutdown_all_methods_in_many_threads`.
…on#115940) This reinstates `test_shutdown_immediate_all_methods_in_many_threads` and improves `test_shutdown_all_methods_in_many_threads`.
Fix infinite loop in
_read_msg_threadoftest_shutdown_[immediate_]all_methods_in_many_threadsunittests.test_queuetimes out #115258