-
-
Notifications
You must be signed in to change notification settings - Fork 33k
Closed as not planned
Closed as not planned
Copy link
Labels
stdlibStandard Library Python modules in the Lib/ directoryStandard Library Python modules in the Lib/ directorytopic-multiprocessingtype-bugAn unexpected behavior, bug, or errorAn unexpected behavior, bug, or error
Description
Bug report
Bug description:
I found that the following code will encounter a deadlock when running, and delete lines 34 and 39 or use PYTHONUNBUFFERED=1 will not cause a deadlock.
Is this related to the buffer zone?
code:
import threading
import queue
import time
import random
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
class ConsumerThreadManager:
def __init__(self, num_consumers=3, name="ConsumerThreadManager"):
self.task_queue = queue.Queue()
self.num_consumers = num_consumers
self.consumer_threads = []
self.name = name
def start(self):
print(f"thread name: {self.name} thread num: {self.num_consumers}")
for i in range(self.num_consumers):
t = threading.Thread(
target=self._consumer_worker,
name=f"{self.name}-Worker-{i}",
daemon=True
)
t.start()
self.consumer_threads.append(t)
print(f"{self.name} is running")
def _consumer_worker(self):
thread_name = threading.current_thread().name
print(f"{thread_name} start to run")
while True:
try:
task = self.task_queue.get(timeout=10)
print(f"{thread_name} get task: {task} (queue size: {self.task_queue.qsize()})")
time.sleep(random.uniform(0.01, 0.02))
self.task_queue.task_done()
print(f"{thread_name} complete task: {task}")
except queue.Empty:
print(f"{thread_name} detected that the queue has been empty for a long time,quit")
break
print(f"{thread_name} is stop")
def add_task(self, task):
self.task_queue.put(task)
def wait_completion(self):
print("wait all task end...")
self.task_queue.join()
print("all task end")
def child_process_task(process_id):
print(f"(PID: {os.getpid()}) start to run")
time.sleep(random.uniform(0.01, 0.02))
print(f"{process_id} end to run")
return 0
def compute():
num_processes = 11
all_tasks = []
with ProcessPoolExecutor(max_workers=num_processes) as executor:
futures = []
for i in range(20):
future = executor.submit(child_process_task, i)
futures.append(future)
for future in as_completed(futures):
try:
result = future.result()
all_tasks.append(result)
except Exception as e:
raise e
return all_tasks
def main():
consumer_manager = ConsumerThreadManager(num_consumers=3)
consumer_manager.start()
for i in range(0, 10):
all_tasks = compute()
for task in all_tasks:
consumer_manager.add_task(task)
consumer_manager.wait_completion()
print("main program end")
if __name__ == "__main__":
main()
CPython versions tested on:
3.13
Operating systems tested on:
Linux
Metadata
Metadata
Assignees
Labels
stdlibStandard Library Python modules in the Lib/ directoryStandard Library Python modules in the Lib/ directorytopic-multiprocessingtype-bugAn unexpected behavior, bug, or errorAn unexpected behavior, bug, or error