35
35
36
36
from . import terminfo
37
37
from .console import Console , Event
38
- from .fancy_termios import tcgetattr , tcsetattr
38
+ from .fancy_termios import tcgetattr , tcsetattr , TermState
39
39
from .trace import trace
40
40
from .unix_eventqueue import EventQueue
41
41
from .utils import wlen
51
51
52
52
# types
53
53
if TYPE_CHECKING :
54
- from typing import IO , Literal , overload
54
+ from typing import AbstractSet , IO , Literal , overload , cast
55
55
else :
56
56
overload = lambda func : None
57
+ cast = lambda typ , val : val
57
58
58
59
59
60
class InvalidTerminal (RuntimeError ):
60
- pass
61
+ def __init__ (self , message : str ) -> None :
62
+ super ().__init__ (errno .EIO , message )
61
63
62
64
63
65
_error = (termios .error , InvalidTerminal )
66
+ _error_codes_to_ignore = frozenset ([errno .EIO , errno .ENXIO , errno .EPERM ])
64
67
65
68
SIGWINCH_EVENT = "repaint"
66
69
@@ -125,12 +128,13 @@ def __init__(self):
125
128
126
129
def register (self , fd , flag ):
127
130
self .fd = fd
131
+
128
132
# note: The 'timeout' argument is received as *milliseconds*
129
133
def poll (self , timeout : float | None = None ) -> list [int ]:
130
134
if timeout is None :
131
135
r , w , e = select .select ([self .fd ], [], [])
132
136
else :
133
- r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
137
+ r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
134
138
return r
135
139
136
140
poll = MinimalPoll # type: ignore[assignment]
@@ -164,8 +168,15 @@ def __init__(
164
168
and os .getenv ("TERM_PROGRAM" ) == "Apple_Terminal"
165
169
)
166
170
171
+ try :
172
+ self .__input_fd_set (tcgetattr (self .input_fd ), ignore = frozenset ())
173
+ except _error as e :
174
+ raise RuntimeError (f"termios failure ({ e .args [1 ]} )" )
175
+
167
176
@overload
168
- def _my_getstr (cap : str , optional : Literal [False ] = False ) -> bytes : ...
177
+ def _my_getstr (
178
+ cap : str , optional : Literal [False ] = False
179
+ ) -> bytes : ...
169
180
170
181
@overload
171
182
def _my_getstr (cap : str , optional : bool ) -> bytes | None : ...
@@ -205,7 +216,9 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
205
216
206
217
self .__setup_movement ()
207
218
208
- self .event_queue = EventQueue (self .input_fd , self .encoding , self .terminfo )
219
+ self .event_queue = EventQueue (
220
+ self .input_fd , self .encoding , self .terminfo
221
+ )
209
222
self .cursor_visible = 1
210
223
211
224
signal .signal (signal .SIGCONT , self ._sigcont_handler )
@@ -217,7 +230,6 @@ def _sigcont_handler(self, signum, frame):
217
230
def __read (self , n : int ) -> bytes :
218
231
return os .read (self .input_fd , n )
219
232
220
-
221
233
def change_encoding (self , encoding : str ) -> None :
222
234
"""
223
235
Change the encoding used for I/O operations.
@@ -329,6 +341,8 @@ def prepare(self):
329
341
"""
330
342
Prepare the console for input/output operations.
331
343
"""
344
+ self .__buffer = []
345
+
332
346
self .__svtermstate = tcgetattr (self .input_fd )
333
347
raw = self .__svtermstate .copy ()
334
348
raw .iflag &= ~ (termios .INPCK | termios .ISTRIP | termios .IXON )
@@ -340,14 +354,7 @@ def prepare(self):
340
354
raw .lflag |= termios .ISIG
341
355
raw .cc [termios .VMIN ] = 1
342
356
raw .cc [termios .VTIME ] = 0
343
- try :
344
- tcsetattr (self .input_fd , termios .TCSADRAIN , raw )
345
- except termios .error as e :
346
- if e .args [0 ] != errno .EIO :
347
- # gh-135329: when running under external programs (like strace),
348
- # tcsetattr may fail with EIO. We can safely ignore this
349
- # and continue with default terminal settings.
350
- raise
357
+ self .__input_fd_set (raw )
351
358
352
359
# In macOS terminal we need to deactivate line wrap via ANSI escape code
353
360
if self .is_apple_terminal :
@@ -356,8 +363,6 @@ def prepare(self):
356
363
self .screen = []
357
364
self .height , self .width = self .getheightwidth ()
358
365
359
- self .__buffer = []
360
-
361
366
self .posxy = 0 , 0
362
367
self .__gone_tall = 0
363
368
self .__move = self .__move_short
@@ -379,11 +384,7 @@ def restore(self):
379
384
self .__disable_bracketed_paste ()
380
385
self .__maybe_write_code (self ._rmkx )
381
386
self .flushoutput ()
382
- try :
383
- tcsetattr (self .input_fd , termios .TCSADRAIN , self .__svtermstate )
384
- except termios .error as e :
385
- if e .args [0 ] != errno .EIO :
386
- raise
387
+ self .__input_fd_set (self .__svtermstate )
387
388
388
389
if self .is_apple_terminal :
389
390
os .write (self .output_fd , b"\033 [?7h" )
@@ -820,3 +821,17 @@ def __tputs(self, fmt, prog=delayprog):
820
821
os .write (self .output_fd , self ._pad * nchars )
821
822
else :
822
823
time .sleep (float (delay ) / 1000.0 )
824
+
825
+ def __input_fd_set (
826
+ self ,
827
+ state : TermState ,
828
+ ignore : AbstractSet [int ] = _error_codes_to_ignore ,
829
+ ) -> bool :
830
+ try :
831
+ tcsetattr (self .input_fd , termios .TCSADRAIN , state )
832
+ except termios .error as te :
833
+ if te .args [0 ] not in ignore :
834
+ raise
835
+ return False
836
+ else :
837
+ return True
0 commit comments