Skip to content
Merged
108 changes: 108 additions & 0 deletions Lib/_pyrepl/base_eventqueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]>
# Armin Rigo
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose is hereby granted without fee,
# provided that the above copyright notice appear in all copies and
# that both that copyright notice and this permission notice appear in
# supporting documentation.
#
# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""
OS-independent base for an event and VT sequence scanner
See unix_eventqueue and windows_eventqueue for subclasses.
"""

from collections import deque

from . import keymap
from .console import Event
from .trace import trace

class BaseEventQueue:
def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
self.compiled_keymap = keymap.compile_keymap(keymap_dict)
self.keymap = self.compiled_keymap
trace("keymap {k!r}", k=self.keymap)
self.encoding = encoding
self.events: deque[Event] = deque()
self.buf = bytearray()

def get(self) -> Event | None:
"""
Retrieves the next event from the queue.
"""
if self.events:
return self.events.popleft()
else:
return None

def empty(self) -> bool:
"""
Checks if the queue is empty.
"""
return not self.events

def flush_buf(self) -> bytearray:
"""
Flushes the buffer and returns its contents.
"""
old = self.buf
self.buf = bytearray()
return old

def insert(self, event: Event) -> None:
"""
Inserts an event into the queue.
"""
trace('added event {event}', event=event)
self.events.append(event)

def push(self, char: int | bytes) -> None:
"""
Processes a character by updating the buffer and handling special key mappings.
"""
ord_char = char if isinstance(char, int) else ord(char)
char = bytes(bytearray((ord_char,)))
self.buf.append(ord_char)
if char in self.keymap:
if self.keymap is self.compiled_keymap:
# sanity check, buffer is empty when a special key comes
assert len(self.buf) == 1
k = self.keymap[char]
trace('found map {k!r}', k=k)
if isinstance(k, dict):
self.keymap = k
else:
self.insert(Event('key', k, self.flush_buf()))
self.keymap = self.compiled_keymap

elif self.buf and self.buf[0] == 27: # escape
# escape sequence not recognized by our keymap: propagate it
# outside so that i can be recognized as an M-... key (see also
# the docstring in keymap.py
trace('unrecognized escape sequence, propagating...')
self.keymap = self.compiled_keymap
self.insert(Event('key', '\033', bytearray(b'\033')))
for _c in self.flush_buf()[1:]:
self.push(_c)

else:
try:
decoded = bytes(self.buf).decode(self.encoding)
except UnicodeError:
return
else:
self.insert(Event('key', decoded, self.flush_buf()))
self.keymap = self.compiled_keymap
86 changes: 5 additions & 81 deletions Lib/_pyrepl/unix_eventqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

from collections import deque

from . import keymap
from .console import Event
from . import curses
from .trace import trace
from .base_eventqueue import BaseEventQueue
from termios import tcgetattr, VERASE
import os

Expand Down Expand Up @@ -70,83 +67,10 @@ def get_terminal_keycodes() -> dict[bytes, str]:
keycodes.update(CTRL_ARROW_KEYCODES)
return keycodes

class EventQueue:
class EventQueue(BaseEventQueue):
def __init__(self, fd: int, encoding: str) -> None:
self.keycodes = get_terminal_keycodes()
keycodes = get_terminal_keycodes()
if os.isatty(fd):
backspace = tcgetattr(fd)[6][VERASE]
self.keycodes[backspace] = "backspace"
self.compiled_keymap = keymap.compile_keymap(self.keycodes)
self.keymap = self.compiled_keymap
trace("keymap {k!r}", k=self.keymap)
self.encoding = encoding
self.events: deque[Event] = deque()
self.buf = bytearray()

def get(self) -> Event | None:
"""
Retrieves the next event from the queue.
"""
if self.events:
return self.events.popleft()
else:
return None

def empty(self) -> bool:
"""
Checks if the queue is empty.
"""
return not self.events

def flush_buf(self) -> bytearray:
"""
Flushes the buffer and returns its contents.
"""
old = self.buf
self.buf = bytearray()
return old

def insert(self, event: Event) -> None:
"""
Inserts an event into the queue.
"""
trace('added event {event}', event=event)
self.events.append(event)

def push(self, char: int | bytes) -> None:
"""
Processes a character by updating the buffer and handling special key mappings.
"""
ord_char = char if isinstance(char, int) else ord(char)
char = bytes(bytearray((ord_char,)))
self.buf.append(ord_char)
if char in self.keymap:
if self.keymap is self.compiled_keymap:
#sanity check, buffer is empty when a special key comes
assert len(self.buf) == 1
k = self.keymap[char]
trace('found map {k!r}', k=k)
if isinstance(k, dict):
self.keymap = k
else:
self.insert(Event('key', k, self.flush_buf()))
self.keymap = self.compiled_keymap

elif self.buf and self.buf[0] == 27: # escape
# escape sequence not recognized by our keymap: propagate it
# outside so that i can be recognized as an M-... key (see also
# the docstring in keymap.py
trace('unrecognized escape sequence, propagating...')
self.keymap = self.compiled_keymap
self.insert(Event('key', '\033', bytearray(b'\033')))
for _c in self.flush_buf()[1:]:
self.push(_c)

else:
try:
decoded = bytes(self.buf).decode(self.encoding)
except UnicodeError:
return
else:
self.insert(Event('key', decoded, self.flush_buf()))
self.keymap = self.compiled_keymap
keycodes[backspace] = "backspace"
BaseEventQueue.__init__(self, encoding, keycodes)
67 changes: 58 additions & 9 deletions Lib/_pyrepl/windows_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from .console import Event, Console
from .trace import trace
from .utils import wlen
from .windows_eventqueue import EventQueue

try:
from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined]
Expand Down Expand Up @@ -94,7 +95,9 @@ def __init__(self, err: int | None, descr: str | None = None) -> None:
0x83: "f20", # VK_F20
}

# Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
# Virtual terminal output sequences
# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#output-sequences
# Check `windows_eventqueue.py` for input sequences
ERASE_IN_LINE = "\x1b[K"
MOVE_LEFT = "\x1b[{}D"
MOVE_RIGHT = "\x1b[{}C"
Expand All @@ -110,6 +113,12 @@ def __init__(self, err: int | None, descr: str | None = None) -> None:
class _error(Exception):
pass

def _supports_vt():
try:
import nt
return nt._supports_virtual_terminal()
except (ImportError, AttributeError):
return False

class WindowsConsole(Console):
def __init__(
Expand All @@ -121,17 +130,29 @@ def __init__(
):
super().__init__(f_in, f_out, term, encoding)

self.__vt_support = _supports_vt()

if self.__vt_support:
trace('console supports virtual terminal')

# Save original console modes so we can recover on cleanup.
original_input_mode = DWORD()
GetConsoleMode(InHandle, original_input_mode)
trace(f'saved original input mode 0x{original_input_mode.value:x}')
self.__original_input_mode = original_input_mode.value

SetConsoleMode(
OutHandle,
ENABLE_WRAP_AT_EOL_OUTPUT
| ENABLE_PROCESSED_OUTPUT
| ENABLE_VIRTUAL_TERMINAL_PROCESSING,
)

self.screen: list[str] = []
self.width = 80
self.height = 25
self.__offset = 0
self.event_queue: deque[Event] = deque()
self.event_queue = EventQueue(encoding)
try:
self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined]
except ValueError:
Expand Down Expand Up @@ -295,6 +316,12 @@ def _enable_blinking(self):
def _disable_blinking(self):
self.__write("\x1b[?12l")

def _enable_bracketed_paste(self) -> None:
self.__write("\x1b[?2004h")

def _disable_bracketed_paste(self) -> None:
self.__write("\x1b[?2004l")

def __write(self, text: str) -> None:
if "\x1a" in text:
text = ''.join(["^Z" if x == '\x1a' else x for x in text])
Expand Down Expand Up @@ -324,8 +351,15 @@ def prepare(self) -> None:
self.__gone_tall = 0
self.__offset = 0

if self.__vt_support:
SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT)
self._enable_bracketed_paste()

def restore(self) -> None:
pass
if self.__vt_support:
# Recover to original mode before running REPL
self._disable_bracketed_paste()
SetConsoleMode(InHandle, self.__original_input_mode)

def _move_relative(self, x: int, y: int) -> None:
"""Moves relative to the current posxy"""
Expand All @@ -346,7 +380,7 @@ def move_cursor(self, x: int, y: int) -> None:
raise ValueError(f"Bad cursor position {x}, {y}")

if y < self.__offset or y >= self.__offset + self.height:
self.event_queue.insert(0, Event("scroll", ""))
self.event_queue.insert(Event("scroll", ""))
else:
self._move_relative(x, y)
self.posxy = x, y
Expand Down Expand Up @@ -394,10 +428,8 @@ def get_event(self, block: bool = True) -> Event | None:
"""Return an Event instance. Returns None if |block| is false
and there is no event pending, otherwise waits for the
completion of an event."""
if self.event_queue:
return self.event_queue.pop()

while True:
while self.event_queue.empty():
rec = self._read_input(block)
if rec is None:
return None
Expand Down Expand Up @@ -428,20 +460,25 @@ def get_event(self, block: bool = True) -> Event | None:
key = f"ctrl {key}"
elif key_event.dwControlKeyState & ALT_ACTIVE:
# queue the key, return the meta command
self.event_queue.insert(0, Event(evt="key", data=key, raw=key))
self.event_queue.insert(Event(evt="key", data=key, raw=key))
return Event(evt="key", data="\033") # keymap.py uses this for meta
return Event(evt="key", data=key, raw=key)
if block:
continue

return None
elif self.__vt_support:
# If virtual terminal is enabled, scanning VT sequences
self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar)
continue

if key_event.dwControlKeyState & ALT_ACTIVE:
# queue the key, return the meta command
self.event_queue.insert(0, Event(evt="key", data=key, raw=raw_key))
self.event_queue.insert(Event(evt="key", data=key, raw=raw_key))
return Event(evt="key", data="\033") # keymap.py uses this for meta

return Event(evt="key", data=key, raw=raw_key)
return self.event_queue.get()

def push_char(self, char: int | bytes) -> None:
"""
Expand Down Expand Up @@ -563,6 +600,13 @@ class INPUT_RECORD(Structure):
MOUSE_EVENT = 0x02
WINDOW_BUFFER_SIZE_EVENT = 0x04

ENABLE_PROCESSED_INPUT = 0x0001
ENABLE_LINE_INPUT = 0x0002
ENABLE_ECHO_INPUT = 0x0004
ENABLE_MOUSE_INPUT = 0x0010
ENABLE_INSERT_MODE = 0x0020
ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200

ENABLE_PROCESSED_OUTPUT = 0x01
ENABLE_WRAP_AT_EOL_OUTPUT = 0x02
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04
Expand Down Expand Up @@ -594,6 +638,10 @@ class INPUT_RECORD(Structure):
]
ScrollConsoleScreenBuffer.restype = BOOL

GetConsoleMode = _KERNEL32.GetConsoleMode
GetConsoleMode.argtypes = [HANDLE, POINTER(DWORD)]
GetConsoleMode.restype = BOOL

SetConsoleMode = _KERNEL32.SetConsoleMode
SetConsoleMode.argtypes = [HANDLE, DWORD]
SetConsoleMode.restype = BOOL
Expand All @@ -620,6 +668,7 @@ def _win_only(*args, **kwargs):
GetStdHandle = _win_only
GetConsoleScreenBufferInfo = _win_only
ScrollConsoleScreenBuffer = _win_only
GetConsoleMode = _win_only
SetConsoleMode = _win_only
ReadConsoleInput = _win_only
GetNumberOfConsoleInputEvents = _win_only
Expand Down
Loading
Loading