Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Lib/_pyrepl/completing_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,7 @@ def get_stem(self) -> str:

def get_completions(self, stem: str) -> list[str]:
return []

def get_line(self):
"""Return the current line until the cursor position."""
return ''.join(self.buffer[:self.pos])
332 changes: 332 additions & 0 deletions Lib/_pyrepl/readline.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@

from __future__ import annotations

import importlib
import pkgutil
import tokenize
import warnings
from io import StringIO
from contextlib import contextmanager
from dataclasses import dataclass, field
from tokenize import TokenInfo

import os
from site import gethistoryfile # type: ignore[attr-defined]
Expand Down Expand Up @@ -59,6 +65,7 @@

if TYPE_CHECKING:
from typing import Any, Mapping
from types import ModuleType


MoreLinesCallable = Callable[[str], bool]
Expand Down Expand Up @@ -132,6 +139,8 @@ def get_stem(self) -> str:
return "".join(b[p + 1 : self.pos])

def get_completions(self, stem: str) -> list[str]:
if module_completions := self.get_module_completions():
return module_completions
if len(stem) == 0 and self.more_lines is not None:
b = self.buffer
p = self.pos
Expand Down Expand Up @@ -161,6 +170,11 @@ def get_completions(self, stem: str) -> list[str]:
result.sort()
return result

def get_module_completions(self) -> list[str]:
completer = ModuleCompleter(namespace={'__package__': '_pyrepl'}) # TODO: namespace?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I wasn't sure about is how to handle relative imports in regards to __package__. In PyREPL, it is set to _pyrepl so I replicate that here but maybe we want it to be configurable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either remove the TODO or we should handle this differently

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the TODO and left a comment. Inside pyrepl, __package__ is set to _pyrepl so the module completer should use the same value when resolving relative packages.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this creating one of this guys per request? Should we cache the ModuleCompleter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is, and yes we should cache it. Updated in 5c11124 (I also moved the completer to a separate file to avoid having to reorder everything when I added it to the ReadlineConfig)

Copy link
Member Author

@tomasr8 tomasr8 Apr 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also changed in f4e290a so that each Reader gets a new ModuleCompleter instance (with its own module cache)

line = self.get_line()
return completer.get_completions(line)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this raise? What do we want to do if any of the pkgutil raises for any reason?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't unless iter_modules or find_spec raise. Since those can call code from user-supplied finders, they can raise an arbitrary exception. I could add a simple except Exception block and simply return no completions if that happens. To the user it would look like no completions are available rather than outright crashing the repl.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in fd81999


def get_trimmed_history(self, maxlength: int) -> list[str]:
if maxlength >= 0:
cut = len(self.history) - maxlength
Expand Down Expand Up @@ -596,3 +610,321 @@ def _setup(namespace: Mapping[str, Any]) -> None:


raw_input: Callable[[object], str] | None = None


class ModuleCompleter:
"""A completer for Python import statements.

Examples:
- import <tab>
- import foo<tab>
- import foo.<tab>
- import foo as bar, baz<tab>

- from <tab>
- from foo<tab>
- from foo import <tab>
- from foo import bar<tab>
- from foo import (bar as baz, qux<tab>
"""

def __init__(self, namespace: Mapping[str, Any] | None = None):
self.namespace = namespace or {}
self._global_cache: list[str] = []
self._curr_sys_path: list[str] = sys.path[:]

def get_completions(self, line: str) -> list[str]:
"""Return the next possible import completions for 'line'."""

parser = ImportParser(line)
if not (result := parser.parse()):
return []
return self.complete(*result)

def complete(self, from_name: str | None, name: str | None) -> list[str]:
# import x.y.z<tab>
if from_name is None:
if not name:
return []
return self.complete_import(name)

# from x.y.z<tab>
if name is None:
if not from_name:
return []
return self.complete_import(from_name)

# from x.y import z<tab>
if not (module := self.import_module(from_name)):
return []

submodules = self.filter_submodules(module, name)
attributes = self.filter_attributes(module, name)
return list(set(submodules + attributes))

def complete_import(self, name: str) -> list[str]:
is_relative = name.startswith('.')
path, prefix = self.get_path_and_prefix(name)

if not is_relative and not path:
return [name for name in self.global_cache if name.startswith(prefix)]

if not (module := self.import_module(path)):
return []

submodules = self.filter_submodules(module, prefix)
if not is_relative:
return [f'{path}.{name}' for name in submodules]
return [f'.{name}' for name in submodules]

def import_module(self, path: str) -> ModuleType | None:
package = self.namespace.get('__package__')
is_relative = path.startswith('.')
if is_relative and not package:
return None
try:
module = importlib.import_module(
path,
package=package if is_relative else None)
except ImportError:
return None
return module

def filter_submodules(self, module: ModuleType, prefix: str) -> list[str]:
if not hasattr(module, '__path__'):
return []
return [name for _, name, _ in pkgutil.iter_modules(module.__path__)
if name.startswith(prefix)]

def filter_attributes(self, module: ModuleType, prefix: str) -> list[str]:
return [attr for attr in module.__dict__ if attr.startswith(prefix)]

def get_path_and_prefix(self, dotted_name: str) -> tuple[str, str]:
if '.' not in dotted_name:
return '', dotted_name
if dotted_name.startswith('.'):
stripped = dotted_name.lstrip('.')
dots = '.' * (len(dotted_name) - len(stripped))
if '.' not in stripped:
return dots, stripped
path, prefix = stripped.rsplit('.', 1)
return dots + path, prefix
path, prefix = dotted_name.rsplit('.', 1)
return path, prefix

@property
def global_cache(self) -> list[str]:
if not self._global_cache or self._curr_sys_path != sys.path:
self._curr_sys_path = sys.path[:]
self._global_cache = [
name for _, name, _ in pkgutil.iter_modules()]
return self._global_cache


class ImportParser:
"""
Parses incomplete import statements that are
suitable for autocomplete suggestions.

Examples:
- import foo -> Result(from_name=None, name='foo')
- import foo. -> Result(from_name=None, name='foo.')
- from foo -> Result(from_name='foo', name=None)
- from foo import bar -> Result(from_name='foo', name='bar')
- from .foo import ( -> Result(from_name='.foo', name='')

Note that the parser works in reverse order, starting from the
last token in the input string. This makes the parser more robust
when parsing multiple statements.
"""
_ignored_tokens = {
tokenize.INDENT, tokenize.DEDENT, tokenize.COMMENT,
tokenize.NL, tokenize.NEWLINE, tokenize.ENDMARKER
}
_keywords = {'import', 'from', 'as'}

def __init__(self, code: str):
self.code = code
tokens = []
try:
for t in tokenize.generate_tokens(StringIO(code).readline):
if t.type not in self._ignored_tokens:
tokens.append(t)
except tokenize.TokenError as e:
if 'unexpected EOF' not in str(e):
# unexpected EOF is fine, since we're parsing an
# incomplete statement, but other errors are not
# because we may not have all the tokens so it's
# safer to bail out
tokens = []
except SyntaxError:
tokens = []
self.tokens = TokenQueue(tokens[::-1])

def parse(self):
if not (res := self._parse()):
return None
return res.from_name, res.name

def _parse(self):
with self.tokens.save_state():
return self.parse_from_import()
with self.tokens.save_state():
return self.parse_import()

def parse_import(self):
if self.code.rstrip().endswith('import') and self.code.endswith(' '):
return Result(name='')
if self.tokens.peek_string(','):
name = ''
else:
if self.code.endswith(' '):
raise ParseError('parse_import')
name = self.parse_dotted_name()
if name.startswith('.'):
raise ParseError('parse_import')
while self.tokens.peek_string(','):
self.tokens.pop()
self.parse_dotted_as_name()
if self.tokens.peek_string('import'):
return Result(name=name)
raise ParseError('parse_import')

def parse_from_import(self):
if self.code.rstrip().endswith('import') and self.code.endswith(' '):
return Result(from_name=self.parse_empty_from_import(), name='')
if self.code.rstrip().endswith('from') and self.code.endswith(' '):
return Result(from_name='')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This calls code.rstrip() a lot, maybe is worth setting that as an attribute or just always use self.code = code.rstrip(). What do you think?

At the very least maybe set it as a local so you don't strip twice

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it as a local since it's only called in 3 places

if self.tokens.peek_string('(') or self.tokens.peek_string(','):
return Result(from_name=self.parse_empty_from_import(), name='')
if self.code.endswith(' '):
raise ParseError('parse_from_import')
name = self.parse_dotted_name()
if '.' in name:
self.tokens.pop_string('from')
return Result(from_name=name)
if self.tokens.peek_string('from'):
return Result(from_name=name)
from_name = self.parse_empty_from_import()
return Result(from_name=from_name, name=name)

def parse_empty_from_import(self):
if self.tokens.peek_string(','):
self.tokens.pop()
self.parse_as_names()
if self.tokens.peek_string('('):
self.tokens.pop()
self.tokens.pop_string('import')
return self.parse_from()

def parse_from(self):
from_name = self.parse_dotted_name()
self.tokens.pop_string('from')
return from_name

def parse_dotted_as_name(self):
self.tokens.pop_name()
if self.tokens.peek_string('as'):
self.tokens.pop()
with self.tokens.save_state():
return self.parse_dotted_name()

def parse_dotted_name(self):
name = []
if self.tokens.peek_string('.'):
name.append('.')
self.tokens.pop()
if self.tokens.peek_name() and self.tokens.peek().string not in self._keywords:
name.append(self.tokens.pop_name())
if not name:
raise ParseError('parse_dotted_name')
while self.tokens.peek_string('.'):
name.append('.')
self.tokens.pop()
if self.tokens.peek_name() and self.tokens.peek().string not in self._keywords:
name.append(self.tokens.pop_name())
else:
break

while self.tokens.peek_string('.'):
name.append('.')
self.tokens.pop()
return ''.join(name[::-1])

def parse_as_names(self):
self.parse_as_name()
while self.tokens.peek_string(','):
self.tokens.pop()
self.parse_as_name()

def parse_as_name(self):
self.tokens.pop_name()
if self.tokens.peek_string('as'):
self.tokens.pop()
self.tokens.pop_name()


class ParseError(Exception):
pass


@dataclass(frozen=True)
class Result:
from_name: str | None = None
name: str | None = None


class TokenQueue:
"""Provides helper functions for working with a sequence of tokens."""

def __init__(self, tokens: list[TokenInfo]) -> None:
self.tokens: list[TokenInfo] = tokens
self.index: int = 0
self.stack: list[int] = []

@contextmanager
def save_state(self):
try:
self.stack.append(self.index)
yield
except ParseError:
self.index = self.stack.pop()
else:
self.stack.pop()

def __bool__(self):
return self.index < len(self.tokens)

def peek(self) -> TokenInfo | None:
if not self:
return None
return self.tokens[self.index]

def peek_name(self) -> bool:
if not (tok := self.peek()):
return False
return tok.type == tokenize.NAME

def pop_name(self) -> str:
tok = self.pop()
if tok.type != tokenize.NAME:
raise ParseError('pop_name')
return tok.string

def peek_string(self, string: str) -> bool:
if not (tok := self.peek()):
return False
return tok.string == string

def pop_string(self, string: str) -> str:
tok = self.pop()
if tok.string != string:
raise ParseError('pop_string')
return tok.string

def pop(self) -> TokenInfo:
if not self:
raise ParseError('pop')
tok = self.tokens[self.index]
self.index += 1
return tok
Loading
Loading