Skip to content

Commit 92f188a

Browse files
Improve code completion performance (ThreadedCompleter)
Fixes in `generator_to_async_generator` (back-pressure and performance improvements) and in `ThreadedCompleter`, where it is used.
1 parent fcbc218 commit 92f188a

File tree

3 files changed

+152
-27
lines changed

3 files changed

+152
-27
lines changed

src/prompt_toolkit/completion/base.py

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
"""
22
"""
33
from abc import ABCMeta, abstractmethod
4-
from typing import AsyncGenerator, Callable, Iterable, Optional, Sequence
4+
from typing import AsyncGenerator, Callable, Iterable, List, Optional, Sequence
55

66
from prompt_toolkit.document import Document
7-
from prompt_toolkit.eventloop import generator_to_async_generator
7+
from prompt_toolkit.eventloop import (
8+
aclosing,
9+
generator_to_async_generator,
10+
get_event_loop,
11+
)
812
from prompt_toolkit.filters import FilterOrBool, to_filter
913
from prompt_toolkit.formatted_text import AnyFormattedText, StyleAndTextTuples
1014

@@ -224,10 +228,61 @@ async def get_completions_async(
224228
"""
225229
Asynchronous generator of completions.
226230
"""
227-
async for completion in generator_to_async_generator(
228-
lambda: self.completer.get_completions(document, complete_event)
229-
):
230-
yield completion
231+
# NOTE: Right now, we are consuming the `get_completions` generator in
232+
# a synchronous background thread, then passing the results one
233+
# at a time over a queue, and consuming this queue in the main
234+
# thread (that's what `generator_to_async_generator` does). That
235+
# means that if the completer is *very* slow, we'll be showing
236+
# completions in the UI once they are computed.
237+
238+
# It's very tempting to replace this implementation with the
239+
# commented code below for several reasons:
240+
241+
# - `generator_to_async_generator` is not perfect and hard to get
242+
# right. It's a lot of complexity for little gain. The
243+
# implementation needs a huge buffer for it to be efficient
244+
# when there are many completions (like 50k+).
245+
# - Normally, a completer is supposed to be fast, users can have
246+
# "complete while typing" enabled, and want to see the
247+
# completions within a second. Handling one completion at a
248+
# time, and rendering once we get it here doesn't make any
249+
# sense if this is quick anyway.
250+
# - Completers like `FuzzyCompleter` prepare all completions
251+
# anyway so that they can be sorted by accuracy before they are
252+
# yielded. At the point that we start yielding completions
253+
# here, we already have all completions.
254+
# - The `Buffer` class has complex logic to invalidate the UI
255+
# while it is consuming the completions. We don't want to
256+
# invalidate the UI for every completion (if there are many),
257+
# but we want to do it often enough so that completions are
258+
# being displayed while they are produced.
259+
260+
# We keep the current behavior mainly for backward-compatibility.
261+
# Similarly, it would be better for this function to not return
262+
# an async generator, but simply be a coroutine that returns a
263+
# list of `Completion` objects, containing all completions at
264+
# once.
265+
266+
# Note that this argument doesn't mean we shouldn't use
267+
# `ThreadedCompleter`. It still makes sense to produce
268+
# completions in a background thread, because we don't want to
269+
# freeze the UI while the user is typing. But sending the
270+
# completions one at a time to the UI maybe isn't worth it.
271+
272+
# def get_all_in_thread() -> List[Completion]:
273+
# return list(self.get_completions(document, complete_event))
274+
275+
# completions = await get_event_loop().run_in_executor(None, get_all_in_thread)
276+
# for completion in completions:
277+
# yield completion
278+
279+
async with aclosing(
280+
generator_to_async_generator(
281+
lambda: self.completer.get_completions(document, complete_event)
282+
)
283+
) as async_generator:
284+
async for completion in async_generator:
285+
yield completion
231286

232287
def __repr__(self) -> str:
233288
return f"ThreadedCompleter({self.completer!r})"
@@ -306,10 +361,11 @@ async def get_completions_async(
306361

307362
# Get all completions in a non-blocking way.
308363
if self.filter():
309-
async for item in self.completer.get_completions_async(
310-
document, complete_event
311-
):
312-
yield item
364+
async with aclosing(
365+
self.completer.get_completions_async(document, complete_event)
366+
) as async_generator:
367+
async for item in async_generator:
368+
yield item
313369

314370

315371
class _MergedCompleter(Completer):
@@ -333,8 +389,11 @@ async def get_completions_async(
333389

334390
# Get all completions from the other completers in a non-blocking way.
335391
for completer in self.completers:
336-
async for item in completer.get_completions_async(document, complete_event):
337-
yield item
392+
async with aclosing(
393+
completer.get_completions_async(document, complete_event)
394+
) as async_generator:
395+
async for item in async_generator:
396+
yield item
338397

339398

340399
def merge_completers(

src/prompt_toolkit/eventloop/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .async_generator import generator_to_async_generator
1+
from .async_generator import aclosing, generator_to_async_generator
22
from .inputhook import (
33
InputHookContext,
44
InputHookSelector,
@@ -15,6 +15,7 @@
1515
__all__ = [
1616
# Async generator
1717
"generator_to_async_generator",
18+
"aclosing",
1819
# Utils.
1920
"run_in_executor_with_context",
2021
"call_soon_threadsafe",
Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,62 @@
11
"""
22
Implementation for async generators.
33
"""
4-
from asyncio import Queue
5-
from typing import AsyncGenerator, Callable, Iterable, TypeVar, Union
4+
from contextlib import asynccontextmanager
5+
from queue import Empty, Full, Queue
6+
from threading import Event
7+
from typing import (
8+
TYPE_CHECKING,
9+
AsyncGenerator,
10+
Awaitable,
11+
Callable,
12+
Iterable,
13+
TypeVar,
14+
Union,
15+
)
616

717
from .utils import get_event_loop, run_in_executor_with_context
818

919
__all__ = [
20+
"aclosing",
1021
"generator_to_async_generator",
1122
]
1223

1324

25+
if TYPE_CHECKING:
26+
# Thanks: https://github.com/python/typeshed/blob/main/stdlib/contextlib.pyi
27+
from typing_extensions import Protocol
28+
29+
class _SupportsAclose(Protocol):
30+
def aclose(self) -> Awaitable[object]:
31+
...
32+
33+
_SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose)
34+
35+
36+
@asynccontextmanager
37+
async def aclosing(
38+
thing: "_SupportsAcloseT",
39+
) -> AsyncGenerator["_SupportsAcloseT", None]:
40+
"Similar to `contextlib.aclosing`, in Python 3.10."
41+
try:
42+
yield thing
43+
finally:
44+
await thing.aclose()
45+
46+
47+
# By default, choose a buffer size that's a good balance between having enough
48+
# throughput, but not consuming too much memory. We use this to consume a sync
49+
# generator of completions as an async generator. If the queue size is very
50+
# small (like 1), consuming the completions goes really slow (when there are a
51+
# lot of items). If the queue size would be unlimited or too big, this can
52+
# cause overconsumption of memory, and cause CPU time spent producing items
53+
# that are no longer needed (if the consumption of the async generator stops at
54+
# some point). We need a fixed size in order to get some back pressure from the
55+
# async consumer to the sync producer. We choose 1000 by default here. If we
56+
# have around 50k completions, measurements show that 1000 is still
57+
# significantly faster than a buffer of 100.
58+
DEFAULT_BUFFER_SIZE: int = 1000
59+
1460
_T = TypeVar("_T")
1561

1662

@@ -19,7 +65,8 @@ class _Done:
1965

2066

2167
async def generator_to_async_generator(
22-
get_iterable: Callable[[], Iterable[_T]]
68+
get_iterable: Callable[[], Iterable[_T]],
69+
buffer_size: int = DEFAULT_BUFFER_SIZE,
2370
) -> AsyncGenerator[_T, None]:
2471
"""
2572
Turn a generator or iterable into an async generator.
@@ -28,10 +75,12 @@ async def generator_to_async_generator(
2875
2976
:param get_iterable: Function that returns a generator or iterable when
3077
called.
78+
:param buffer_size: Size of the queue between the async consumer and the
79+
synchronous generator that produces items.
3180
"""
3281
quitting = False
33-
_done = _Done()
34-
q: Queue[Union[_T, _Done]] = Queue()
82+
# NOTE: We are limiting the queue size in order to have back-pressure.
83+
q: Queue[Union[_T, _Done]] = Queue(maxsize=buffer_size)
3584
loop = get_event_loop()
3685

3786
def runner() -> None:
@@ -44,19 +93,38 @@ def runner() -> None:
4493
# When this async generator was cancelled (closed), stop this
4594
# thread.
4695
if quitting:
47-
break
48-
49-
loop.call_soon_threadsafe(q.put_nowait, item)
96+
return
97+
98+
while True:
99+
try:
100+
q.put(item, timeout=1)
101+
except Full:
102+
if quitting:
103+
return
104+
continue
105+
else:
106+
break
50107

51108
finally:
52-
loop.call_soon_threadsafe(q.put_nowait, _done)
109+
while True:
110+
try:
111+
q.put(_Done(), timeout=1)
112+
except Full:
113+
if quitting:
114+
return
115+
continue
116+
else:
117+
break
53118

54119
# Start background thread.
55120
runner_f = run_in_executor_with_context(runner)
56121

57122
try:
58123
while True:
59-
item = await q.get()
124+
try:
125+
item = q.get_nowait()
126+
except Empty:
127+
item = await loop.run_in_executor(None, q.get)
60128
if isinstance(item, _Done):
61129
break
62130
else:
@@ -67,8 +135,5 @@ def runner() -> None:
67135
quitting = True
68136

69137
# Wait for the background thread to finish. (should happen right after
70-
# the next item is yielded). If we don't do this, and the event loop
71-
# gets closed before the runner is done, then we'll get a
72-
# `RuntimeError: Event loop is closed` exception printed to stdout that
73-
# we can't handle.
138+
# the last item is yielded).
74139
await runner_f

0 commit comments

Comments
 (0)