Skip to content
Open
42 changes: 42 additions & 0 deletions Doc/library/threading.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,48 @@ This module defines the following functions:
of the result, even when terminated.


.. function:: iter_locked(iterable)

Convert an iterable into an iterator that performs iteration using locks.

The ``iter_locked`` makes non-atomic iterators atomic::

class non_atomic_iterator:

def __init__(self, it):
self.it = iter(it)

def __iter__(self):
return self

def __next__(self):
a = next(self.it)
b = next(self.it)
return a, b

atomic_iterator = iter_locked(non_atomic_iterator())

The ``iter_locked`` allows concurrent iteration over generator objects. For example::

def count():
i = 0
while True:
i += 1
yield i
concurrent_iterator = iter_locked(count())

The implementation is roughly equivalent to::

class iter_locked(Iterator):
def __init__(self, it):
self._it = iter(it)
self._lock = Lock()
def __next__(self):
with self._lock:
return next(self._it)

.. versionadded:: next

.. function:: main_thread()

Return the main :class:`Thread` object. In normal conditions, the
Expand Down
85 changes: 85 additions & 0 deletions Lib/test/test_free_threading/test_threading_iter_locked.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import time
import unittest
from threading import Thread, Barrier, iter_locked
from test.support import threading_helper


threading_helper.requires_working_threading(module=True)

class non_atomic_iterator:

def __init__(self, it):
self.it = iter(it)

def __iter__(self):
return self

def __next__(self):
a = next(self.it)
t = time.perf_counter() + 1e-6
while time.perf_counter() < t:
pass
b = next(self.it)
return a, b

def count():
i = 0
while True:
i += 1
yield i

class iter_lockedThreading(unittest.TestCase):

@threading_helper.reap_threads
def test_iter_locked(self):
number_of_threads = 10
number_of_iterations = 8
barrier = Barrier(number_of_threads)
def work(it):
while True:
try:
a, b = next(it)
assert a + 1 == b
except StopIteration:
break

data = tuple(range(400))
for it in range(number_of_iterations):
iter_locked_iterator = iter_locked(non_atomic_iterator(data,))
worker_threads = []
for ii in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[iter_locked_iterator]))

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()

@threading_helper.reap_threads
def test_iter_locked_generator(self):
number_of_threads = 5
number_of_iterations = 4
barrier = Barrier(number_of_threads)
def work(it):
barrier.wait()
for _ in range(1_000):
try:
next(it)
except StopIteration:
break

for it in range(number_of_iterations):
generator = iter_locked(count())
worker_threads = []
for ii in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[generator]))

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()

if __name__ == "__main__":
unittest.main()
39 changes: 39 additions & 0 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -2496,6 +2496,45 @@ def run_last():
self.assertIn("RuntimeError: can't register atexit after shutdown",
err.decode())

class IterLockedTests(unittest.TestCase):

def test_iter_locked(self):
for s in ("123", [], [1, 2, 3], tuple(), (1, 2, 3)):
expected = list(s)
actual = list(threading.iter_locked(s))
self.assertEqual(actual, expected)
for arg in [1, None, True, sys]:
self.assertRaises(TypeError, threading.iter_locked, arg)

def test_iter_locked_recursive_generator(self):
def g():
yield next(it)

it = threading.iter_locked(g())
with self.assertRaises(ValueError):
next(it)

def test_iter_locked_recursive_iterator(self):
class evil_it:
def __init__(self):
self.it = iter(range(100))
self.it2 = None

def __iter__(self):
return self

def __next__(self):
if self.it2 is not None:
it2, self.it2 = self.it2, None
return next(it2), next(it2)
return next(self.it)

a = evil_it()
il = threading.iter_locked(a)
assert next(il) == 0
a.it2 = il
assert next(il) == (1, 2)


if __name__ == "__main__":
unittest.main()
13 changes: 12 additions & 1 deletion Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from time import monotonic as _time
from _weakrefset import WeakSet
from itertools import count as _count
from collections.abc import Iterator
try:
from _collections import deque as _deque
except ImportError:
Expand All @@ -29,7 +30,7 @@
'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
'setprofile', 'settrace', 'local', 'stack_size',
'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile',
'setprofile_all_threads','settrace_all_threads']
'setprofile_all_threads','settrace_all_threads', 'iter_locked']

# Rename some stuff so "from threading import *" is safe
_start_joinable_thread = _thread.start_joinable_thread
Expand Down Expand Up @@ -1632,3 +1633,13 @@ def _after_fork():

if hasattr(_os, "register_at_fork"):
_os.register_at_fork(after_in_child=_after_fork)

class iter_locked(Iterator):
def __init__(self, it):
"""Convert an iterable into an iterator that performs iteration using locks """
self._it = iter(it)
self._lock = RLock()

def __next__(self):
with self._lock:
return next(self._it)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add :meth:`threading.iter_locked` to make concurrent iteration over an iterable execute using a lock.
Loading