3434
3535from . import curses
3636from .console import Console , Event
37- from .fancy_termios import tcgetattr , tcsetattr
37+ from .fancy_termios import tcgetattr , tcsetattr , TermState
3838from .trace import trace
3939from .unix_eventqueue import EventQueue
4040from .utils import wlen
4444
4545# types
4646if TYPE_CHECKING :
47- from typing import IO , Literal , overload
47+ from typing import AbstractSet , IO , Literal , overload , cast
4848else :
4949 overload = lambda func : None
50+ cast = lambda typ , val : val
5051
5152
5253class InvalidTerminal (RuntimeError ):
53- pass
54+ def __init__ (self , message : str ) -> None :
55+ super ().__init__ (errno .EIO , message )
5456
5557
5658_error = (termios .error , curses .error , InvalidTerminal )
59+ _error_codes_to_ignore = frozenset ([errno .EIO , errno .ENXIO , errno .EPERM ])
5760
5861SIGWINCH_EVENT = "repaint"
5962
@@ -118,12 +121,13 @@ def __init__(self):
118121
119122 def register (self , fd , flag ):
120123 self .fd = fd
124+
121125 # note: The 'timeout' argument is received as *milliseconds*
122126 def poll (self , timeout : float | None = None ) -> list [int ]:
123127 if timeout is None :
124128 r , w , e = select .select ([self .fd ], [], [])
125129 else :
126- r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
130+ r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
127131 return r
128132
129133 poll = MinimalPoll # type: ignore[assignment]
@@ -159,8 +163,15 @@ def __init__(
159163 and os .getenv ("TERM_PROGRAM" ) == "Apple_Terminal"
160164 )
161165
166+ try :
167+ self .__input_fd_set (tcgetattr (self .input_fd ), ignore = frozenset ())
168+ except _error as e :
169+ raise RuntimeError (f"termios failure ({ e .args [1 ]} )" )
170+
162171 @overload
163- def _my_getstr (cap : str , optional : Literal [False ] = False ) -> bytes : ...
172+ def _my_getstr (
173+ cap : str , optional : Literal [False ] = False
174+ ) -> bytes : ...
164175
165176 @overload
166177 def _my_getstr (cap : str , optional : bool ) -> bytes | None : ...
@@ -226,7 +237,6 @@ def __read(self, n: int) -> bytes:
226237 self .input_buffer_pos = 0
227238 return ret
228239
229-
230240 def change_encoding (self , encoding : str ) -> None :
231241 """
232242 Change the encoding used for I/O operations.
@@ -338,6 +348,8 @@ def prepare(self):
338348 """
339349 Prepare the console for input/output operations.
340350 """
351+ self .__buffer = []
352+
341353 self .__svtermstate = tcgetattr (self .input_fd )
342354 raw = self .__svtermstate .copy ()
343355 raw .iflag &= ~ (termios .INPCK | termios .ISTRIP | termios .IXON )
@@ -349,14 +361,7 @@ def prepare(self):
349361 raw .lflag |= termios .ISIG
350362 raw .cc [termios .VMIN ] = 1
351363 raw .cc [termios .VTIME ] = 0
352- try :
353- tcsetattr (self .input_fd , termios .TCSADRAIN , raw )
354- except termios .error as e :
355- if e .args [0 ] != errno .EIO :
356- # gh-135329: when running under external programs (like strace),
357- # tcsetattr may fail with EIO. We can safely ignore this
358- # and continue with default terminal settings.
359- raise
364+ self .__input_fd_set (raw )
360365
361366 # In macOS terminal we need to deactivate line wrap via ANSI escape code
362367 if self .is_apple_terminal :
@@ -365,8 +370,6 @@ def prepare(self):
365370 self .screen = []
366371 self .height , self .width = self .getheightwidth ()
367372
368- self .__buffer = []
369-
370373 self .posxy = 0 , 0
371374 self .__gone_tall = 0
372375 self .__move = self .__move_short
@@ -388,11 +391,7 @@ def restore(self):
388391 self .__disable_bracketed_paste ()
389392 self .__maybe_write_code (self ._rmkx )
390393 self .flushoutput ()
391- try :
392- tcsetattr (self .input_fd , termios .TCSADRAIN , self .__svtermstate )
393- except termios .error as e :
394- if e .args [0 ] != errno .EIO :
395- raise
394+ self .__input_fd_set (self .__svtermstate )
396395
397396 if self .is_apple_terminal :
398397 os .write (self .output_fd , b"\033 [?7h" )
@@ -831,3 +830,17 @@ def __tputs(self, fmt, prog=delayprog):
831830 os .write (self .output_fd , self ._pad * nchars )
832831 else :
833832 time .sleep (float (delay ) / 1000.0 )
833+
834+ def __input_fd_set (
835+ self ,
836+ state : TermState ,
837+ ignore : AbstractSet [int ] = _error_codes_to_ignore ,
838+ ) -> bool :
839+ try :
840+ tcsetattr (self .input_fd , termios .TCSADRAIN , state )
841+ except termios .error as te :
842+ if te .args [0 ] not in ignore :
843+ raise
844+ return False
845+ else :
846+ return True
0 commit comments