Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Lib/concurrent/interpreters/_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ def full(self):
def qsize(self):
return _queues.get_count(self._id)

def put(self, obj, timeout=None, *,
def put(self, obj, block=True, timeout=None, *,
unbounditems=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.

This blocks while the queue is full.
If "block" is true, this blocks while the queue is full.

For most objects, the object received through Queue.get() will
be a new one, equivalent to the original and not sharing any
Expand Down Expand Up @@ -210,6 +210,8 @@ def put(self, obj, timeout=None, *,
If "unbounditems" is UNBOUND then it is returned by get() in place
of the unbound item.
"""
if not block:
return self.put_nowait(obj, unbounditems=unbounditems)
if unbounditems is None:
unboundop = -1
else:
Expand All @@ -236,17 +238,19 @@ def put_nowait(self, obj, *, unbounditems=None):
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop)

def get(self, timeout=None, *,
def get(self, block=True, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
):
"""Return the next object from the queue.

This blocks while the queue is empty.
If "block" is true, this blocks while the queue is empty.

If the next item's original interpreter has been destroyed
then the "next object" is determined by the value of the
"unbounditems" argument to put().
"""
if not block:
return self.get_nowait()
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
Expand Down
14 changes: 13 additions & 1 deletion Lib/test/test_interpreters/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import test._crossinterp_definitions as defs
from .utils import _run_output, TestBase as _TestBase


HUGE_TIMEOUT = 3600
REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]


Expand Down Expand Up @@ -307,6 +307,8 @@ def test_put_timeout(self):
queue.put(None)
with self.assertRaises(queues.QueueFull):
queue.put(None, timeout=0.1)
with self.assertRaises(queues.QueueFull):
queue.put(None, HUGE_TIMEOUT, 0.1)
queue.get()
queue.put(None)

Expand All @@ -316,6 +318,10 @@ def test_put_nowait(self):
queue.put_nowait(None)
with self.assertRaises(queues.QueueFull):
queue.put_nowait(None)
with self.assertRaises(queues.QueueFull):
queue.put(None, False)
with self.assertRaises(queues.QueueFull):
queue.put(None, False, timeout=HUGE_TIMEOUT)
queue.get()
queue.put_nowait(None)

Expand Down Expand Up @@ -346,11 +352,17 @@ def test_get_timeout(self):
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get(timeout=0.1)
with self.assertRaises(queues.QueueEmpty):
queue.get(HUGE_TIMEOUT, 0.1)

def test_get_nowait(self):
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.assertRaises(queues.QueueEmpty):
queue.get(False)
with self.assertRaises(queues.QueueEmpty):
queue.get(False, timeout=HUGE_TIMEOUT)

def test_put_get_full_fallback(self):
expected = list(range(20))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add the *block* parameter in the :meth:`!put` and :meth:`!get` methods
of the :mod:`concurrent.interpreters` queues for compatibility with the
:class:`queue.Queue` interface.
Loading