Skip to content

Commit 34f9641

Browse files
miss-islingtonserhiy-storchaka
authored andcommitted
[3.14] pythongh-138253: Fix compatibility of sub-interpreters queues with queue.Queue (pythonGH-138256) (python#138367)
Co-authored-by: Serhiy Storchaka <[email protected]>
1 parent b27ff3b commit 34f9641

File tree

3 files changed

+24
-5
lines changed

3 files changed

+24
-5
lines changed

Lib/concurrent/interpreters/_queues.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,13 @@ def full(self):
171171
def qsize(self):
172172
return _queues.get_count(self._id)
173173

174-
def put(self, obj, timeout=None, *,
174+
def put(self, obj, block=True, timeout=None, *,
175175
unbounditems=None,
176176
_delay=10 / 1000, # 10 milliseconds
177177
):
178178
"""Add the object to the queue.
179179
180-
This blocks while the queue is full.
180+
If "block" is true, this blocks while the queue is full.
181181
182182
For most objects, the object received through Queue.get() will
183183
be a new one, equivalent to the original and not sharing any
@@ -210,6 +210,8 @@ def put(self, obj, timeout=None, *,
210210
If "unbounditems" is UNBOUND then it is returned by get() in place
211211
of the unbound item.
212212
"""
213+
if not block:
214+
return self.put_nowait(obj, unbounditems=unbounditems)
213215
if unbounditems is None:
214216
unboundop = -1
215217
else:
@@ -236,17 +238,19 @@ def put_nowait(self, obj, *, unbounditems=None):
236238
unboundop, = _serialize_unbound(unbounditems)
237239
_queues.put(self._id, obj, unboundop)
238240

239-
def get(self, timeout=None, *,
241+
def get(self, block=True, timeout=None, *,
240242
_delay=10 / 1000, # 10 milliseconds
241243
):
242244
"""Return the next object from the queue.
243245
244-
This blocks while the queue is empty.
246+
If "block" is true, this blocks while the queue is empty.
245247
246248
If the next item's original interpreter has been destroyed
247249
then the "next object" is determined by the value of the
248250
"unbounditems" argument to put().
249251
"""
252+
if not block:
253+
return self.get_nowait()
250254
if timeout is not None:
251255
timeout = int(timeout)
252256
if timeout < 0:

Lib/test/test_interpreters/test_queues.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import test._crossinterp_definitions as defs
1313
from .utils import _run_output, TestBase as _TestBase
1414

15-
15+
HUGE_TIMEOUT = 3600
1616
REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
1717

1818

@@ -307,6 +307,8 @@ def test_put_timeout(self):
307307
queue.put(None)
308308
with self.assertRaises(queues.QueueFull):
309309
queue.put(None, timeout=0.1)
310+
with self.assertRaises(queues.QueueFull):
311+
queue.put(None, HUGE_TIMEOUT, 0.1)
310312
queue.get()
311313
queue.put(None)
312314

@@ -316,6 +318,10 @@ def test_put_nowait(self):
316318
queue.put_nowait(None)
317319
with self.assertRaises(queues.QueueFull):
318320
queue.put_nowait(None)
321+
with self.assertRaises(queues.QueueFull):
322+
queue.put(None, False)
323+
with self.assertRaises(queues.QueueFull):
324+
queue.put(None, False, timeout=HUGE_TIMEOUT)
319325
queue.get()
320326
queue.put_nowait(None)
321327

@@ -346,11 +352,17 @@ def test_get_timeout(self):
346352
queue = queues.create()
347353
with self.assertRaises(queues.QueueEmpty):
348354
queue.get(timeout=0.1)
355+
with self.assertRaises(queues.QueueEmpty):
356+
queue.get(HUGE_TIMEOUT, 0.1)
349357

350358
def test_get_nowait(self):
351359
queue = queues.create()
352360
with self.assertRaises(queues.QueueEmpty):
353361
queue.get_nowait()
362+
with self.assertRaises(queues.QueueEmpty):
363+
queue.get(False)
364+
with self.assertRaises(queues.QueueEmpty):
365+
queue.get(False, timeout=HUGE_TIMEOUT)
354366

355367
def test_put_get_full_fallback(self):
356368
expected = list(range(20))
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add the *block* parameter in the :meth:`!put` and :meth:`!get` methods
2+
of the :mod:`concurrent.interpreters` queues for compatibility with the
3+
:class:`queue.Queue` interface.

0 commit comments

Comments
 (0)