Skip to content

Commit fba84aa

Browse files
committed
Use _WindowsConsoleIO for output
1 parent d1289af commit fba84aa

File tree

2 files changed

+61
-57
lines changed

2 files changed

+61
-57
lines changed

Lib/_pyrepl/windows_console.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import os
2323
import sys
2424

25+
from _io import _WindowsConsoleIO
2526
from abc import ABC, abstractmethod
2627
from collections import deque
2728
from dataclasses import dataclass, field
@@ -125,6 +126,7 @@ def __init__(
125126
self.height = 25
126127
self.__offset = 0
127128
self.event_queue: deque[Event] = deque()
129+
self.out = _WindowsConsoleIO(self.output_fd, 'w')
128130

129131
def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None:
130132
"""
@@ -275,8 +277,9 @@ def _disable_blinking(self):
275277
self.__write("\x1b[?12l")
276278

277279
def __write(self, text: str) -> None:
278-
os.write(self.output_fd, text.encode(self.encoding, "replace"))
279-
280+
self.out.write(text.encode(self.encoding, "replace"))
281+
self.out.flush()
282+
280283
@property
281284
def screen_xy(self) -> tuple[int, int]:
282285
info = CONSOLE_SCREEN_BUFFER_INFO()

Lib/test/test_pyrepl/test_windows_console.py

Lines changed: 56 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
pass
2424

2525

26-
@patch("os.write")
26+
# @patch("os.write")
2727
@unittest.skipIf(sys.platform != "win32", "No Unix event queue on Windows")
2828
class WindowsConsoleTests(TestCase):
2929
def console(self, events, **kwargs) -> Console:
@@ -33,6 +33,7 @@ def console(self, events, **kwargs) -> Console:
3333
console._hide_cursor = MagicMock()
3434
console._show_cursor = MagicMock()
3535
console._getscrollbacksize = MagicMock(42)
36+
console.out.write = MagicMock()
3637

3738
height = kwargs.get("height", 25)
3839
width = kwargs.get("width", 80)
@@ -55,31 +56,31 @@ def handle_events_short(self, events):
5556
def handle_events_height_3(self, events):
5657
return self.handle_events(events, height=3)
5758

58-
def test_simple_addition(self, _os_write):
59+
def test_simple_addition(self):
5960
code = "12+34"
6061
events = code_to_events(code)
6162
_, con = self.handle_events(events)
62-
_os_write.assert_any_call(ANY, b"1")
63-
_os_write.assert_any_call(ANY, b"2")
64-
_os_write.assert_any_call(ANY, b"+")
65-
_os_write.assert_any_call(ANY, b"3")
66-
_os_write.assert_any_call(ANY, b"4")
63+
con.out.write.assert_any_call(b"1")
64+
con.out.write.assert_any_call(b"2")
65+
con.out.write.assert_any_call(b"+")
66+
con.out.write.assert_any_call(b"3")
67+
con.out.write.assert_any_call(b"4")
6768
con.restore()
6869

69-
def test_wrap(self, _os_write):
70+
def test_wrap(self):
7071
code = "12+34"
7172
events = code_to_events(code)
7273
_, con = self.handle_events_narrow(events)
73-
_os_write.assert_any_call(ANY, b"1")
74-
_os_write.assert_any_call(ANY, b"2")
75-
_os_write.assert_any_call(ANY, b"+")
76-
_os_write.assert_any_call(ANY, b"3")
77-
_os_write.assert_any_call(ANY, b"\\")
78-
_os_write.assert_any_call(ANY, b"\n")
79-
_os_write.assert_any_call(ANY, b"4")
74+
con.out.write.assert_any_call(b"1")
75+
con.out.write.assert_any_call(b"2")
76+
con.out.write.assert_any_call(b"+")
77+
con.out.write.assert_any_call(b"3")
78+
con.out.write.assert_any_call(b"\\")
79+
con.out.write.assert_any_call(b"\n")
80+
con.out.write.assert_any_call(b"4")
8081
con.restore()
8182

82-
def test_resize_wider(self, _os_write):
83+
def test_resize_wider(self):
8384
code = "1234567890"
8485
events = code_to_events(code)
8586
reader, console = self.handle_events_narrow(events)
@@ -101,13 +102,13 @@ def same_console(events):
101102
prepare_console=same_console,
102103
)
103104

104-
_os_write.assert_any_call(ANY, self.move_right(2))
105-
_os_write.assert_any_call(ANY, self.move_up(2))
106-
_os_write.assert_any_call(ANY, b"567890")
105+
con.out.write.assert_any_call(self.move_right(2))
106+
con.out.write.assert_any_call(self.move_up(2))
107+
con.out.write.assert_any_call(b"567890")
107108

108109
con.restore()
109110

110-
def test_resize_narrower(self, _os_write):
111+
def test_resize_narrower(self):
111112
code = "1234567890"
112113
events = code_to_events(code)
113114
reader, console = self.handle_events(events)
@@ -129,22 +130,22 @@ def same_console(events):
129130
prepare_console=same_console,
130131
)
131132

132-
_os_write.assert_any_call(ANY, b"456\\")
133-
_os_write.assert_any_call(ANY, b"789\\")
133+
con.out.write.assert_any_call(b"456\\")
134+
con.out.write.assert_any_call(b"789\\")
134135

135136
con.restore()
136137

137-
def test_cursor_left(self, _os_write):
138+
def test_cursor_left(self):
138139
code = "1"
139140
events = itertools.chain(
140141
code_to_events(code),
141142
[Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
142143
)
143144
_, con = self.handle_events(events)
144-
_os_write.assert_any_call(ANY, self.move_left())
145+
con.out.write.assert_any_call(self.move_left())
145146
con.restore()
146147

147-
def test_cursor_left_right(self, _os_write):
148+
def test_cursor_left_right(self):
148149
code = "1"
149150
events = itertools.chain(
150151
code_to_events(code),
@@ -154,21 +155,21 @@ def test_cursor_left_right(self, _os_write):
154155
],
155156
)
156157
_, con = self.handle_events(events)
157-
_os_write.assert_any_call(ANY, self.move_left())
158-
_os_write.assert_any_call(ANY, self.move_right())
158+
con.out.write.assert_any_call(self.move_left())
159+
con.out.write.assert_any_call(self.move_right())
159160
con.restore()
160161

161-
def test_cursor_up(self, _os_write):
162+
def test_cursor_up(self):
162163
code = "1\n2+3"
163164
events = itertools.chain(
164165
code_to_events(code),
165166
[Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
166167
)
167168
_, con = self.handle_events(events)
168-
_os_write.assert_any_call(ANY, self.move_up())
169+
con.out.write.assert_any_call(self.move_up())
169170
con.restore()
170171

171-
def test_cursor_up_down(self, _os_write):
172+
def test_cursor_up_down(self):
172173
code = "1\n2+3"
173174
events = itertools.chain(
174175
code_to_events(code),
@@ -178,23 +179,23 @@ def test_cursor_up_down(self, _os_write):
178179
],
179180
)
180181
_, con = self.handle_events(events)
181-
_os_write.assert_any_call(ANY, self.move_up())
182-
_os_write.assert_any_call(ANY, self.move_down())
182+
con.out.write.assert_any_call(self.move_up())
183+
con.out.write.assert_any_call(self.move_down())
183184
con.restore()
184185

185-
def test_cursor_back_write(self, _os_write):
186+
def test_cursor_back_write(self):
186187
events = itertools.chain(
187188
code_to_events("1"),
188189
[Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
189190
code_to_events("2"),
190191
)
191192
_, con = self.handle_events(events)
192-
_os_write.assert_any_call(ANY, b"1")
193-
_os_write.assert_any_call(ANY, self.move_left())
194-
_os_write.assert_any_call(ANY, b"21")
193+
con.out.write.assert_any_call(b"1")
194+
con.out.write.assert_any_call(self.move_left())
195+
con.out.write.assert_any_call(b"21")
195196
con.restore()
196197

197-
def test_multiline_function_move_up_short_terminal(self, _os_write):
198+
def test_multiline_function_move_up_short_terminal(self):
198199
# fmt: off
199200
code = (
200201
"def f():\n"
@@ -210,11 +211,11 @@ def test_multiline_function_move_up_short_terminal(self, _os_write):
210211
],
211212
)
212213
_, con = self.handle_events_short(events)
213-
_os_write.assert_any_call(ANY, self.move_left(5))
214-
_os_write.assert_any_call(ANY, self.move_up())
214+
con.out.write.assert_any_call(self.move_left(5))
215+
con.out.write.assert_any_call(self.move_up())
215216
con.restore()
216217

217-
def test_multiline_function_move_up_down_short_terminal(self, _os_write):
218+
def test_multiline_function_move_up_down_short_terminal(self):
218219
# fmt: off
219220
code = (
220221
"def f():\n"
@@ -232,11 +233,11 @@ def test_multiline_function_move_up_down_short_terminal(self, _os_write):
232233
],
233234
)
234235
_, con = self.handle_events_short(events)
235-
_os_write.assert_any_call(ANY, self.move_left(8))
236-
_os_write.assert_any_call(ANY, self.erase_in_line())
236+
con.out.write.assert_any_call(self.move_left(8))
237+
con.out.write.assert_any_call(self.erase_in_line())
237238
con.restore()
238239

239-
def test_resize_bigger_on_multiline_function(self, _os_write):
240+
def test_resize_bigger_on_multiline_function(self):
240241
# fmt: off
241242
code = (
242243
"def f():\n"
@@ -262,19 +263,19 @@ def same_console(events):
262263
prepare_reader=same_reader,
263264
prepare_console=same_console,
264265
)
265-
_os_write.assert_has_calls(
266+
con.out.write.assert_has_calls(
266267
[
267-
call(ANY, self.move_left(5)),
268-
call(ANY, self.move_up()),
269-
call(ANY, b"def f():"),
270-
call(ANY, self.move_left(3)),
271-
call(ANY, self.move_down()),
268+
call(self.move_left(5)),
269+
call(self.move_up()),
270+
call(b"def f():"),
271+
call(self.move_left(3)),
272+
call(self.move_down()),
272273
]
273274
)
274275
console.restore()
275276
con.restore()
276277

277-
def test_resize_smaller_on_multiline_function(self, _os_write):
278+
def test_resize_smaller_on_multiline_function(self):
278279
# fmt: off
279280
code = (
280281
"def f():\n"
@@ -300,12 +301,12 @@ def same_console(events):
300301
prepare_reader=same_reader,
301302
prepare_console=same_console,
302303
)
303-
_os_write.assert_has_calls(
304+
con.out.write.assert_has_calls(
304305
[
305-
call(ANY, self.move_left(5)),
306-
call(ANY, self.move_up()),
307-
call(ANY, self.erase_in_line()),
308-
call(ANY, b" foo"),
306+
call(self.move_left(5)),
307+
call(self.move_up()),
308+
call(self.erase_in_line()),
309+
call(b" foo"),
309310
]
310311
)
311312
console.restore()

0 commit comments

Comments
 (0)