34
34
35
35
from . import curses
36
36
from .console import Console , Event
37
- from .fancy_termios import tcgetattr , tcsetattr
37
+ from .fancy_termios import tcgetattr , tcsetattr , TermState
38
38
from .trace import trace
39
39
from .unix_eventqueue import EventQueue
40
40
from .utils import wlen
44
44
45
45
# types
46
46
if TYPE_CHECKING :
47
- from typing import IO , Literal , overload
47
+ from typing import AbstractSet , IO , Literal , overload , cast
48
48
else :
49
49
overload = lambda func : None
50
+ cast = lambda typ , val : val
50
51
51
52
52
53
class InvalidTerminal (RuntimeError ):
53
- pass
54
+ def __init__ (self , message : str ) -> None :
55
+ super ().__init__ (errno .EIO , message )
54
56
55
57
56
58
_error = (termios .error , curses .error , InvalidTerminal )
59
+ _error_codes_to_ignore = frozenset ([errno .EIO , errno .ENXIO , errno .EPERM ])
57
60
58
61
SIGWINCH_EVENT = "repaint"
59
62
@@ -118,12 +121,13 @@ def __init__(self):
118
121
119
122
def register (self , fd , flag ):
120
123
self .fd = fd
124
+
121
125
# note: The 'timeout' argument is received as *milliseconds*
122
126
def poll (self , timeout : float | None = None ) -> list [int ]:
123
127
if timeout is None :
124
128
r , w , e = select .select ([self .fd ], [], [])
125
129
else :
126
- r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
130
+ r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
127
131
return r
128
132
129
133
poll = MinimalPoll # type: ignore[assignment]
@@ -159,8 +163,15 @@ def __init__(
159
163
and os .getenv ("TERM_PROGRAM" ) == "Apple_Terminal"
160
164
)
161
165
166
+ try :
167
+ self .__input_fd_set (tcgetattr (self .input_fd ), ignore = frozenset ())
168
+ except _error as e :
169
+ raise RuntimeError (f"termios failure ({ e .args [1 ]} )" )
170
+
162
171
@overload
163
- def _my_getstr (cap : str , optional : Literal [False ] = False ) -> bytes : ...
172
+ def _my_getstr (
173
+ cap : str , optional : Literal [False ] = False
174
+ ) -> bytes : ...
164
175
165
176
@overload
166
177
def _my_getstr (cap : str , optional : bool ) -> bytes | None : ...
@@ -226,7 +237,6 @@ def __read(self, n: int) -> bytes:
226
237
self .input_buffer_pos = 0
227
238
return ret
228
239
229
-
230
240
def change_encoding (self , encoding : str ) -> None :
231
241
"""
232
242
Change the encoding used for I/O operations.
@@ -338,6 +348,8 @@ def prepare(self):
338
348
"""
339
349
Prepare the console for input/output operations.
340
350
"""
351
+ self .__buffer = []
352
+
341
353
self .__svtermstate = tcgetattr (self .input_fd )
342
354
raw = self .__svtermstate .copy ()
343
355
raw .iflag &= ~ (termios .INPCK | termios .ISTRIP | termios .IXON )
@@ -349,14 +361,7 @@ def prepare(self):
349
361
raw .lflag |= termios .ISIG
350
362
raw .cc [termios .VMIN ] = 1
351
363
raw .cc [termios .VTIME ] = 0
352
- try :
353
- tcsetattr (self .input_fd , termios .TCSADRAIN , raw )
354
- except termios .error as e :
355
- if e .args [0 ] != errno .EIO :
356
- # gh-135329: when running under external programs (like strace),
357
- # tcsetattr may fail with EIO. We can safely ignore this
358
- # and continue with default terminal settings.
359
- raise
364
+ self .__input_fd_set (raw )
360
365
361
366
# In macOS terminal we need to deactivate line wrap via ANSI escape code
362
367
if self .is_apple_terminal :
@@ -365,8 +370,6 @@ def prepare(self):
365
370
self .screen = []
366
371
self .height , self .width = self .getheightwidth ()
367
372
368
- self .__buffer = []
369
-
370
373
self .posxy = 0 , 0
371
374
self .__gone_tall = 0
372
375
self .__move = self .__move_short
@@ -388,11 +391,7 @@ def restore(self):
388
391
self .__disable_bracketed_paste ()
389
392
self .__maybe_write_code (self ._rmkx )
390
393
self .flushoutput ()
391
- try :
392
- tcsetattr (self .input_fd , termios .TCSADRAIN , self .__svtermstate )
393
- except termios .error as e :
394
- if e .args [0 ] != errno .EIO :
395
- raise
394
+ self .__input_fd_set (self .__svtermstate )
396
395
397
396
if self .is_apple_terminal :
398
397
os .write (self .output_fd , b"\033 [?7h" )
@@ -831,3 +830,17 @@ def __tputs(self, fmt, prog=delayprog):
831
830
os .write (self .output_fd , self ._pad * nchars )
832
831
else :
833
832
time .sleep (float (delay ) / 1000.0 )
833
+
834
+ def __input_fd_set (
835
+ self ,
836
+ state : TermState ,
837
+ ignore : AbstractSet [int ] = _error_codes_to_ignore ,
838
+ ) -> bool :
839
+ try :
840
+ tcsetattr (self .input_fd , termios .TCSADRAIN , state )
841
+ except termios .error as te :
842
+ if te .args [0 ] not in ignore :
843
+ raise
844
+ return False
845
+ else :
846
+ return True
0 commit comments