diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index cabb41442f8419..14616a6421b179 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -208,6 +208,48 @@ This module defines the following functions: of the result, even when terminated. +.. function:: iter_locked(iterable) + + Convert an iterable into an iterator that performs iteration using locks. + + The ``iter_locked`` makes non-atomic iterators atomic:: + + class non_atomic_iterator: + + def __init__(self, it): + self.it = iter(it) + + def __iter__(self): + return self + + def __next__(self): + a = next(self.it) + b = next(self.it) + return a, b + + atomic_iterator = iter_locked(non_atomic_iterator()) + + The ``iter_locked`` allows concurrent iteration over generator objects. For example:: + + def count(): + i = 0 + while True: + i += 1 + yield i + concurrent_iterator = iter_locked(count()) + + The implementation is roughly equivalent to:: + + class iter_locked(Iterator): + def __init__(self, it): + self._it = iter(it) + self._lock = Lock() + def __next__(self): + with self._lock: + return next(self._it) + + .. versionadded:: next + .. function:: main_thread() Return the main :class:`Thread` object. In normal conditions, the diff --git a/Lib/test/test_free_threading/test_threading_iter_locked.py b/Lib/test/test_free_threading/test_threading_iter_locked.py new file mode 100644 index 00000000000000..30c26b328b5d76 --- /dev/null +++ b/Lib/test/test_free_threading/test_threading_iter_locked.py @@ -0,0 +1,85 @@ +import time +import unittest +from threading import Thread, Barrier, iter_locked +from test.support import threading_helper + + +threading_helper.requires_working_threading(module=True) + +class non_atomic_iterator: + + def __init__(self, it): + self.it = iter(it) + + def __iter__(self): + return self + + def __next__(self): + a = next(self.it) + t = time.perf_counter() + 1e-6 + while time.perf_counter() < t: + pass + b = next(self.it) + return a, b + +def count(): + i = 0 + while True: + i += 1 + yield i + +class iter_lockedThreading(unittest.TestCase): + + @threading_helper.reap_threads + def test_iter_locked(self): + number_of_threads = 10 + number_of_iterations = 8 + barrier = Barrier(number_of_threads) + def work(it): + while True: + try: + a, b = next(it) + assert a + 1 == b + except StopIteration: + break + + data = tuple(range(400)) + for it in range(number_of_iterations): + iter_locked_iterator = iter_locked(non_atomic_iterator(data,)) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[iter_locked_iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + + @threading_helper.reap_threads + def test_iter_locked_generator(self): + number_of_threads = 5 + number_of_iterations = 4 + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + for _ in range(1_000): + try: + next(it) + except StopIteration: + break + + for it in range(number_of_iterations): + generator = iter_locked(count()) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[generator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 00a3037c3e1e01..da74528a296519 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2496,6 +2496,45 @@ def run_last(): self.assertIn("RuntimeError: can't register atexit after shutdown", err.decode()) +class IterLockedTests(unittest.TestCase): + + def test_iter_locked(self): + for s in ("123", [], [1, 2, 3], tuple(), (1, 2, 3)): + expected = list(s) + actual = list(threading.iter_locked(s)) + self.assertEqual(actual, expected) + for arg in [1, None, True, sys]: + self.assertRaises(TypeError, threading.iter_locked, arg) + + def test_iter_locked_recursive_generator(self): + def g(): + yield next(it) + + it = threading.iter_locked(g()) + with self.assertRaises(ValueError): + next(it) + + def test_iter_locked_recursive_iterator(self): + class evil_it: + def __init__(self): + self.it = iter(range(100)) + self.it2 = None + + def __iter__(self): + return self + + def __next__(self): + if self.it2 is not None: + it2, self.it2 = self.it2, None + return next(it2), next(it2) + return next(self.it) + + a = evil_it() + il = threading.iter_locked(a) + assert next(il) == 0 + a.it2 = il + assert next(il) == (1, 2) + if __name__ == "__main__": unittest.main() diff --git a/Lib/threading.py b/Lib/threading.py index b6c451d1fbaabd..dcc8c53003811c 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -8,6 +8,7 @@ from time import monotonic as _time from _weakrefset import WeakSet from itertools import count as _count +from collections.abc import Iterator try: from _collections import deque as _deque except ImportError: @@ -29,7 +30,7 @@ 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size', 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile', - 'setprofile_all_threads','settrace_all_threads'] + 'setprofile_all_threads','settrace_all_threads', 'iter_locked'] # Rename some stuff so "from threading import *" is safe _start_joinable_thread = _thread.start_joinable_thread @@ -1632,3 +1633,13 @@ def _after_fork(): if hasattr(_os, "register_at_fork"): _os.register_at_fork(after_in_child=_after_fork) + +class iter_locked(Iterator): + def __init__(self, it): + """Convert an iterable into an iterator that performs iteration using locks """ + self._it = iter(it) + self._lock = RLock() + + def __next__(self): + with self._lock: + return next(self._it) diff --git a/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst new file mode 100644 index 00000000000000..c415cd8ece6202 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst @@ -0,0 +1 @@ +Add :meth:`threading.iter_locked` to make concurrent iteration over an iterable execute using a lock.