Skip to content

Commit 6b76a37

Browse files
committed
Optimize concurrent.futures→asyncio state transfer with atomic snapshot
This PR significantly improves performance when transferring future state from `concurrent.futures.Future` to `asyncio.Future`, a common operation when dispatching executor jobs in asyncio applications. The current `_copy_future_state` implementation requires multiple method calls and lock acquisitions to retrieve the source future's state: 1. `done()` - acquires lock to check state 2. `cancelled()` - acquires lock again 3. `exception()` - acquires lock to get exception 4. `result()` - acquires lock to get result Each method call involves thread synchronization overhead, making this operation a bottleneck for high-frequency executor dispatches. Our use case involves dispatching a large number of small executor jobs from `asyncio` to a thread pool. These jobs typically involve `open` or `stat` on files that are already cached by the OS, so the actual I/O returns almost instantly. However, we still have to offload them to avoid blocking the event loop, since there's no reliable way to determine in advance whether a read will hit the cache. As a result, the majority of the overhead isn't from the I/O itself, but from the cost of scheduling. Most of the time is spent copying future state, which involves locking. This PR reduces that overhead, which has a meaningful impact at scale. Add a new `_get_snapshot()` method to `concurrent.futures.Future` that atomically retrieves all state information in a single lock acquisition: - Returns tuple: `(done, cancelled, result, exception)` - Uses optimized fast path for already-finished futures (no lock needed) - Provides atomic state capture for other states The `_copy_future_state` function in `asyncio` now uses this snapshot method when available, falling back to the traditional approach for backwards compatibility. Benchmark results show dramatic improvements for the common case: - **concurrent.futures→asyncio transfer: 4.12x faster** - asyncio→asyncio transfer: Slightly slower (1.05x) due to hasattr check (I couldn't find any places where this actually happens though as it looks like `_chain_future` the only entry point to `_copy_future_state` and it is always called with `concurrent.futures.Future`) This optimization particularly benefits applications that: - Dispatch many small executor jobs (e.g., filesystem operations, DNS lookups) - Use thread pools for I/O-bound operations in asyncio - Have high frequency of executor task completion - Adds `_get_snapshot()` to `concurrent.futures.Future` for atomic state retrieval - Updates `_copy_future_state()` to prefer snapshot method when available - Maintains full backwards compatibility with existing code - Minimal code changes with focused optimization These show consistent 4x+ speedup for the critical concurrent.futures→asyncio path. ``` === 1. Benchmarking concurrent.futures -> asyncio === Running original... concurrent_to_asyncio: Mean +- std dev: 986 ns +- 16 ns Running optimized... concurrent_to_asyncio: Mean +- std dev: 239 ns +- 4 ns Comparison: Mean +- std dev: [concurrent_original] 986 ns +- 16 ns -> [concurrent_optimized] 239 ns +- 4 ns: 4.12x faster === 2. Benchmarking asyncio -> asyncio === Running original... asyncio_to_asyncio: Mean +- std dev: 221 ns +- 4 ns Running optimized... asyncio_to_asyncio: Mean +- std dev: 232 ns +- 4 ns Comparison: Mean +- std dev: [asyncio_original] 221 ns +- 4 ns -> [asyncio_optimized] 232 ns +- 4 ns: 1.05x slower Cleaning up... ``` ```python import pyperf import concurrent.futures import asyncio import subprocess import os import sys def write_benchmark_scripts(): """Write individual benchmark scripts for each scenario.""" # Common helper code common_imports = ''' import pyperf import concurrent.futures import asyncio def _convert_future_exc(exc): exc_class = type(exc) if exc_class is concurrent.futures.CancelledError: return asyncio.CancelledError(*exc.args) elif exc_class is concurrent.futures.TimeoutError: return asyncio.TimeoutError(*exc.args) elif exc_class is concurrent.futures.InvalidStateError: return asyncio.InvalidStateError(*exc.args) else: return exc ''' # Optimization patch code optimization_patch = ''' FINISHED = concurrent.futures._base.FINISHED CANCELLED = concurrent.futures._base.CANCELLED CANCELLED_AND_NOTIFIED = concurrent.futures._base.CANCELLED_AND_NOTIFIED def _get_snapshot_implementation(self): """Get a snapshot of the future's current state.""" # Fast path: check if already finished without lock if self._state == FINISHED: return True, False, self._result, self._exception # Need lock for other states since they can change with self._condition: if self._state == FINISHED: return True, False, self._result, self._exception if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}: return True, True, None, None return False, False, None, None concurrent.futures.Future._get_snapshot = _get_snapshot_implementation ''' # Original copy implementation original_copy = ''' def copy_future_original(source, dest): """Original implementation using individual method calls.""" if dest.cancelled(): return if hasattr(source, 'done'): assert source.done() if source.cancelled(): dest.cancel() else: exception = source.exception() if exception is not None: dest.set_exception(_convert_future_exc(exception)) else: result = source.result() dest.set_result(result) ''' # Optimized copy implementation optimized_copy = ''' def copy_future_optimized(source, dest): """Optimized implementation using _get_snapshot when available.""" if dest.cancelled(): return # Use _get_snapshot for futures that support it if hasattr(source, '_get_snapshot'): done, cancelled, result, exception = source._get_snapshot() assert done if cancelled: dest.cancel() elif exception is not None: dest.set_exception(_convert_future_exc(exception)) else: dest.set_result(result) return # Traditional fallback for asyncio.Future if hasattr(source, 'done'): assert source.done() if source.cancelled(): dest.cancel() else: exception = source.exception() if exception is not None: dest.set_exception(_convert_future_exc(exception)) else: result = source.result() dest.set_result(result) ''' # 1. concurrent.futures -> asyncio (original) with open('bench_concurrent_to_asyncio_original.py', 'w') as f: f.write(common_imports + original_copy + ''' source = concurrent.futures.Future() source.set_result(42) loop = asyncio.new_event_loop() def task(): """Single copy operation benchmark.""" dest = asyncio.Future(loop=loop) copy_future_original(source, dest) dest.cancel() runner = pyperf.Runner() runner.bench_func('concurrent_to_asyncio', task) ''') # 2. concurrent.futures -> asyncio (optimized) with open('bench_concurrent_to_asyncio_optimized.py', 'w') as f: f.write(common_imports + optimization_patch + optimized_copy + ''' source = concurrent.futures.Future() source.set_result(42) loop = asyncio.new_event_loop() def task(): """Single copy operation benchmark.""" dest = asyncio.Future(loop=loop) copy_future_optimized(source, dest) dest.cancel() runner = pyperf.Runner() runner.bench_func('concurrent_to_asyncio', task) ''') # 3. asyncio -> asyncio (original) with open('bench_asyncio_to_asyncio_original.py', 'w') as f: f.write(common_imports + original_copy + ''' loop = asyncio.new_event_loop() source = asyncio.Future(loop=loop) source.set_result(42) def task(): """Single copy operation benchmark.""" dest = asyncio.Future(loop=loop) copy_future_original(source, dest) dest.cancel() runner = pyperf.Runner() runner.bench_func('asyncio_to_asyncio', task) ''') # 4. asyncio -> asyncio (optimized - should use fallback) with open('bench_asyncio_to_asyncio_optimized.py', 'w') as f: f.write(common_imports + optimization_patch + optimized_copy + ''' loop = asyncio.new_event_loop() source = asyncio.Future(loop=loop) source.set_result(42) def task(): """Single copy operation benchmark.""" dest = asyncio.Future(loop=loop) copy_future_optimized(source, dest) dest.cancel() runner = pyperf.Runner() runner.bench_func('asyncio_to_asyncio', task) ''') def run_benchmarks(): """Run all benchmarks and compare results.""" print("Writing benchmark scripts...") write_benchmark_scripts() # Clean up old results for f in ['concurrent_original.json', 'concurrent_optimized.json', 'asyncio_original.json', 'asyncio_optimized.json']: if os.path.exists(f): os.remove(f) print("\n=== 1. Benchmarking concurrent.futures -> asyncio ===") print("Running original...") subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_original.py', '-o', 'concurrent_original.json', '--quiet']) print("Running optimized...") subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_optimized.py', '-o', 'concurrent_optimized.json', '--quiet']) print("\nComparison:") subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to', 'concurrent_original.json', 'concurrent_optimized.json']) print("\n=== 2. Benchmarking asyncio -> asyncio ===") print("Running original...") subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_original.py', '-o', 'asyncio_original.json', '--quiet']) print("Running optimized...") subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_optimized.py', '-o', 'asyncio_optimized.json', '--quiet']) print("\nComparison:") subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to', 'asyncio_original.json', 'asyncio_optimized.json']) # Clean up print("\nCleaning up...") for f in ['bench_concurrent_to_asyncio_original.py', 'bench_concurrent_to_asyncio_optimized.py', 'bench_asyncio_to_asyncio_original.py', 'bench_asyncio_to_asyncio_optimized.py']: if os.path.exists(f): os.remove(f) print("\n=== Summary ===") print("concurrent.futures -> asyncio: Should show significant speedup") print("asyncio -> asyncio: Should show no regression (fallback path)") if __name__ == "__main__": run_benchmarks() ```
1 parent fa4e088 commit 6b76a37

File tree

4 files changed

+154
-1
lines changed

4 files changed

+154
-1
lines changed

Lib/asyncio/futures.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,24 @@ def _copy_future_state(source, dest):
353353
354354
The other Future may be a concurrent.futures.Future.
355355
"""
356-
assert source.done()
357356
if dest.cancelled():
358357
return
359358
assert not dest.done()
359+
360+
# Use _get_snapshot for futures that support it
361+
if hasattr(source, '_get_snapshot'):
362+
done, cancelled, result, exception = source._get_snapshot()
363+
assert done
364+
if cancelled:
365+
dest.cancel()
366+
elif exception is not None:
367+
dest.set_exception(_convert_future_exc(exception))
368+
else:
369+
dest.set_result(result)
370+
return
371+
372+
# Traditional fallback needs done check
373+
assert source.done()
360374
if source.cancelled():
361375
dest.cancel()
362376
else:

Lib/concurrent/futures/_base.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,33 @@ def set_exception(self, exception):
558558
self._condition.notify_all()
559559
self._invoke_callbacks()
560560

561+
def _get_snapshot(self):
562+
"""Get a snapshot of the future's current state.
563+
564+
This method atomically retrieves the state in one lock acquisition,
565+
which is significantly faster than multiple method calls.
566+
567+
Returns:
568+
Tuple of (done, cancelled, result, exception)
569+
- done: True if the future is done (cancelled or finished)
570+
- cancelled: True if the future was cancelled
571+
- result: The result if available and not cancelled
572+
- exception: The exception if available and not cancelled
573+
"""
574+
# Fast path: check if already finished without lock
575+
if self._state == FINISHED:
576+
return True, False, self._result, self._exception
577+
578+
# Need lock for other states since they can change
579+
with self._condition:
580+
# We have to check the state again after acquiring the lock
581+
# because it may have changed in the meantime.
582+
if self._state == FINISHED:
583+
return True, False, self._result, self._exception
584+
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
585+
return True, True, None, None
586+
return False, False, None, None
587+
561588
__class_getitem__ = classmethod(types.GenericAlias)
562589

563590
class Executor(object):

Lib/test/test_asyncio/test_futures.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,56 @@ def test_copy_state(self):
454454
newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
455455
self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)
456456

457+
def test_copy_state_from_concurrent_futures(self):
458+
"""Test _copy_future_state from concurrent.futures.Future.
459+
460+
This tests the optimized path using _get_snapshot when available.
461+
"""
462+
from asyncio.futures import _copy_future_state
463+
464+
# Test with a result
465+
f_concurrent = concurrent.futures.Future()
466+
f_concurrent.set_result(42)
467+
f_asyncio = self._new_future(loop=self.loop)
468+
_copy_future_state(f_concurrent, f_asyncio)
469+
self.assertTrue(f_asyncio.done())
470+
self.assertEqual(f_asyncio.result(), 42)
471+
472+
# Test with an exception
473+
f_concurrent_exc = concurrent.futures.Future()
474+
f_concurrent_exc.set_exception(ValueError("test exception"))
475+
f_asyncio_exc = self._new_future(loop=self.loop)
476+
_copy_future_state(f_concurrent_exc, f_asyncio_exc)
477+
self.assertTrue(f_asyncio_exc.done())
478+
with self.assertRaises(ValueError) as cm:
479+
f_asyncio_exc.result()
480+
self.assertEqual(str(cm.exception), "test exception")
481+
482+
# Test with cancelled state
483+
f_concurrent_cancelled = concurrent.futures.Future()
484+
f_concurrent_cancelled.cancel()
485+
f_asyncio_cancelled = self._new_future(loop=self.loop)
486+
_copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
487+
self.assertTrue(f_asyncio_cancelled.cancelled())
488+
489+
# Test that destination already cancelled prevents copy
490+
f_concurrent_result = concurrent.futures.Future()
491+
f_concurrent_result.set_result(10)
492+
f_asyncio_precancelled = self._new_future(loop=self.loop)
493+
f_asyncio_precancelled.cancel()
494+
_copy_future_state(f_concurrent_result, f_asyncio_precancelled)
495+
self.assertTrue(f_asyncio_precancelled.cancelled())
496+
497+
# Test exception type conversion
498+
f_concurrent_invalid = concurrent.futures.Future()
499+
f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
500+
f_asyncio_invalid = self._new_future(loop=self.loop)
501+
_copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
502+
self.assertTrue(f_asyncio_invalid.done())
503+
with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
504+
f_asyncio_invalid.result()
505+
self.assertEqual(str(cm.exception), "invalid")
506+
457507
def test_iter(self):
458508
fut = self._new_future(loop=self.loop)
459509

Lib/test/test_concurrent_futures/test_future.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,68 @@ def test_multiple_set_exception(self):
282282

283283
self.assertEqual(f.exception(), e)
284284

285+
def test_get_snapshot(self):
286+
"""Test the _get_snapshot method for atomic state retrieval."""
287+
# Test with a pending future
288+
f = Future()
289+
done, cancelled, result, exception = f._get_snapshot()
290+
self.assertFalse(done)
291+
self.assertFalse(cancelled)
292+
self.assertIsNone(result)
293+
self.assertIsNone(exception)
294+
295+
# Test with a finished future (successful result)
296+
f = Future()
297+
f.set_result(42)
298+
done, cancelled, result, exception = f._get_snapshot()
299+
self.assertTrue(done)
300+
self.assertFalse(cancelled)
301+
self.assertEqual(result, 42)
302+
self.assertIsNone(exception)
303+
304+
# Test with a finished future (exception)
305+
f = Future()
306+
exc = ValueError("test error")
307+
f.set_exception(exc)
308+
done, cancelled, result, exception = f._get_snapshot()
309+
self.assertTrue(done)
310+
self.assertFalse(cancelled)
311+
self.assertIsNone(result)
312+
self.assertEqual(exception, exc)
313+
314+
# Test with a cancelled future
315+
f = Future()
316+
f.cancel()
317+
done, cancelled, result, exception = f._get_snapshot()
318+
self.assertTrue(done)
319+
self.assertTrue(cancelled)
320+
self.assertIsNone(result)
321+
self.assertIsNone(exception)
322+
323+
# Test concurrent access (basic thread safety check)
324+
f = Future()
325+
f.set_result(100)
326+
results = []
327+
328+
def get_snapshot():
329+
for _ in range(1000):
330+
snapshot = f._get_snapshot()
331+
results.append(snapshot)
332+
333+
threads = []
334+
for _ in range(4):
335+
t = threading.Thread(target=get_snapshot)
336+
threads.append(t)
337+
t.start()
338+
339+
for t in threads:
340+
t.join()
341+
342+
# All snapshots should be identical for a finished future
343+
expected = (True, False, 100, None)
344+
for result in results:
345+
self.assertEqual(result, expected)
346+
285347

286348
def setUpModule():
287349
setup_module()

0 commit comments

Comments
 (0)