-
-
Notifications
You must be signed in to change notification settings - Fork 33.2k
gh-69605: Add module autocomplete to PyREPL #129329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
b3bcd67
bcd3527
df3e8ec
48ee6ad
0917d24
589cf63
62d0b55
46ca249
75e4b55
3c13f86
8eb656f
7a2fde0
5c11124
10da15b
fd81999
8fba3d3
f4e290a
602121d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,8 +28,14 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import importlib | ||
| import pkgutil | ||
| import tokenize | ||
| import warnings | ||
| from io import StringIO | ||
| from contextlib import contextmanager | ||
| from dataclasses import dataclass, field | ||
| from tokenize import TokenInfo | ||
|
|
||
| import os | ||
| from site import gethistoryfile # type: ignore[attr-defined] | ||
|
|
@@ -59,6 +65,7 @@ | |
|
|
||
| if TYPE_CHECKING: | ||
| from typing import Any, Mapping | ||
| from types import ModuleType | ||
|
|
||
|
|
||
| MoreLinesCallable = Callable[[str], bool] | ||
|
|
@@ -132,6 +139,8 @@ def get_stem(self) -> str: | |
| return "".join(b[p + 1 : self.pos]) | ||
|
|
||
| def get_completions(self, stem: str) -> list[str]: | ||
| if module_completions := self.get_module_completions(): | ||
| return module_completions | ||
| if len(stem) == 0 and self.more_lines is not None: | ||
| b = self.buffer | ||
| p = self.pos | ||
|
|
@@ -161,6 +170,11 @@ def get_completions(self, stem: str) -> list[str]: | |
| result.sort() | ||
| return result | ||
|
|
||
| def get_module_completions(self) -> list[str]: | ||
| completer = ModuleCompleter(namespace={'__package__': '_pyrepl'}) # TODO: namespace? | ||
|
||
| line = self.get_line() | ||
| return completer.get_completions(line) | ||
|
||
|
|
||
| def get_trimmed_history(self, maxlength: int) -> list[str]: | ||
| if maxlength >= 0: | ||
| cut = len(self.history) - maxlength | ||
|
|
@@ -596,3 +610,321 @@ def _setup(namespace: Mapping[str, Any]) -> None: | |
|
|
||
|
|
||
| raw_input: Callable[[object], str] | None = None | ||
|
|
||
|
|
||
| class ModuleCompleter: | ||
| """A completer for Python import statements. | ||
|
|
||
| Examples: | ||
| - import <tab> | ||
| - import foo<tab> | ||
| - import foo.<tab> | ||
| - import foo as bar, baz<tab> | ||
|
|
||
| - from <tab> | ||
| - from foo<tab> | ||
| - from foo import <tab> | ||
| - from foo import bar<tab> | ||
| - from foo import (bar as baz, qux<tab> | ||
| """ | ||
|
|
||
| def __init__(self, namespace: Mapping[str, Any] | None = None): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.namespace = namespace or {} | ||
| self._global_cache: list[str] = [] | ||
| self._curr_sys_path: list[str] = sys.path[:] | ||
|
|
||
| def get_completions(self, line: str) -> list[str]: | ||
| """Return the next possible import completions for 'line'.""" | ||
|
|
||
| parser = ImportParser(line) | ||
| if not (result := parser.parse()): | ||
| return [] | ||
| return self.complete(*result) | ||
|
|
||
| def complete(self, from_name: str | None, name: str | None) -> list[str]: | ||
| # import x.y.z<tab> | ||
| if from_name is None: | ||
| if not name: | ||
| return [] | ||
| return self.complete_import(name) | ||
|
|
||
| # from x.y.z<tab> | ||
| if name is None: | ||
| if not from_name: | ||
| return [] | ||
| return self.complete_import(from_name) | ||
|
|
||
| # from x.y import z<tab> | ||
| if not (module := self.import_module(from_name)): | ||
| return [] | ||
|
|
||
| submodules = self.filter_submodules(module, name) | ||
| attributes = self.filter_attributes(module, name) | ||
| return list(set(submodules + attributes)) | ||
|
|
||
| def complete_import(self, name: str) -> list[str]: | ||
| is_relative = name.startswith('.') | ||
| path, prefix = self.get_path_and_prefix(name) | ||
|
|
||
| if not is_relative and not path: | ||
| return [name for name in self.global_cache if name.startswith(prefix)] | ||
|
|
||
| if not (module := self.import_module(path)): | ||
| return [] | ||
|
|
||
| submodules = self.filter_submodules(module, prefix) | ||
| if not is_relative: | ||
| return [f'{path}.{name}' for name in submodules] | ||
| return [f'.{name}' for name in submodules] | ||
|
|
||
| def import_module(self, path: str) -> ModuleType | None: | ||
| package = self.namespace.get('__package__') | ||
| is_relative = path.startswith('.') | ||
| if is_relative and not package: | ||
| return None | ||
| try: | ||
| module = importlib.import_module( | ||
| path, | ||
| package=package if is_relative else None) | ||
| except ImportError: | ||
| return None | ||
| return module | ||
|
|
||
| def filter_submodules(self, module: ModuleType, prefix: str) -> list[str]: | ||
| if not hasattr(module, '__path__'): | ||
| return [] | ||
| return [name for _, name, _ in pkgutil.iter_modules(module.__path__) | ||
| if name.startswith(prefix)] | ||
|
|
||
| def filter_attributes(self, module: ModuleType, prefix: str) -> list[str]: | ||
| return [attr for attr in module.__dict__ if attr.startswith(prefix)] | ||
|
|
||
| def get_path_and_prefix(self, dotted_name: str) -> tuple[str, str]: | ||
| if '.' not in dotted_name: | ||
| return '', dotted_name | ||
| if dotted_name.startswith('.'): | ||
| stripped = dotted_name.lstrip('.') | ||
| dots = '.' * (len(dotted_name) - len(stripped)) | ||
| if '.' not in stripped: | ||
| return dots, stripped | ||
| path, prefix = stripped.rsplit('.', 1) | ||
| return dots + path, prefix | ||
| path, prefix = dotted_name.rsplit('.', 1) | ||
| return path, prefix | ||
|
|
||
| @property | ||
| def global_cache(self) -> list[str]: | ||
| if not self._global_cache or self._curr_sys_path != sys.path: | ||
| self._curr_sys_path = sys.path[:] | ||
| self._global_cache = [ | ||
| name for _, name, _ in pkgutil.iter_modules()] | ||
| return self._global_cache | ||
|
|
||
|
|
||
| class ImportParser: | ||
| """ | ||
| Parses incomplete import statements that are | ||
| suitable for autocomplete suggestions. | ||
|
|
||
| Examples: | ||
| - import foo -> Result(from_name=None, name='foo') | ||
| - import foo. -> Result(from_name=None, name='foo.') | ||
| - from foo -> Result(from_name='foo', name=None) | ||
| - from foo import bar -> Result(from_name='foo', name='bar') | ||
| - from .foo import ( -> Result(from_name='.foo', name='') | ||
|
|
||
| Note that the parser works in reverse order, starting from the | ||
| last token in the input string. This makes the parser more robust | ||
| when parsing multiple statements. | ||
| """ | ||
| _ignored_tokens = { | ||
| tokenize.INDENT, tokenize.DEDENT, tokenize.COMMENT, | ||
| tokenize.NL, tokenize.NEWLINE, tokenize.ENDMARKER | ||
| } | ||
| _keywords = {'import', 'from', 'as'} | ||
|
|
||
| def __init__(self, code: str): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.code = code | ||
| tokens = [] | ||
| try: | ||
| for t in tokenize.generate_tokens(StringIO(code).readline): | ||
| if t.type not in self._ignored_tokens: | ||
| tokens.append(t) | ||
| except tokenize.TokenError as e: | ||
| if 'unexpected EOF' not in str(e): | ||
| # unexpected EOF is fine, since we're parsing an | ||
| # incomplete statement, but other errors are not | ||
| # because we may not have all the tokens so it's | ||
| # safer to bail out | ||
| tokens = [] | ||
| except SyntaxError: | ||
| tokens = [] | ||
| self.tokens = TokenQueue(tokens[::-1]) | ||
|
|
||
| def parse(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if not (res := self._parse()): | ||
| return None | ||
| return res.from_name, res.name | ||
|
|
||
| def _parse(self): | ||
| with self.tokens.save_state(): | ||
| return self.parse_from_import() | ||
| with self.tokens.save_state(): | ||
| return self.parse_import() | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def parse_import(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if self.code.rstrip().endswith('import') and self.code.endswith(' '): | ||
| return Result(name='') | ||
| if self.tokens.peek_string(','): | ||
| name = '' | ||
| else: | ||
| if self.code.endswith(' '): | ||
| raise ParseError('parse_import') | ||
| name = self.parse_dotted_name() | ||
| if name.startswith('.'): | ||
| raise ParseError('parse_import') | ||
| while self.tokens.peek_string(','): | ||
| self.tokens.pop() | ||
| self.parse_dotted_as_name() | ||
| if self.tokens.peek_string('import'): | ||
| return Result(name=name) | ||
| raise ParseError('parse_import') | ||
|
|
||
| def parse_from_import(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if self.code.rstrip().endswith('import') and self.code.endswith(' '): | ||
| return Result(from_name=self.parse_empty_from_import(), name='') | ||
| if self.code.rstrip().endswith('from') and self.code.endswith(' '): | ||
| return Result(from_name='') | ||
|
||
| if self.tokens.peek_string('(') or self.tokens.peek_string(','): | ||
| return Result(from_name=self.parse_empty_from_import(), name='') | ||
| if self.code.endswith(' '): | ||
| raise ParseError('parse_from_import') | ||
| name = self.parse_dotted_name() | ||
| if '.' in name: | ||
| self.tokens.pop_string('from') | ||
| return Result(from_name=name) | ||
| if self.tokens.peek_string('from'): | ||
| return Result(from_name=name) | ||
| from_name = self.parse_empty_from_import() | ||
| return Result(from_name=from_name, name=name) | ||
|
|
||
| def parse_empty_from_import(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if self.tokens.peek_string(','): | ||
| self.tokens.pop() | ||
| self.parse_as_names() | ||
| if self.tokens.peek_string('('): | ||
| self.tokens.pop() | ||
| self.tokens.pop_string('import') | ||
| return self.parse_from() | ||
|
|
||
| def parse_from(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| from_name = self.parse_dotted_name() | ||
| self.tokens.pop_string('from') | ||
| return from_name | ||
|
|
||
| def parse_dotted_as_name(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.tokens.pop_name() | ||
| if self.tokens.peek_string('as'): | ||
| self.tokens.pop() | ||
| with self.tokens.save_state(): | ||
| return self.parse_dotted_name() | ||
|
|
||
| def parse_dotted_name(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| name = [] | ||
| if self.tokens.peek_string('.'): | ||
| name.append('.') | ||
| self.tokens.pop() | ||
| if self.tokens.peek_name() and self.tokens.peek().string not in self._keywords: | ||
| name.append(self.tokens.pop_name()) | ||
| if not name: | ||
| raise ParseError('parse_dotted_name') | ||
| while self.tokens.peek_string('.'): | ||
| name.append('.') | ||
| self.tokens.pop() | ||
| if self.tokens.peek_name() and self.tokens.peek().string not in self._keywords: | ||
| name.append(self.tokens.pop_name()) | ||
| else: | ||
| break | ||
|
|
||
| while self.tokens.peek_string('.'): | ||
| name.append('.') | ||
| self.tokens.pop() | ||
| return ''.join(name[::-1]) | ||
|
|
||
| def parse_as_names(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.parse_as_name() | ||
| while self.tokens.peek_string(','): | ||
| self.tokens.pop() | ||
| self.parse_as_name() | ||
|
|
||
| def parse_as_name(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.tokens.pop_name() | ||
| if self.tokens.peek_string('as'): | ||
| self.tokens.pop() | ||
| self.tokens.pop_name() | ||
|
|
||
|
|
||
| class ParseError(Exception): | ||
| pass | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class Result: | ||
| from_name: str | None = None | ||
| name: str | None = None | ||
|
|
||
|
|
||
| class TokenQueue: | ||
| """Provides helper functions for working with a sequence of tokens.""" | ||
|
|
||
| def __init__(self, tokens: list[TokenInfo]) -> None: | ||
| self.tokens: list[TokenInfo] = tokens | ||
| self.index: int = 0 | ||
| self.stack: list[int] = [] | ||
|
|
||
| @contextmanager | ||
| def save_state(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| try: | ||
| self.stack.append(self.index) | ||
| yield | ||
| except ParseError: | ||
| self.index = self.stack.pop() | ||
| else: | ||
| self.stack.pop() | ||
|
|
||
| def __bool__(self): | ||
tomasr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return self.index < len(self.tokens) | ||
|
|
||
| def peek(self) -> TokenInfo | None: | ||
| if not self: | ||
| return None | ||
| return self.tokens[self.index] | ||
|
|
||
| def peek_name(self) -> bool: | ||
| if not (tok := self.peek()): | ||
| return False | ||
| return tok.type == tokenize.NAME | ||
|
|
||
| def pop_name(self) -> str: | ||
| tok = self.pop() | ||
| if tok.type != tokenize.NAME: | ||
| raise ParseError('pop_name') | ||
| return tok.string | ||
|
|
||
| def peek_string(self, string: str) -> bool: | ||
| if not (tok := self.peek()): | ||
| return False | ||
| return tok.string == string | ||
|
|
||
| def pop_string(self, string: str) -> str: | ||
| tok = self.pop() | ||
| if tok.string != string: | ||
| raise ParseError('pop_string') | ||
| return tok.string | ||
|
|
||
| def pop(self) -> TokenInfo: | ||
| if not self: | ||
| raise ParseError('pop') | ||
| tok = self.tokens[self.index] | ||
| self.index += 1 | ||
| return tok | ||
Uh oh!
There was an error while loading. Please reload this page.