Skip to content
17 changes: 7 additions & 10 deletions Lib/asyncio/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,22 +351,19 @@ def _set_concurrent_future_state(concurrent, source):
def _copy_future_state(source, dest):
"""Internal helper to copy state from another Future.

The other Future may be a concurrent.futures.Future.
The other Future must be a concurrent.futures.Future.
"""
assert source.done()
if dest.cancelled():
return
assert not dest.done()
if source.cancelled():
done, cancelled, result, exception = source._get_snapshot()
assert done
if cancelled:
dest.cancel()
elif exception is not None:
dest.set_exception(_convert_future_exc(exception))
else:
exception = source.exception()
if exception is not None:
dest.set_exception(_convert_future_exc(exception))
else:
result = source.result()
dest.set_result(result)

dest.set_result(result)

def _chain_future(source, destination):
"""Chain two futures so that when one completes, so does the other.
Expand Down
27 changes: 27 additions & 0 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,33 @@ def set_exception(self, exception):
self._condition.notify_all()
self._invoke_callbacks()

def _get_snapshot(self):
"""Get a snapshot of the future's current state.

This method atomically retrieves the state in one lock acquisition,
which is significantly faster than multiple method calls.

Returns:
Tuple of (done, cancelled, result, exception)
- done: True if the future is done (cancelled or finished)
- cancelled: True if the future was cancelled
- result: The result if available and not cancelled
- exception: The exception if available and not cancelled
"""
# Fast path: check if already finished without lock
if self._state == FINISHED:
return True, False, self._result, self._exception

# Need lock for other states since they can change
with self._condition:
# We have to check the state again after acquiring the lock
# because it may have changed in the meantime.
if self._state == FINISHED:
return True, False, self._result, self._exception
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
return True, True, None, None
return False, False, None, None

__class_getitem__ = classmethod(types.GenericAlias)

class Executor(object):
Expand Down
58 changes: 54 additions & 4 deletions Lib/test/test_asyncio/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,23 +413,23 @@ def func_repr(func):
def test_copy_state(self):
from asyncio.futures import _copy_future_state

f = self._new_future(loop=self.loop)
f = concurrent.futures.Future()
f.set_result(10)

newf = self._new_future(loop=self.loop)
_copy_future_state(f, newf)
self.assertTrue(newf.done())
self.assertEqual(newf.result(), 10)

f_exception = self._new_future(loop=self.loop)
f_exception = concurrent.futures.Future()
f_exception.set_exception(RuntimeError())

newf_exception = self._new_future(loop=self.loop)
_copy_future_state(f_exception, newf_exception)
self.assertTrue(newf_exception.done())
self.assertRaises(RuntimeError, newf_exception.result)

f_cancelled = self._new_future(loop=self.loop)
f_cancelled = concurrent.futures.Future()
f_cancelled.cancel()

newf_cancelled = self._new_future(loop=self.loop)
Expand All @@ -441,7 +441,7 @@ def test_copy_state(self):
except BaseException as e:
f_exc = e

f_conexc = self._new_future(loop=self.loop)
f_conexc = concurrent.futures.Future()
f_conexc.set_exception(f_exc)

newf_conexc = self._new_future(loop=self.loop)
Expand All @@ -454,6 +454,56 @@ def test_copy_state(self):
newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)

def test_copy_state_from_concurrent_futures(self):
"""Test _copy_future_state from concurrent.futures.Future.

This tests the optimized path using _get_snapshot when available.
"""
from asyncio.futures import _copy_future_state

# Test with a result
f_concurrent = concurrent.futures.Future()
f_concurrent.set_result(42)
f_asyncio = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent, f_asyncio)
self.assertTrue(f_asyncio.done())
self.assertEqual(f_asyncio.result(), 42)

# Test with an exception
f_concurrent_exc = concurrent.futures.Future()
f_concurrent_exc.set_exception(ValueError("test exception"))
f_asyncio_exc = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent_exc, f_asyncio_exc)
self.assertTrue(f_asyncio_exc.done())
with self.assertRaises(ValueError) as cm:
f_asyncio_exc.result()
self.assertEqual(str(cm.exception), "test exception")

# Test with cancelled state
f_concurrent_cancelled = concurrent.futures.Future()
f_concurrent_cancelled.cancel()
f_asyncio_cancelled = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
self.assertTrue(f_asyncio_cancelled.cancelled())

# Test that destination already cancelled prevents copy
f_concurrent_result = concurrent.futures.Future()
f_concurrent_result.set_result(10)
f_asyncio_precancelled = self._new_future(loop=self.loop)
f_asyncio_precancelled.cancel()
_copy_future_state(f_concurrent_result, f_asyncio_precancelled)
self.assertTrue(f_asyncio_precancelled.cancelled())

# Test exception type conversion
f_concurrent_invalid = concurrent.futures.Future()
f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
f_asyncio_invalid = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
self.assertTrue(f_asyncio_invalid.done())
with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
f_asyncio_invalid.result()
self.assertEqual(str(cm.exception), "invalid")

def test_iter(self):
fut = self._new_future(loop=self.loop)

Expand Down
62 changes: 62 additions & 0 deletions Lib/test/test_concurrent_futures/test_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,68 @@ def test_multiple_set_exception(self):

self.assertEqual(f.exception(), e)

def test_get_snapshot(self):
"""Test the _get_snapshot method for atomic state retrieval."""
# Test with a pending future
f = Future()
done, cancelled, result, exception = f._get_snapshot()
self.assertFalse(done)
self.assertFalse(cancelled)
self.assertIsNone(result)
self.assertIsNone(exception)

# Test with a finished future (successful result)
f = Future()
f.set_result(42)
done, cancelled, result, exception = f._get_snapshot()
self.assertTrue(done)
self.assertFalse(cancelled)
self.assertEqual(result, 42)
self.assertIsNone(exception)

# Test with a finished future (exception)
f = Future()
exc = ValueError("test error")
f.set_exception(exc)
done, cancelled, result, exception = f._get_snapshot()
self.assertTrue(done)
self.assertFalse(cancelled)
self.assertIsNone(result)
self.assertEqual(exception, exc)

# Test with a cancelled future
f = Future()
f.cancel()
done, cancelled, result, exception = f._get_snapshot()
self.assertTrue(done)
self.assertTrue(cancelled)
self.assertIsNone(result)
self.assertIsNone(exception)

# Test concurrent access (basic thread safety check)
f = Future()
f.set_result(100)
results = []

def get_snapshot():
for _ in range(1000):
snapshot = f._get_snapshot()
results.append(snapshot)

threads = []
for _ in range(4):
t = threading.Thread(target=get_snapshot)
threads.append(t)
t.start()

for t in threads:
t.join()

# All snapshots should be identical for a finished future
expected = (True, False, 100, None)
for result in results:
self.assertEqual(result, expected)


def setUpModule():
setup_module()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add ``_get_snapshot()`` method to :class:`concurrent.futures.Future` to
atomically retrieve all future state in a single lock acquisition. This speeds
up :mod:`asyncio`'s ``_copy_future_state()`` by up to 4x when transferring state
from :class:`concurrent.futures.Future` to :class:`asyncio.Future`. Patch by J. Nick Koston.
Loading