33
44# Skip these tests if termios or fcntl are not available
55import_module ('termios' )
6+ # fcntl is a proxy for not being one of the wasm32 platforms even though we
7+ # don't use this module... a proper check for what crashes those is needed.
68import_module ("fcntl" )
79
810import errno
1517import socket
1618import io # readline
1719import unittest
18-
19- import struct
20- import fcntl
2120import warnings
2221
2322TEST_STRING_1 = b"I wish to buy a fish license.\n "
2423TEST_STRING_2 = b"For my pet fish, Eric.\n "
2524
26- try :
27- _TIOCGWINSZ = tty .TIOCGWINSZ
28- _TIOCSWINSZ = tty .TIOCSWINSZ
29- _HAVE_WINSZ = True
30- except AttributeError :
31- _HAVE_WINSZ = False
25+ _HAVE_WINSZ = hasattr (tty , "TIOCGWINSZ" ) and hasattr (tty , "TIOCSWINSZ" )
3226
3327if verbose :
3428 def debug (msg ):
@@ -82,14 +76,6 @@ def expectedFailureIfStdinIsTTY(fun):
8276 pass
8377 return fun
8478
85- def _get_term_winsz (fd ):
86- s = struct .pack ("HHHH" , 0 , 0 , 0 , 0 )
87- return fcntl .ioctl (fd , _TIOCGWINSZ , s )
88-
89- def _set_term_winsz (fd , winsz ):
90- fcntl .ioctl (fd , _TIOCSWINSZ , winsz )
91-
92-
9379# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
9480# because pty code is not too portable.
9581class PtyTest (unittest .TestCase ):
@@ -105,18 +91,14 @@ def setUp(self):
10591 self .addCleanup (signal .alarm , 0 )
10692 signal .alarm (10 )
10793
108- # Save original stdin window size
109- self .stdin_rows = None
110- self .stdin_cols = None
94+ # Save original stdin window size.
95+ self .stdin_dim = None
11196 if _HAVE_WINSZ :
11297 try :
113- stdin_dim = os .get_terminal_size (pty .STDIN_FILENO )
114- self .stdin_rows = stdin_dim .lines
115- self .stdin_cols = stdin_dim .columns
116- old_stdin_winsz = struct .pack ("HHHH" , self .stdin_rows ,
117- self .stdin_cols , 0 , 0 )
118- self .addCleanup (_set_term_winsz , pty .STDIN_FILENO , old_stdin_winsz )
119- except OSError :
98+ self .stdin_dim = tty .tcgetwinsize (pty .STDIN_FILENO )
99+ self .addCleanup (tty .tcsetwinsize , pty .STDIN_FILENO ,
100+ self .stdin_dim )
101+ except tty .error :
120102 pass
121103
122104 def handle_sig (self , sig , frame ):
@@ -131,41 +113,40 @@ def test_openpty(self):
131113 try :
132114 mode = tty .tcgetattr (pty .STDIN_FILENO )
133115 except tty .error :
134- # not a tty or bad/closed fd
116+ # Not a tty or bad/closed fd.
135117 debug ("tty.tcgetattr(pty.STDIN_FILENO) failed" )
136118 mode = None
137119
138- new_stdin_winsz = None
139- if self .stdin_rows is not None and self . stdin_cols is not None :
120+ new_dim = None
121+ if self .stdin_dim :
140122 try :
141123 # Modify pty.STDIN_FILENO window size; we need to
142124 # check if pty.openpty() is able to set pty slave
143125 # window size accordingly.
144- debug ("Setting pty.STDIN_FILENO window size" )
145- debug (f"original size: (rows={ self .stdin_rows } , cols={ self .stdin_cols } )" )
146- target_stdin_rows = self .stdin_rows + 1
147- target_stdin_cols = self .stdin_cols + 1
148- debug (f"target size: (rows={ target_stdin_rows } , cols={ target_stdin_cols } )" )
149- target_stdin_winsz = struct .pack ("HHHH" , target_stdin_rows ,
150- target_stdin_cols , 0 , 0 )
151- _set_term_winsz (pty .STDIN_FILENO , target_stdin_winsz )
126+ debug ("Setting pty.STDIN_FILENO window size." )
127+ debug (f"original size: (row, col) = { self .stdin_dim } " )
128+ target_dim = (self .stdin_dim [0 ] + 1 , self .stdin_dim [1 ] + 1 )
129+ debug (f"target size: (row, col) = { target_dim } " )
130+ tty .tcsetwinsize (pty .STDIN_FILENO , target_dim )
152131
153132 # Were we able to set the window size
154133 # of pty.STDIN_FILENO successfully?
155- new_stdin_winsz = _get_term_winsz (pty .STDIN_FILENO )
156- self .assertEqual (new_stdin_winsz , target_stdin_winsz ,
134+ new_dim = tty . tcgetwinsize (pty .STDIN_FILENO )
135+ self .assertEqual (new_dim , target_dim ,
157136 "pty.STDIN_FILENO window size unchanged" )
158137 except OSError :
159- warnings .warn ("Failed to set pty.STDIN_FILENO window size" )
138+ warnings .warn ("Failed to set pty.STDIN_FILENO window size. " )
160139 pass
161140
162141 try :
163142 debug ("Calling pty.openpty()" )
164143 try :
165- master_fd , slave_fd = pty .openpty (mode , new_stdin_winsz )
144+ master_fd , slave_fd , slave_name = pty .openpty (mode , new_dim ,
145+ True )
166146 except TypeError :
167147 master_fd , slave_fd = pty .openpty ()
168- debug (f"Got master_fd '{ master_fd } ', slave_fd '{ slave_fd } '" )
148+ slave_name = None
149+ debug (f"Got { master_fd = } , { slave_fd = } , { slave_name = } " )
169150 except OSError :
170151 # " An optional feature could not be imported " ... ?
171152 raise unittest .SkipTest ("Pseudo-terminals (seemingly) not functional." )
@@ -181,8 +162,8 @@ def test_openpty(self):
181162 if mode :
182163 self .assertEqual (tty .tcgetattr (slave_fd ), mode ,
183164 "openpty() failed to set slave termios" )
184- if new_stdin_winsz :
185- self .assertEqual (_get_term_winsz (slave_fd ), new_stdin_winsz ,
165+ if new_dim :
166+ self .assertEqual (tty . tcgetwinsize (slave_fd ), new_dim ,
186167 "openpty() failed to set slave window size" )
187168
188169 # Ensure the fd is non-blocking in case there's nothing to read.
@@ -367,9 +348,8 @@ def _socketpair(self):
367348 self .files .extend (socketpair )
368349 return socketpair
369350
370- def _mock_select (self , rfds , wfds , xfds , timeout = 0 ):
351+ def _mock_select (self , rfds , wfds , xfds ):
371352 # This will raise IndexError when no more expected calls exist.
372- # This ignores the timeout
373353 self .assertEqual (self .select_rfds_lengths .pop (0 ), len (rfds ))
374354 return self .select_rfds_results .pop (0 ), [], []
375355
@@ -409,28 +389,6 @@ def test__copy_to_each(self):
409389 self .assertEqual (os .read (read_from_stdout_fd , 20 ), b'from master' )
410390 self .assertEqual (os .read (masters [1 ], 20 ), b'from stdin' )
411391
412- def test__copy_eof_on_all (self ):
413- """Test the empty read EOF case on both master_fd and stdin."""
414- read_from_stdout_fd , mock_stdout_fd = self ._pipe ()
415- pty .STDOUT_FILENO = mock_stdout_fd
416- mock_stdin_fd , write_to_stdin_fd = self ._pipe ()
417- pty .STDIN_FILENO = mock_stdin_fd
418- socketpair = self ._socketpair ()
419- masters = [s .fileno () for s in socketpair ]
420-
421- socketpair [1 ].close ()
422- os .close (write_to_stdin_fd )
423-
424- pty .select = self ._mock_select
425- self .select_rfds_lengths .append (2 )
426- self .select_rfds_results .append ([mock_stdin_fd , masters [0 ]])
427- # We expect that both fds were removed from the fds list as they
428- # both encountered an EOF before the second select call.
429- self .select_rfds_lengths .append (0 )
430-
431- # We expect the function to return without error.
432- self .assertEqual (pty ._copy (masters [0 ]), None )
433-
434392 def test__restore_tty_mode_normal_return (self ):
435393 """Test that spawn resets the tty mode no when _copy returns normally."""
436394
0 commit comments