3535
3636from . import terminfo
3737from .console import Console , Event
38- from .fancy_termios import tcgetattr , tcsetattr
38+ from .fancy_termios import tcgetattr , tcsetattr , TermState
3939from .trace import trace
4040from .unix_eventqueue import EventQueue
4141from .utils import wlen
5151
5252# types
5353if TYPE_CHECKING :
54- from typing import IO , Literal , overload
54+ from typing import AbstractSet , IO , Literal , overload , cast
5555else :
5656 overload = lambda func : None
57+ cast = lambda typ , val : val
5758
5859
5960class InvalidTerminal (RuntimeError ):
60- pass
61+ def __init__ (self , message : str ) -> None :
62+ super ().__init__ (errno .EIO , message )
6163
6264
6365_error = (termios .error , InvalidTerminal )
66+ _error_codes_to_ignore : frozenset [int ] = frozenset ([errno .EIO , errno .EPERM ])
6467
6568SIGWINCH_EVENT = "repaint"
6669
@@ -125,12 +128,13 @@ def __init__(self):
125128
126129 def register (self , fd , flag ):
127130 self .fd = fd
131+
128132 # note: The 'timeout' argument is received as *milliseconds*
129133 def poll (self , timeout : float | None = None ) -> list [int ]:
130134 if timeout is None :
131135 r , w , e = select .select ([self .fd ], [], [])
132136 else :
133- r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
137+ r , w , e = select .select ([self .fd ], [], [], timeout / 1000 )
134138 return r
135139
136140 poll = MinimalPoll # type: ignore[assignment]
@@ -160,8 +164,15 @@ def __init__(
160164 self .terminfo = terminfo .TermInfo (term or None )
161165 self .term = term
162166
167+ try :
168+ self .__input_fd_set (tcgetattr (self .input_fd ), ignore = frozenset ())
169+ except _error as e :
170+ raise RuntimeError (f"termios failure ({ e .args [1 ]} )" )
171+
163172 @overload
164- def _my_getstr (cap : str , optional : Literal [False ] = False ) -> bytes : ...
173+ def _my_getstr (
174+ cap : str , optional : Literal [False ] = False
175+ ) -> bytes : ...
165176
166177 @overload
167178 def _my_getstr (cap : str , optional : bool ) -> bytes | None : ...
@@ -201,7 +212,9 @@ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
201212
202213 self .__setup_movement ()
203214
204- self .event_queue = EventQueue (self .input_fd , self .encoding , self .terminfo )
215+ self .event_queue = EventQueue (
216+ self .input_fd , self .encoding , self .terminfo
217+ )
205218 self .cursor_visible = 1
206219
207220 signal .signal (signal .SIGCONT , self ._sigcont_handler )
@@ -213,7 +226,6 @@ def _sigcont_handler(self, signum, frame):
213226 def __read (self , n : int ) -> bytes :
214227 return os .read (self .input_fd , n )
215228
216-
217229 def change_encoding (self , encoding : str ) -> None :
218230 """
219231 Change the encoding used for I/O operations.
@@ -325,6 +337,8 @@ def prepare(self):
325337 """
326338 Prepare the console for input/output operations.
327339 """
340+ self .__buffer = []
341+
328342 self .__svtermstate = tcgetattr (self .input_fd )
329343 raw = self .__svtermstate .copy ()
330344 raw .iflag &= ~ (termios .INPCK | termios .ISTRIP | termios .IXON )
@@ -336,7 +350,7 @@ def prepare(self):
336350 raw .lflag |= termios .ISIG
337351 raw .cc [termios .VMIN ] = 1
338352 raw .cc [termios .VTIME ] = 0
339- tcsetattr ( self .input_fd , termios . TCSADRAIN , raw )
353+ self .__input_fd_set ( raw )
340354
341355 # In macOS terminal we need to deactivate line wrap via ANSI escape code
342356 if platform .system () == "Darwin" and os .getenv ("TERM_PROGRAM" ) == "Apple_Terminal" :
@@ -345,8 +359,6 @@ def prepare(self):
345359 self .screen = []
346360 self .height , self .width = self .getheightwidth ()
347361
348- self .__buffer = []
349-
350362 self .posxy = 0 , 0
351363 self .__gone_tall = 0
352364 self .__move = self .__move_short
@@ -368,7 +380,7 @@ def restore(self):
368380 self .__disable_bracketed_paste ()
369381 self .__maybe_write_code (self ._rmkx )
370382 self .flushoutput ()
371- tcsetattr ( self .input_fd , termios . TCSADRAIN , self .__svtermstate )
383+ self .__input_fd_set ( self .__svtermstate )
372384
373385 if platform .system () == "Darwin" and os .getenv ("TERM_PROGRAM" ) == "Apple_Terminal" :
374386 os .write (self .output_fd , b"\033 [?7h" )
@@ -803,3 +815,17 @@ def __tputs(self, fmt, prog=delayprog):
803815 os .write (self .output_fd , self ._pad * nchars )
804816 else :
805817 time .sleep (float (delay ) / 1000.0 )
818+
819+ def __input_fd_set (
820+ self ,
821+ state : TermState ,
822+ ignore : AbstractSet [int ] = _error_codes_to_ignore ,
823+ ) -> bool :
824+ try :
825+ tcsetattr (self .input_fd , termios .TCSADRAIN , state )
826+ except termios .error as te :
827+ if te .args [0 ] not in ignore :
828+ raise
829+ return False
830+ else :
831+ return True
0 commit comments