3535
3636from  . import  terminfo 
3737from  .console  import  Console , Event 
38- from  .fancy_termios  import  tcgetattr , tcsetattr 
38+ from  .fancy_termios  import  tcgetattr , tcsetattr ,  TermState 
3939from  .trace  import  trace 
4040from  .unix_eventqueue  import  EventQueue 
4141from  .utils  import  wlen 
5151
5252# types 
5353if  TYPE_CHECKING :
54-     from  typing  import  IO , Literal , overload 
54+     from  typing  import  AbstractSet ,  IO , Literal , overload ,  cast 
5555else :
5656    overload  =  lambda  func : None 
57+     cast  =  lambda  typ , val : val 
5758
5859
5960class  InvalidTerminal (RuntimeError ):
60-     pass 
61+     def  __init__ (self , message : str ) ->  None :
62+         super ().__init__ (errno .EIO , message )
6163
6264
6365_error  =  (termios .error , InvalidTerminal )
66+ _error_codes_to_ignore  =  frozenset ([errno .EIO , errno .ENXIO , errno .EPERM ])
6467
6568SIGWINCH_EVENT  =  "repaint" 
6669
@@ -125,12 +128,13 @@ def __init__(self):
125128
126129        def  register (self , fd , flag ):
127130            self .fd  =  fd 
131+ 
128132        # note: The 'timeout' argument is received as *milliseconds* 
129133        def  poll (self , timeout : float  |  None  =  None ) ->  list [int ]:
130134            if  timeout  is  None :
131135                r , w , e  =  select .select ([self .fd ], [], [])
132136            else :
133-                 r , w , e  =  select .select ([self .fd ], [], [], timeout / 1000 )
137+                 r , w , e  =  select .select ([self .fd ], [], [], timeout   /   1000 )
134138            return  r 
135139
136140    poll  =  MinimalPoll   # type: ignore[assignment] 
@@ -164,8 +168,15 @@ def __init__(
164168            and  os .getenv ("TERM_PROGRAM" ) ==  "Apple_Terminal" 
165169        )
166170
171+         try :
172+             self .__input_fd_set (tcgetattr (self .input_fd ), ignore = frozenset ())
173+         except  _error  as  e :
174+             raise  RuntimeError (f"termios failure ({ e .args [1 ]}  )
175+ 
167176        @overload  
168-         def  _my_getstr (cap : str , optional : Literal [False ] =  False ) ->  bytes : ...
177+         def  _my_getstr (
178+             cap : str , optional : Literal [False ] =  False 
179+         ) ->  bytes : ...
169180
170181        @overload  
171182        def  _my_getstr (cap : str , optional : bool ) ->  bytes  |  None : ...
@@ -205,7 +216,9 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
205216
206217        self .__setup_movement ()
207218
208-         self .event_queue  =  EventQueue (self .input_fd , self .encoding , self .terminfo )
219+         self .event_queue  =  EventQueue (
220+             self .input_fd , self .encoding , self .terminfo 
221+         )
209222        self .cursor_visible  =  1 
210223
211224        signal .signal (signal .SIGCONT , self ._sigcont_handler )
@@ -217,7 +230,6 @@ def _sigcont_handler(self, signum, frame):
217230    def  __read (self , n : int ) ->  bytes :
218231        return  os .read (self .input_fd , n )
219232
220- 
221233    def  change_encoding (self , encoding : str ) ->  None :
222234        """ 
223235        Change the encoding used for I/O operations. 
@@ -329,6 +341,8 @@ def prepare(self):
329341        """ 
330342        Prepare the console for input/output operations. 
331343        """ 
344+         self .__buffer  =  []
345+ 
332346        self .__svtermstate  =  tcgetattr (self .input_fd )
333347        raw  =  self .__svtermstate .copy ()
334348        raw .iflag  &=  ~ (termios .INPCK  |  termios .ISTRIP  |  termios .IXON )
@@ -340,14 +354,7 @@ def prepare(self):
340354        raw .lflag  |=  termios .ISIG 
341355        raw .cc [termios .VMIN ] =  1 
342356        raw .cc [termios .VTIME ] =  0 
343-         try :
344-             tcsetattr (self .input_fd , termios .TCSADRAIN , raw )
345-         except  termios .error  as  e :
346-             if  e .args [0 ] !=  errno .EIO :
347-                 # gh-135329: when running under external programs (like strace), 
348-                 # tcsetattr may fail with EIO. We can safely ignore this 
349-                 # and continue with default terminal settings. 
350-                 raise 
357+         self .__input_fd_set (raw )
351358
352359        # In macOS terminal we need to deactivate line wrap via ANSI escape code 
353360        if  self .is_apple_terminal :
@@ -356,8 +363,6 @@ def prepare(self):
356363        self .screen  =  []
357364        self .height , self .width  =  self .getheightwidth ()
358365
359-         self .__buffer  =  []
360- 
361366        self .posxy  =  0 , 0 
362367        self .__gone_tall  =  0 
363368        self .__move  =  self .__move_short 
@@ -379,11 +384,7 @@ def restore(self):
379384        self .__disable_bracketed_paste ()
380385        self .__maybe_write_code (self ._rmkx )
381386        self .flushoutput ()
382-         try :
383-             tcsetattr (self .input_fd , termios .TCSADRAIN , self .__svtermstate )
384-         except  termios .error  as  e :
385-             if  e .args [0 ] !=  errno .EIO :
386-                 raise 
387+         self .__input_fd_set (self .__svtermstate )
387388
388389        if  self .is_apple_terminal :
389390            os .write (self .output_fd , b"\033 [?7h" )
@@ -820,3 +821,17 @@ def __tputs(self, fmt, prog=delayprog):
820821                os .write (self .output_fd , self ._pad  *  nchars )
821822            else :
822823                time .sleep (float (delay ) /  1000.0 )
824+ 
825+     def  __input_fd_set (
826+         self ,
827+         state : TermState ,
828+         ignore : AbstractSet [int ] =  _error_codes_to_ignore ,
829+     ) ->  bool :
830+         try :
831+             tcsetattr (self .input_fd , termios .TCSADRAIN , state )
832+         except  termios .error  as  te :
833+             if  te .args [0 ] not  in ignore :
834+                 raise 
835+             return  False 
836+         else :
837+             return  True 
0 commit comments