diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index e4a39f4d345c79..d89324b59fc850 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -812,7 +812,7 @@ def call_at(self, when, callback, *args, context=None): timer = events.TimerHandle(when, callback, args, self, context) if timer._source_traceback: del timer._source_traceback[-1] - heapq.heappush(self._scheduled, timer) + heapq.heappush(self._scheduled, (when, timer)) timer._scheduled = True return timer @@ -1959,20 +1959,21 @@ def _run_once(self): # Remove delayed calls that were cancelled if their number # is too high new_scheduled = [] - for handle in self._scheduled: + for when_handle in self._scheduled: + handle = when_handle[1] if handle._cancelled: handle._scheduled = False else: - new_scheduled.append(handle) + new_scheduled.append(when_handle) heapq.heapify(new_scheduled) self._scheduled = new_scheduled self._timer_cancelled_count = 0 else: # Remove delayed calls that were cancelled from head of queue. - while self._scheduled and self._scheduled[0]._cancelled: + while self._scheduled and self._scheduled[0][1]._cancelled: self._timer_cancelled_count -= 1 - handle = heapq.heappop(self._scheduled) + _, handle = heapq.heappop(self._scheduled) handle._scheduled = False timeout = None @@ -1980,7 +1981,7 @@ def _run_once(self): timeout = 0 elif self._scheduled: # Compute the desired timeout. - timeout = self._scheduled[0]._when - self.time() + timeout = self._scheduled[0][0] - self.time() if timeout > MAXIMUM_SELECT_TIMEOUT: timeout = MAXIMUM_SELECT_TIMEOUT elif timeout < 0: @@ -1993,13 +1994,14 @@ def _run_once(self): # Handle 'later' callbacks that are ready. end_time = self.time() + self._clock_resolution + ready = self._ready while self._scheduled: - handle = self._scheduled[0] - if handle._when >= end_time: + when, handle = self._scheduled[0] + if when >= end_time: break - handle = heapq.heappop(self._scheduled) + heapq.heappop(self._scheduled) handle._scheduled = False - self._ready.append(handle) + ready.append(handle) # This is the only place where callbacks are actually *called*. # All other places just add them to ready. @@ -2007,9 +2009,9 @@ def _run_once(self): # callbacks scheduled by callbacks run this time around -- # they will be run the next time (after another I/O poll). # Use an idiom that is thread-safe without using locks. - ntodo = len(self._ready) + ntodo = len(ready) for i in range(ntodo): - handle = self._ready.popleft() + handle = ready.popleft() if handle._cancelled: continue if self._debug: diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index c14a0bb180d79b..60fe0633934eae 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -268,7 +268,7 @@ def cb(): h = self.loop.call_later(10.0, cb) self.assertIsInstance(h, asyncio.TimerHandle) - self.assertIn(h, self.loop._scheduled) + self.assertIn((h.when(), h), self.loop._scheduled) self.assertNotIn(h, self.loop._ready) with self.assertRaises(TypeError, msg="delay must not be None"): self.loop.call_later(None, cb) @@ -378,13 +378,13 @@ def test__run_once(self): h1.cancel() self.loop._process_events = mock.Mock() - self.loop._scheduled.append(h1) - self.loop._scheduled.append(h2) + self.loop._scheduled.append((h1.when(), h1)) + self.loop._scheduled.append((h2.when(), h2)) self.loop._run_once() t = self.loop._selector.select.call_args[0][0] self.assertTrue(9.5 < t < 10.5, t) - self.assertEqual([h2], self.loop._scheduled) + self.assertEqual([(h2.when(), h2)], self.loop._scheduled) self.assertTrue(self.loop._process_events.called) def test_set_debug(self): @@ -406,7 +406,7 @@ def cb(loop): self.loop, None) self.loop._process_events = mock.Mock() - self.loop._scheduled.append(h) + self.loop._scheduled.append((h.when(), h)) self.loop._run_once() self.assertTrue(processed) @@ -486,7 +486,7 @@ def cb(): self.assertEqual(len(self.loop._scheduled), not_cancelled_count) # Ensure only uncancelled events remain scheduled - self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) + self.assertTrue(all([not x._cancelled for _, x in self.loop._scheduled])) def test_run_until_complete_type_error(self): self.assertRaises(TypeError, diff --git a/bench/call_at.py b/bench/call_at.py new file mode 100644 index 00000000000000..ac28e042846aba --- /dev/null +++ b/bench/call_at.py @@ -0,0 +1,29 @@ +import asyncio +import timeit + + +def run(): + asyncio.run(call_at()) + + +async def call_at(): + loop = asyncio.get_running_loop() + when = loop.time() + future = loop.create_future() + + def callback(): + """Callback function.""" + + def done(): + """Done function.""" + future.set_result(None) + + for _ in range(100): + when += 0.00000001 + loop.call_at(when, callback) + + loop.call_at(when, done) + await future + + +print("call_at_benchmark", timeit.timeit(run)) diff --git a/bench/timer_handle_heap.py b/bench/timer_handle_heap.py new file mode 100644 index 00000000000000..16707bfed51bfa --- /dev/null +++ b/bench/timer_handle_heap.py @@ -0,0 +1,45 @@ +from asyncio import TimerHandle +import heapq +import timeit + + +def callback(): + """This is the callback function that will be called when the timer expires.""" + + +class MockLoop: + def get_debug(self): + return False + + +loop = MockLoop() + + +def heap_tuple(): + scheduled = [] + when = 1 + + for _ in range(100): + when += 1 + handle = TimerHandle(when, callback, (), loop) + heapq.heappush(scheduled, (when, handle)) + + while scheduled: + when, handle = heapq.heappop(scheduled) + + +def heap_handle(): + scheduled = [] + when = 1 + + for _ in range(100): + when += 1 + handle = TimerHandle(when, callback, (), loop) + heapq.heappush(scheduled, handle) + + while scheduled: + handle = heapq.heappop(scheduled) + + +print("wrap when, TimerHandle in tuple", timeit.timeit(heap_tuple)) +print("bare TimerHandle", timeit.timeit(heap_handle))