Bug report
Bug description:
When two multiprocessing queues are fed by multiple producers, the queues will, at some point, get out of sync. Given two multiprocessing queues that are meant to stay "in sync", acquiring a shared lock before putting into both queues should ensure that no other producer can jump in between the two puts. Acquiring the same lock before getting from both queues should likewise rule out concurrency anomalies on the consumer side. Despite this, when multiple producers are used, the consumer will at some point read a pair of elements that were not put into the queues by the same producer. For clarity, see the reproduction below.
import multiprocessing as mp
import time
from queue import Empty


def producer(q0, q1, lock, num_puts):
    for i in range(num_puts):
        # Hold the shared lock across both puts so the pair is written
        # "atomically" with respect to the other producers.
        with lock:
            q0.put(i)
            q1.put(i)


def main():
    num_puts = 10000
    num_producers = 2
    q0 = mp.Queue()
    q1 = mp.Queue()
    lock = mp.Lock()
    producers = [mp.Process(target=producer, args=(q0, q1, lock, num_puts))
                 for _ in range(num_producers)]
    [producer.start() for producer in producers]
    while True:
        if lock.acquire(timeout=1):
            try:
                # Hold the same lock across both gets on the consumer side.
                r0 = q0.get(timeout=1)
                r1 = q1.get(timeout=1)
                lock.release()
            except Empty:
                print("Empty queue")
                lock.release()
                time.sleep(0.1)
                continue
        else:
            print("Lock acquisition failed")
            continue
        print(f"res = {(r0, r1)}")
        # This assert will fail after a seemingly random number of gets
        assert r0 == r1


if __name__ == "__main__":
    main()
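Notably, the mismatch cannot occur if the pair travels through a single queue as one item, since each put is then atomic at the queue level and no lock is needed. A minimal sketch (the helper names here are illustrative, not part of the reproduction above):

import multiprocessing as mp
from queue import Empty


def producer_single(q, num_puts):
    for i in range(num_puts):
        q.put((i, i))  # one put carries the whole pair atomically


def main_single():
    q = mp.Queue()
    producers = [mp.Process(target=producer_single, args=(q, 10000))
                 for _ in range(2)]
    [p.start() for p in producers]
    while True:
        try:
            r0, r1 = q.get(timeout=1)
        except Empty:
            break  # producers are done and the queue has drained
        assert r0 == r1  # always holds: the pair is a single queue item


if __name__ == "__main__":
    main_single()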
Swapping in a different queue implementation, such as faster-fifo, while keeping everything else the same removes the issue and everything behaves as expected.
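A minimal sketch of that swap, assuming faster-fifo's Queue is a drop-in replacement (put/get with timeouts, raising queue.Empty, and a max_size_bytes constructor argument); only the queue construction changes, the locking discipline stays identical:

# Assumption: faster-fifo's Queue API (shared-memory circular buffer,
# sized in bytes). producer() and main() from the reproduction above
# run unchanged with these queues, and the assert no longer fails.
import multiprocessing as mp
from faster_fifo import Queue

q0 = Queue(max_size_bytes=1000 * 1000)
q1 = Queue(max_size_bytes=1000 * 1000)
lock = mp.Lock()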
Tested on these versions (provided by uv):
3.10, 3.11, 3.12, 3.13, 3.14.0a5
CPython versions tested on:
3.10
Operating systems tested on:
Linux