
multiprocessing.Barrier does not return if waiting process is terminated (tested on Windows) #123899


Description

@jpvolkmann

Bug report

Bug description:

I'm using multiprocessing.Barrier to synchronize processes. The individual processes might get terminated/killed through a GUI user interaction.
If a process that has already entered the waiting state is killed, the barrier is never released and the timeout passed to wait() is not taken into account.
While debugging this issue I realized that it is caused by

self._woken_count.acquire() # wait for a sleeper to wake

self._woken_count is never released by the terminated process, and no timeout is specified in the call to self._woken_count.acquire().

If I add a (hardcoded...) timeout, i.e. self._woken_count.acquire(True, 2.0), the program continues as expected.
I hope there are better approaches to fixing this than a hardcoded timeout...
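
For illustration only, here is a minimal standalone sketch (independent of the Barrier/Condition internals, with names chosen freely) of why an unconditional acquire hangs once the peer process has been killed, and why a timeout lets the call return:

import multiprocessing as mp
import time

def sleeper(woken_count):
    # A well-behaved waiter would release the semaphore once it has woken up.
    # This process is killed before it gets the chance, just like a
    # terminated barrier participant.
    time.sleep(60)
    woken_count.release()

if __name__ == '__main__':
    woken_count = mp.Semaphore(0)
    p = mp.Process(target=sleeper, args=(woken_count,), daemon=True)
    p.start()
    time.sleep(0.5)
    p.kill()    # the release() above will never happen
    p.join()

    # An unconditional acquire, like the one quoted above, would block
    # forever here because no surviving process can release the semaphore:
    # woken_count.acquire()

    # With a timeout the call returns False instead of hanging, which is
    # what the hardcoded acquire(True, 2.0) workaround relies on:
    print(woken_count.acquire(True, 2.0))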

Example script to reproduce the error:

import logging
import time
import sys
import multiprocessing as mp
import multiprocessing.synchronize

from pathlib import Path

def process_main(process_index, barrier: multiprocessing.synchronize.Barrier):
    logging.debug('Process %d: Started', process_index)
    time.sleep(0.5)
    logging.debug('Process %d: Waiting for barrier', process_index)
    barrier.wait(timeout=5.0)
    logging.debug('Process %d: Barrier passed', process_index)
    time.sleep(0.5)
    logging.debug('Process %d: Terminated', process_index)

if __name__ == '__main__':
    # Set up logging
    logfile_name = Path(__file__).with_suffix('.log').name
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)s:%(name)s %(message)s',
        handlers=[
            logging.FileHandler(logfile_name, mode='w'),
            logging.StreamHandler(sys.stdout)
        ]    
    )

    instance_count = 4
    barrier = mp.Barrier(instance_count)

    processes = []
    for i in range(instance_count):
        runner_process = mp.Process(
            target=process_main, args=(i, barrier), daemon=True)
        processes.append(runner_process)

    for i, process in enumerate(processes):
        logging.debug('Starting process %d', i)
        process.start()
        time.sleep(0.200)

    # Terminate already waiting process
    logging.debug('Killing process 0')
    processes[0].kill()

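    # Observed behavior: the remaining processes never return from
    # barrier.wait() despite the 5 second timeout, so these joins block forever.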
    for process in processes:
        process.join()

CPython versions tested on:

3.12

Operating systems tested on:

Windows
