Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions Lib/_pyrepl/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
from dataclasses import dataclass, field, fields
from _colorize import can_colorize, ANSIColors


from . import commands, console, input
from .utils import wlen, unbracket, disp_str
from .utils import wlen, unbracket, disp_str, gen_colors
from .trace import trace


Expand All @@ -38,8 +37,7 @@
from .types import Callback, SimpleContextManager, KeySpec, CommandName


# syntax classes:

# syntax classes
SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3)


Expand Down Expand Up @@ -144,16 +142,17 @@ class Reader:
Instance variables of note include:

* buffer:
A *list* (*not* a string at the moment :-) containing all the
characters that have been entered.
A per-character list containing all the characters that have been
entered. Does not include color information.
* console:
Hopefully encapsulates the OS dependent stuff.
* pos:
A 0-based index into 'buffer' for where the insertion point
is.
* screeninfo:
Ahem. This list contains some info needed to move the
insertion point around reasonably efficiently.
A list of screen position tuples. Each list element is a tuple
representing information on visible line length for a given line.
Allows for efficient skipping of color escape sequences.
* cxy, lxy:
the position of the insertion point in screen ...
* syntax_table:
Expand Down Expand Up @@ -316,6 +315,11 @@ def calc_screen(self) -> list[str]:
pos -= offset

prompt_from_cache = (offset and self.buffer[offset - 1] != "\n")

if self.can_colorize:
colors = list(gen_colors(self.get_unicode()))
else:
colors = None
lines = "".join(self.buffer[offset:]).split("\n")
cursor_found = False
lines_beyond_cursor = 0
Expand Down Expand Up @@ -343,7 +347,7 @@ def calc_screen(self) -> list[str]:
screeninfo.append((0, []))
pos -= line_len + 1
prompt, prompt_len = self.process_prompt(prompt)
chars, char_widths = disp_str(line)
chars, char_widths = disp_str(line, colors, offset)
wrapcount = (sum(char_widths) + prompt_len) // self.console.width
trace("wrapcount = {wrapcount}", wrapcount=wrapcount)
if wrapcount == 0 or not char_widths:
Expand Down Expand Up @@ -567,6 +571,7 @@ def insert(self, text: str | list[str]) -> None:
def update_cursor(self) -> None:
"""Move the cursor to reflect changes in self.pos"""
self.cxy = self.pos2xy()
trace("update_cursor({pos}) = {cxy}", pos=self.pos, cxy=self.cxy)
self.console.move_cursor(*self.cxy)

def after_command(self, cmd: Command) -> None:
Expand Down
123 changes: 119 additions & 4 deletions Lib/_pyrepl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,56 @@
import unicodedata
import functools

from idlelib import colorizer
from typing import cast, Iterator, Literal, Match, NamedTuple, Pattern, Self
from _colorize import ANSIColors

from .types import CharBuffer, CharWidths
from .trace import trace

ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]")
ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02")
ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""})
# Regex taken from IDLE's colorizer; gen_colors() scans the input with it to
# find the spans to highlight.
COLORIZE_RE: Pattern[str] = colorizer.prog
# Regex taken from IDLE's colorizer; gen_color_spans() applies it right after
# a "def"/"class" match and tags its group 1 as "DEFINITION".
IDENTIFIER_RE: Pattern[str] = colorizer.idprog
# Matched texts after which gen_color_spans() looks for a name to tag.
IDENTIFIERS_AFTER = {"def", "class"}
# Maps a COLORIZE_RE group name to the tag to use for it; group names missing
# from the map are used as tags unchanged (see gen_color_spans()).
COLORIZE_GROUP_NAME_MAP: dict[str, str] = colorizer.prog_group_name_to_tag

# The closed set of tag names a ColorSpan can carry; each one is a key of
# TAG_TO_ANSI.
type ColorTag = (
    Literal["KEYWORD"]
    | Literal["BUILTIN"]
    | Literal["COMMENT"]
    | Literal["STRING"]
    | Literal["DEFINITION"]
    | Literal["SYNC"]
)


class Span(NamedTuple):
"""Span indexing that's inclusive on both ends."""

start: int
end: int

@classmethod
def from_re(cls, m: Match[str], group: int | str) -> Self:
re_span = m.span(group)
return cls(re_span[0], re_span[1] - 1)


class ColorSpan(NamedTuple):
    """A range of the input together with the color tag to apply to it."""

    span: Span  # where the colored range starts and ends
    tag: ColorTag  # key into TAG_TO_ANSI


# ANSI escape sequence to emit for each color tag. "SYNC" maps to the reset
# sequence, which disp_str() appends where a colored span ends.
TAG_TO_ANSI: dict[ColorTag, str] = {
"KEYWORD": ANSIColors.BOLD_BLUE,
"BUILTIN": ANSIColors.CYAN,
"COMMENT": ANSIColors.RED,
"STRING": ANSIColors.GREEN,
"DEFINITION": ANSIColors.BOLD_WHITE,
"SYNC": ANSIColors.RESET,
}


@functools.cache
Expand Down Expand Up @@ -41,25 +85,82 @@ def unbracket(s: str, including_content: bool = False) -> str:
return s.translate(ZERO_WIDTH_TRANS)


def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
r"""Decompose the input buffer into a printable variant.
def gen_colors(buffer: str) -> Iterator[ColorSpan]:
    """Yield the color spans found in `buffer`, in match order.

    Every COLORIZE_RE match in `buffer` is handed to gen_color_spans() and
    the spans it produces are passed through unchanged.

    The input `buffer` should be a valid start of a Python code block, i.e.
    it cannot be a block starting in the middle of a multiline string.
    """
    matches = COLORIZE_RE.finditer(buffer)
    for re_match in matches:
        for color_span in gen_color_spans(re_match):
            yield color_span


def gen_color_spans(re_match: Match[str]) -> Iterator[ColorSpan]:
    """Yield a color span for every named group that matched non-empty text.

    The group name is translated through COLORIZE_GROUP_NAME_MAP (falling
    back to the group name itself) to obtain the tag. When the matched text
    is one of IDENTIFIERS_AFTER, group 1 of an IDENTIFIER_RE match starting
    right after it is additionally yielded as a "DEFINITION" span.
    """
    source = re_match.string
    for group_name, text in re_match.groupdict().items():
        if text:
            group_span = Span.from_re(re_match, group_name)
            tag_name = COLORIZE_GROUP_NAME_MAP.get(group_name, group_name)
            yield ColorSpan(group_span, cast(ColorTag, tag_name))
            if text not in IDENTIFIERS_AFTER:
                continue
            # Span ends are inclusive, so the search starts one past the end.
            name_match = IDENTIFIER_RE.match(source, group_span.end + 1)
            if name_match:
                yield ColorSpan(Span.from_re(name_match, 1), "DEFINITION")


def disp_str(
buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0
) -> tuple[CharBuffer, CharWidths]:
r"""Decompose the input buffer into a printable variant with applied colors.

Returns a tuple of two lists:
- the first list is the input buffer, character by character;
- the first list is the input buffer, character by character, with color
escape codes added (while those codes contain multiple ASCII characters,
each code is considered atomic *and is attached for the corresponding
visible character*);
- the second list is the visible width of each character in the input
buffer.

Note on colors:
- The `colors` list, if provided, is partially consumed within. We're using
a list and not a generator since we need to hold onto the current
unfinished span between calls to disp_str in case of multiline strings.
- The `colors` list is computed from the start of the input block. `buffer`
is only a subset of that input block, a single line within. This is why
we need `start_index` to inform us which position is the start of `buffer`
actually within user input. This allows us to match color spans correctly.

Examples:
>>> utils.disp_str("a = 9")
(['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1])

>>> line = "while 1:"
>>> colors = list(utils.gen_colors(line))
>>> utils.disp_str(line, colors=colors)
(['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1])

"""
chars: CharBuffer = []
char_widths: CharWidths = []

if not buffer:
return chars, char_widths

for c in buffer:
while colors and colors[0].span.end < start_index:
# move past irrelevant spans
colors.pop(0)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this list is not big enough for it to matter, but I would recommend modifying this to figure out the cutting index and then slicing once.


pre_color = ""
post_color = ""
if colors and colors[0].span.start < start_index:
# looks like we're continuing a previous color (e.g. a multiline str)
pre_color = TAG_TO_ANSI[colors[0].tag]

for i, c in enumerate(buffer, start_index):
if colors and colors[0].span.start == i: # new color starts now
pre_color = TAG_TO_ANSI[colors[0].tag]

if c == "\x1a": # CTRL-Z on Windows
chars.append(c)
char_widths.append(2)
Expand All @@ -73,5 +174,19 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
else:
chars.append(c)
char_widths.append(str_width(c))

if colors and colors[0].span.end == i: # current color ends now
post_color = TAG_TO_ANSI["SYNC"]
colors.pop(0)

chars[-1] = pre_color + chars[-1] + post_color

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this matters but this being in a tight loop may be a performance problem

pre_color = ""
post_color = ""

if colors and colors[0].span.start < i and colors[0].span.end > i:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach assumes the next call to disp_str() will properly handle the color continuation. If any buffer modification happens between calls, or if the next call uses different parameters, you'll get incorrect highlighting, no?

Maybe this is not a problem, but I would recommend adding either a defensive check or an assert.

SYNC"]

# Then in the next call, verify consistency
if pending_color_span and colors and colors[0].span == pending_color_span.span:
    # Spans match, continue highlighting
    pre_color = TAG_TO_ANSI[colors[0].tag]
else:
    …

# even though the current color should be continued, reset it for now.
# the next call to `disp_str()` will revive it.
chars[-1] += TAG_TO_ANSI["SYNC"]

trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths)
return chars, char_widths
68 changes: 66 additions & 2 deletions Lib/test/test_pyrepl/test_reader.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import itertools
import functools
import rlcompleter
from textwrap import dedent
from unittest import TestCase
from unittest.mock import MagicMock

from .support import handle_all_events, handle_events_narrow_console
from .support import ScreenEqualMixin, code_to_events
from .support import prepare_reader, prepare_console
from .support import prepare_reader, prepare_console, reader_force_colors
from _pyrepl.console import Event
from _pyrepl.reader import Reader
from _pyrepl.utils import TAG_TO_ANSI


colors = {k[0].lower(): v for k, v in TAG_TO_ANSI.items() if k != "SYNC"}
colors["z"] = TAG_TO_ANSI["SYNC"]


class TestReader(ScreenEqualMixin, TestCase):
Expand Down Expand Up @@ -123,8 +129,9 @@ def test_setpos_for_xy_simple(self):
def test_control_characters(self):
code = 'flag = "🏳️‍🌈"'
events = code_to_events(code)
reader, _ = handle_all_events(events)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, 'flag = "🏳️\\u200d🌈"', clean=True)
self.assert_screen_equal(reader, 'flag = {s}"🏳️\\u200d🌈"{z}'.format(**colors))

def test_setpos_from_xy_multiple_lines(self):
# fmt: off
Expand Down Expand Up @@ -355,3 +362,60 @@ def test_setpos_from_xy_for_non_printing_char(self):
reader, _ = handle_all_events(events)
reader.setpos_from_xy(8, 0)
self.assertEqual(reader.pos, 7)

def test_syntax_highlighting_basic(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend adding some extra tests for edge cases and failure modes, such as incomplete code blocks, syntax errors, and other known failure conditions, so we know how this will behave.

code = dedent(
"""\
import re, sys
def funct(case: str = sys.platform) -> None:
match = re.search(
"(me)",
'''
Come on
Come on now
You know that it's time to emerge
''',
)
match case:
case "emscripten": print("on the web")
case "ios" | "android": print("on the phone")
case _: print('arms around', match.group(1))
"""
)
expected = dedent(
"""\
{k}import{z} re, sys
{a}{k}def{z} {d}funct{z}(case: {b}str{z} = sys.platform) -> {k}None{z}:
match = re.search(
{s}"(me)"{z},
{s}'''{z}
{s} Come on{z}
{s} Come on now{z}
{s} You know that it's time to emerge{z}
{s} '''{z},
)
{k}match{z} case:
{k}case{z} {s}"emscripten"{z}: {b}print{z}({s}"on the web"{z})
{k}case{z} {s}"ios"{z} | {s}"android"{z}: {b}print{z}({s}"on the phone"{z})
{k}case{z} {k}_{z}: {b}print{z}({s}'arms around'{z}, match.group(1))
"""
)
expected_sync = expected.format(a="", **colors)
events = code_to_events(code)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected_sync)
self.assertEqual(reader.pos, 2**7 + 2**8)
self.assertEqual(reader.cxy, (0, 14))

async_msg = "{k}async{z} ".format(**colors)
expected_async = expected.format(a=async_msg, **colors)
more_events = itertools.chain(
code_to_events(code),
[Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))] * 13,
code_to_events("async "),
)
reader, _ = handle_all_events(more_events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, expected_async)
self.assertEqual(reader.pos, 21)
self.assertEqual(reader.cxy, (6, 1))
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PyREPL now supports syntax highlighting. Contributed by Łukasz Langa.
Loading