3535
3636from . import terminfo
3737from .console import Console , Event
38- from .fancy_termios import tcgetattr , tcsetattr
38+ from .fancy_termios import tcgetattr , tcsetattr , TermState
3939from .trace import trace
4040from .unix_eventqueue import EventQueue
4141from .utils import wlen
5151
5252# types
5353if TYPE_CHECKING :
54- from typing import IO , Literal , overload
54+ from typing import AbstractSet , IO , Literal , overload , cast
5555else :
5656 overload = lambda func : None
57+ cast = lambda typ , val : val
5758
5859
5960class InvalidTerminal (RuntimeError ):
60- pass
61+ def __init__ (self , message : str ) -> None :
62+ super ().__init__ (errno .EIO , message )
6163
6264
6365_error = (termios .error , InvalidTerminal )
66+ _error_codes_to_ignore = frozenset ([errno .EIO , errno .ENXIO , errno .EPERM ])
6467
6568SIGWINCH_EVENT = "repaint"
6669
@@ -125,12 +128,13 @@ def __init__(self):
125128
126129 def register (self , fd , flag ):
127130 self .fd = fd
131+
128132 # note: The 'timeout' argument is received as *milliseconds*
129133 def poll (self , timeout : float | None = None ) -> list [int ]:
130134 if timeout is None :
131135 r , w , e = select .select ([self .fd ], [], [])
132136 else :
133- r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
137+ r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
134138 return r
135139
136140 poll = MinimalPoll # type: ignore[assignment]
@@ -164,8 +168,15 @@ def __init__(
164168 and os .getenv ("TERM_PROGRAM" ) == "Apple_Terminal"
165169 )
166170
171+ try :
172+ self .__input_fd_set (tcgetattr (self .input_fd ), ignore = frozenset ())
173+ except _error as e :
174+ raise RuntimeError (f"termios failure ({ e .args [1 ]} )" )
175+
167176 @overload
168- def _my_getstr (cap : str , optional : Literal [False ] = False ) -> bytes : ...
177+ def _my_getstr (
178+ cap : str , optional : Literal [False ] = False
179+ ) -> bytes : ...
169180
170181 @overload
171182 def _my_getstr (cap : str , optional : bool ) -> bytes | None : ...
@@ -205,7 +216,9 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
205216
206217 self .__setup_movement ()
207218
208- self .event_queue = EventQueue (self .input_fd , self .encoding , self .terminfo )
219+ self .event_queue = EventQueue (
220+ self .input_fd , self .encoding , self .terminfo
221+ )
209222 self .cursor_visible = 1
210223
211224 signal .signal (signal .SIGCONT , self ._sigcont_handler )
@@ -217,7 +230,6 @@ def _sigcont_handler(self, signum, frame):
217230 def __read (self , n : int ) -> bytes :
218231 return os .read (self .input_fd , n )
219232
220-
221233 def change_encoding (self , encoding : str ) -> None :
222234 """
223235 Change the encoding used for I/O operations.
@@ -329,6 +341,8 @@ def prepare(self):
329341 """
330342 Prepare the console for input/output operations.
331343 """
344+ self .__buffer = []
345+
332346 self .__svtermstate = tcgetattr (self .input_fd )
333347 raw = self .__svtermstate .copy ()
334348 raw .iflag &= ~ (termios .INPCK | termios .ISTRIP | termios .IXON )
@@ -340,14 +354,7 @@ def prepare(self):
340354 raw .lflag |= termios .ISIG
341355 raw .cc [termios .VMIN ] = 1
342356 raw .cc [termios .VTIME ] = 0
343- try :
344- tcsetattr (self .input_fd , termios .TCSADRAIN , raw )
345- except termios .error as e :
346- if e .args [0 ] != errno .EIO :
347- # gh-135329: when running under external programs (like strace),
348- # tcsetattr may fail with EIO. We can safely ignore this
349- # and continue with default terminal settings.
350- raise
357+ self .__input_fd_set (raw )
351358
352359 # In macOS terminal we need to deactivate line wrap via ANSI escape code
353360 if self .is_apple_terminal :
@@ -356,8 +363,6 @@ def prepare(self):
356363 self .screen = []
357364 self .height , self .width = self .getheightwidth ()
358365
359- self .__buffer = []
360-
361366 self .posxy = 0 , 0
362367 self .__gone_tall = 0
363368 self .__move = self .__move_short
@@ -379,11 +384,7 @@ def restore(self):
379384 self .__disable_bracketed_paste ()
380385 self .__maybe_write_code (self ._rmkx )
381386 self .flushoutput ()
382- try :
383- tcsetattr (self .input_fd , termios .TCSADRAIN , self .__svtermstate )
384- except termios .error as e :
385- if e .args [0 ] != errno .EIO :
386- raise
387+ self .__input_fd_set (self .__svtermstate )
387388
388389 if self .is_apple_terminal :
389390 os .write (self .output_fd , b"\033 [?7h" )
@@ -820,3 +821,17 @@ def __tputs(self, fmt, prog=delayprog):
820821 os .write (self .output_fd , self ._pad * nchars )
821822 else :
822823 time .sleep (float (delay ) / 1000.0 )
824+
825+ def __input_fd_set (
826+ self ,
827+ state : TermState ,
828+ ignore : AbstractSet [int ] = _error_codes_to_ignore ,
829+ ) -> bool :
830+ try :
831+ tcsetattr (self .input_fd , termios .TCSADRAIN , state )
832+ except termios .error as te :
833+ if te .args [0 ] not in ignore :
834+ raise
835+ return False
836+ else :
837+ return True
0 commit comments