36
36
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
37
37
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
38
38
# SOFTWARE.
39
-
40
39
import os
41
40
import re
42
41
import select
43
42
import subprocess
44
43
import sys
44
+ import tempfile
45
45
import termios
46
46
from textwrap import dedent
47
47
48
48
49
- def validate_repl (stdin ):
49
+ def validate_repl (stdin , python_args = (), ignore_preamble = True ):
50
50
env = os .environ .copy ()
51
51
env ['TERM' ] = 'ansi'
52
52
env ['PYTHONIOENCODING' ] = 'utf-8'
53
53
pty_parent , pty_child = os .openpty ()
54
- termios .tcsetwinsize (pty_parent , (60 , 80 ))
55
- proc = subprocess .Popen ([sys .executable , '-I' ], env = env , stdin = pty_child , stdout = pty_child , stderr = pty_child )
56
- out = ''
57
- input_and_output = []
58
- in_matches = list (re .finditer (r'^(>>>|\.\.\.) (.*)' , stdin , flags = re .MULTILINE ))
59
- for i , match in enumerate (in_matches ):
60
- input_and_output .append ((
61
- match .group (1 ),
62
- match .group (2 ),
63
- stdin [match .end ():in_matches [i + 1 ].start () - 1 if i + 1 < len (in_matches ) else - 1 ],
64
- ))
65
- first_prompt = True
66
- index = 0
67
- whole_out = ''
68
- while True :
69
- rlist , _ , _ = select .select ([pty_parent ], [], [], 30 )
70
- assert pty_parent in rlist , f"Timed out waiting for REPL output. Output: { whole_out } { out } "
71
- out += os .read (pty_parent , 1024 ).decode ('utf-8' )
72
- out = out .replace ('\r \n ' , '\n ' )
73
- out = re .sub (r'\x1b\[(?:\?2004[hl]|\d+[A-G])' , '' , out )
74
- if out .endswith (('\n >>> ' , '\n ... ' )):
75
- if not first_prompt :
54
+ try :
55
+ termios .tcsetwinsize (pty_parent , (60 , 80 ))
56
+ proc = subprocess .Popen (
57
+ [sys .executable , '-I' , * python_args ],
58
+ env = env ,
59
+ stdin = pty_child ,
60
+ stdout = pty_child ,
61
+ stderr = pty_child ,
62
+ )
63
+ out = ''
64
+ input_and_output = []
65
+ expected_preamble = ''
66
+ in_matches = list (re .finditer (r'^(>>>|\.\.\.) (.*)' , stdin , flags = re .MULTILINE ))
67
+ for i , match in enumerate (in_matches ):
68
+ if i == 0 :
69
+ expected_preamble = stdin [:match .start () - 1 ] if match .start () else ''
70
+ input_and_output .append ((
71
+ match .group (1 ),
72
+ match .group (2 ),
73
+ stdin [match .end ():in_matches [i + 1 ].start () - 1 if i + 1 < len (in_matches ) else - 1 ],
74
+ ))
75
+ index = - 1
76
+ whole_out = ''
77
+ while True :
78
+ rlist , _ , _ = select .select ([pty_parent ], [], [], 30 )
79
+ assert pty_parent in rlist , f"Timed out waiting for REPL output. Output: { whole_out } { out } "
80
+ out += os .read (pty_parent , 1024 ).decode ('utf-8' )
81
+ out = out .replace ('\r \n ' , '\n ' )
82
+ out = re .sub (r'\x1b\[(?:\?2004[hl]|\d+[A-G])' , '' , out )
83
+ if out == '>>> ' or out .endswith (('\n >>> ' , '\n ... ' )):
76
84
prompt = out [:3 ]
77
- expected_prompt , current_in , expected_out = input_and_output [index ]
78
- assert prompt == expected_prompt
79
- expected = f'{ expected_prompt } { current_in } { expected_out } '
80
85
actual = out [:- 5 ]
81
- assert actual == expected , f'Actual:\n { actual !r} \n Expected:\n { expected !r} '
86
+ if index >= 0 :
87
+ expected_prompt , current_in , expected_out = input_and_output [index ]
88
+ assert prompt == expected_prompt
89
+ expected = f'{ expected_prompt } { current_in } { expected_out } '
90
+ else :
91
+ expected = expected_preamble
92
+ if index >= 0 or not ignore_preamble :
93
+ assert actual == expected , f'Actual:\n { actual !r} \n Expected:\n { expected !r} '
82
94
index += 1
83
- first_prompt = False
84
- whole_out += out [:- 4 ]
85
- out = out [- 4 :]
86
- if index >= len (input_and_output ):
87
- os .close (pty_child )
88
- os .close (pty_parent )
89
- proc .wait (timeout = 5 )
90
- return
91
- _ , next_in , _ = input_and_output [index ]
92
- os .write (pty_parent , next_in .encode ('utf-8' ) + b'\r ' )
95
+ whole_out += out [:- 4 ]
96
+ out = out [- 4 :]
97
+ if index >= len (input_and_output ):
98
+ os .write (pty_parent , b'\x04 ' ) # CTRL-D
99
+ proc .wait (timeout = 30 )
100
+ out = os .read (pty_parent , 1024 ).decode ('utf-8' )
101
+ out = re .sub (r'\x1b\[\?2004[hl]' , '' , out )
102
+ assert not out .strip (), f"Garbage after EOF:\n { out !r} "
103
+ return
104
+ else :
105
+ _ , next_in , _ = input_and_output [index ]
106
+ os .write (pty_parent , next_in .encode ('utf-8' ) + b'\r ' )
107
+ finally :
108
+ os .close (pty_child )
109
+ os .close (pty_parent )
93
110
94
111
95
112
def test_basic_repl ():
@@ -99,6 +116,8 @@ def test_basic_repl():
99
116
>>> None
100
117
>>> "hello"
101
118
'hello'
119
+ >>> _
120
+ 'hello'
102
121
""" ))
103
122
104
123
@@ -148,3 +167,27 @@ def test_exceptions():
148
167
File "<stdin>", line 3, in __repr__
149
168
NameError: name 'asdf' is not defined
150
169
""" ))
170
+
171
+
172
+ def test_inspect_flag ():
173
+ with tempfile .NamedTemporaryFile ('w' ) as f :
174
+ f .write ('a = 1\n ' )
175
+ f .flush ()
176
+ validate_repl (dedent ("""\
177
+ >>> a
178
+ 1
179
+ """ ), python_args = ['-i' , f .name ], ignore_preamble = False )
180
+
181
+
182
+ def test_inspect_flag_exit ():
183
+ with tempfile .NamedTemporaryFile ('w' ) as f :
184
+ f .write ('a = 1\n import sys\n sys.exit(1)\n ' )
185
+ f .flush ()
186
+ validate_repl (dedent (f"""\
187
+ Traceback (most recent call last):
188
+ File "{ f .name } ", line 3, in <module>
189
+ sys.exit(1)
190
+ SystemExit: 1
191
+ >>> a
192
+ 1
193
+ """ ), python_args = ['-i' , f .name ], ignore_preamble = False )
0 commit comments