diff --git a/billiard/pool.py b/billiard/pool.py index da4cc4e..e4c12a8 100644 --- a/billiard/pool.py +++ b/billiard/pool.py @@ -1659,7 +1659,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, debug('helping task handler/workers to finish') cls._help_stuff_finish(*help_stuff_finish_args) - result_handler.terminate() + # Send the sentinel to the result handler but don't terminate the + # result handler thread. This allows the thread to continue + # processing results in ResultHandler.finish_at_shutdown() until + # the cache is drained, ensuring that all task results are properly + # stored. A call to ResultHandler.terminate() is not necessary here + # because the thread will exit naturally when the cache becomes empty. cls._set_result_sentinel(outqueue, pool) if timeout_handler is not None: diff --git a/t/unit/test_pool.py b/t/unit/test_pool.py index fba1546..77ff91c 100644 --- a/t/unit/test_pool.py +++ b/t/unit/test_pool.py @@ -15,6 +15,8 @@ def get_on_ready_count(): worker = inspect.stack()[1].frame.f_locals['self'] return worker.on_ready_counter.value +def simple_task(x): + return x * 2 class test_pool: def test_raises(self): @@ -56,3 +58,53 @@ def test_on_ready_counter_is_synchronized(self): pool.close() pool.join() pool.terminate() + + def test_graceful_shutdown_delivers_results(self): + """Test that queued results are delivered during pool shutdown. + + Specifically, this test verifies that when _terminate_pool() is called, + the ResultHandler.finish_at_shutdown() continues processing results + that workers have placed in the outqueue. + """ + + # Create pool with threads=False so that the result handler thread does + # not start and the task results are allowed to build up in the queue. + pool = billiard.pool.Pool(processes=2, threads=False) + + # Submit tasks so that results are queued but not processed. + results = [pool.apply_async(simple_task, (i,)) for i in range(8)] + + # Allow a small amount of time for tasks to complete. + time.sleep(0.5) + + # Close and join the pool to ensure workers stop. + pool.close() + pool.join() + + # Call the _terminate_pool() class method to trigger the finish_at_shutdown() + # function that will process results in the queue. Normally _terminate_pool() + # is called by a Finalize object when the Pool object is destroyed. We cannot + # call pool.terminate() here because it will call the Finalize object, which + # won't do anything until the Pool object is destroyed at the end of this test. + # We can simulate the shutdown behaviour by calling _terminate_pool() directly. + billiard.pool.Pool._terminate_pool( + pool._taskqueue, + pool._inqueue, + pool._outqueue, + pool._pool, + pool._worker_handler, + pool._task_handler, + pool._result_handler, + pool._cache, + pool._timeout_handler, + pool._help_stuff_finish_args() + ) + + # Cancel the Finalize object to prevent _terminate_pool() from being called + # a second time when the Pool object is destroyed. + pool._terminate.cancel() + + # Verify that all results were delivered by finish_at_shutdown() and can be + # retrieved. + for i, result in enumerate(results): + assert result.get() == i * 2 \ No newline at end of file