42
42
import fnmatch
43
43
import json
44
44
import math
45
- import multiprocessing
46
45
import os
47
46
import pickle
48
47
import platform
49
48
import re
49
+ import select
50
50
import shlex
51
51
import signal
52
+ import socket
52
53
import subprocess
53
54
import sys
54
55
import tempfile
@@ -205,7 +206,7 @@ def out_tell():
205
206
try :
206
207
return os .lseek (1 , 0 , os .SEEK_CUR )
207
208
except OSError :
208
- return - 1
209
+ return 0
209
210
210
211
211
212
T = typing .TypeVar ('T' )
@@ -303,7 +304,7 @@ def startTest(self, test):
303
304
)
304
305
305
306
306
- class PipeResult (AbstractRemoteResult ):
307
+ class ConnectionResult (AbstractRemoteResult ):
307
308
def __init__ (self , test_suite : 'TestSuite' , conn ):
308
309
super ().__init__ (test_suite )
309
310
self .conn = conn
@@ -435,7 +436,7 @@ def generate_mx_report(self, path: str):
435
436
case _:
436
437
status = 'FAILED'
437
438
report_data .append ({
438
- 'name' : str (result .test_id ),
439
+ 'name' : str (result .test_id ). replace ( ' \\ ' , '/' ) ,
439
440
'status' : status ,
440
441
'duration' : result .duration ,
441
442
})
@@ -506,16 +507,18 @@ def write_tags(test_file: 'TestFile', tags: typing.Iterable['Tag']):
506
507
507
508
508
509
def interrupt_process (process : subprocess .Popen ):
509
- sig = signal .SIGINT if sys .platform != 'win32' else signal .CTRL_C_EVENT
510
- process .send_signal (sig )
510
+ if hasattr (signal , 'SIGINT' ):
511
+ try :
512
+ process .send_signal (signal .SIGINT )
513
+ process .wait (3 )
514
+ return
515
+ except (OSError , subprocess .TimeoutExpired ):
516
+ pass
517
+ process .terminate ()
511
518
try :
512
519
process .wait (3 )
513
520
except subprocess .TimeoutExpired :
514
- process .terminate ()
515
- try :
516
- process .wait (3 )
517
- except subprocess .TimeoutExpired :
518
- process .kill ()
521
+ process .kill ()
519
522
520
523
521
524
class ParallelTestRunner (TestRunner ):
@@ -715,22 +718,18 @@ def get_test_to_blame(self):
715
718
716
719
def run_in_subprocess_and_watch (self ):
717
720
self .thread = threading .current_thread ()
718
- # noinspection PyUnresolvedReferences
719
- use_pipe = sys .platform != 'win32' and (not IS_GRAALPY or __graalpython__ .posix_module_backend () == 'native' )
720
- with tempfile .TemporaryDirectory (prefix = 'graalpytest-' ) as tmp_dir :
721
+ with (
722
+ tempfile .TemporaryDirectory (prefix = 'graalpytest-' ) as tmp_dir ,
723
+ socket .create_server (('0.0.0.0' , 0 )) as server ,
724
+ ):
721
725
tmp_dir = Path (tmp_dir )
722
726
723
- if use_pipe :
724
- pipe , child_pipe = multiprocessing .Pipe ()
725
- else :
726
- result_file = tmp_dir / 'result'
727
+ port = server .getsockname ()[1 ]
728
+ assert port
727
729
728
730
while self .remaining_test_ids and not self .stop_event .is_set ():
729
731
last_remaining_count = len (self .remaining_test_ids )
730
- with (
731
- open (tmp_dir / 'out' , 'w+' ) as self .out_file ,
732
- open (tmp_dir / 'tests' , 'w+' ) as tests_file ,
733
- ):
732
+ with open (tmp_dir / 'out' , 'w+' ) as self .out_file :
734
733
self .last_out_pos = 0
735
734
self .last_started_time = time .time ()
736
735
cmd = [
@@ -739,69 +738,54 @@ def run_in_subprocess_and_watch(self):
739
738
* self .runner .subprocess_args ,
740
739
__file__ ,
741
740
'worker' ,
742
- '--tests-file ' , str (tests_file . name ),
741
+ '--port ' , str (port ),
743
742
]
744
- if use_pipe :
745
- cmd += ['--pipe-fd' , str (child_pipe .fileno ())]
746
- else :
747
- cmd += ['--result-file' , str (result_file )]
748
743
if self .runner .failfast :
749
744
cmd .append ('--failfast' )
750
- # We communicate the tests through a temp file to avoid running into too long commandlines on windows
751
- tests_file .seek (0 )
752
- tests_file .truncate ()
753
- tests_file .write ('\n ' .join (map (str , self .remaining_test_ids )))
754
- tests_file .flush ()
755
- popen_kwargs : dict = dict (
756
- stdout = self .out_file ,
757
- stderr = self .out_file ,
758
- )
759
- if use_pipe :
760
- popen_kwargs .update (pass_fds = [child_pipe .fileno ()])
761
- self .process = subprocess .Popen (cmd , ** popen_kwargs )
762
-
763
- timed_out = None
764
-
765
- if use_pipe :
766
- while self .process .poll () is None :
767
- while pipe .poll (0.1 ):
768
- self .process_event (pipe .recv ())
769
- if self .stop_event .is_set ():
770
- interrupt_process (self .process )
771
- break
772
- if self .last_started_test_id :
773
- last_started_test = self .tests_by_id .get (self .last_started_test_id )
774
- timeout = (
775
- last_started_test .test_file .test_config .per_test_timeout
776
- or self .runner .default_test_timeout
777
- )
778
- else :
779
- timeout = self .runner .default_test_timeout
780
- timeout *= self .runner .timeout_factor
781
- if time .time () - self .last_started_time >= timeout :
782
- interrupt_process (self .process )
783
- timed_out = timeout
784
- # Drain the pipe
785
- while pipe .poll (0.1 ):
786
- pipe .recv ()
787
- break
745
+ self .process = subprocess .Popen (cmd , stdout = self .out_file , stderr = self .out_file )
746
+
747
+ server .settimeout (600.0 )
748
+ with server .accept ()[0 ] as sock :
749
+ conn = Connection (sock )
750
+
751
+ conn .send ([TestSpecifier (t .test_file , t .test_name ) for t in self .remaining_test_ids ])
752
+
753
+ timed_out = None
754
+
788
755
try :
789
- self .process .wait (self .runner .default_test_timeout )
790
- except subprocess .TimeoutExpired :
791
- log ("Warning: Worker didn't shutdown in a timely manner, interrupting it" )
792
- interrupt_process (self .process )
756
+ while True :
757
+ while conn .poll (0.1 ):
758
+ event = conn .recv ()
759
+ self .process_event (event )
760
+ if self .stop_event .is_set ():
761
+ interrupt_process (self .process )
762
+ break
763
+ if self .last_started_test_id :
764
+ last_started_test = self .tests_by_id .get (self .last_started_test_id )
765
+ timeout = (
766
+ last_started_test .test_file .test_config .per_test_timeout
767
+ or self .runner .default_test_timeout
768
+ )
769
+ else :
770
+ timeout = self .runner .default_test_timeout
771
+ timeout *= self .runner .timeout_factor
772
+ if time .time () - self .last_started_time >= timeout :
773
+ interrupt_process (self .process )
774
+ timed_out = timeout
775
+ break
776
+ except (ConnectionClosed , OSError ):
777
+ # The socket closed or got connection reset, that's normal if the worker exitted or crashed
778
+ pass
779
+ try :
780
+ self .process .wait (self .runner .default_test_timeout )
781
+ except subprocess .TimeoutExpired :
782
+ log ("Warning: Worker didn't shutdown in a timely manner, interrupting it" )
783
+ interrupt_process (self .process )
793
784
794
785
returncode = self .process .wait ()
795
786
796
787
if self .stop_event .is_set ():
797
788
return
798
- if use_pipe :
799
- while pipe .poll (0.1 ):
800
- self .process_event (pipe .recv ())
801
- else :
802
- with open (result_file , 'rb' ) as f :
803
- for file_event in pickle .load (f ):
804
- self .process_event (file_event )
805
789
806
790
if returncode != 0 or timed_out is not None :
807
791
self .out_file .seek (self .last_out_pos )
@@ -1215,32 +1199,51 @@ def loadTestsFromModule(self, module, *, pattern=None):
1215
1199
return test_suite
1216
1200
1217
1201
1202
+ class ConnectionClosed (Exception ):
1203
+ pass
1204
+
1205
+
1206
+ class Connection :
1207
+ def __init__ (self , sock ):
1208
+ self .socket = sock
1209
+
1210
+ def send (self , obj ):
1211
+ data = pickle .dumps (obj )
1212
+ header = len (data ).to_bytes (8 , byteorder = 'big' )
1213
+ self .socket .sendall (header )
1214
+ self .socket .sendall (data )
1215
+
1216
+ def _recv (self , size ):
1217
+ data = b''
1218
+ while len (data ) < size :
1219
+ read = self .socket .recv (size - len (data ))
1220
+ if not read :
1221
+ return data
1222
+ data += read
1223
+ return data
1224
+
1225
+ def recv (self ):
1226
+ size = int .from_bytes (self ._recv (8 ), byteorder = 'big' )
1227
+ if not size :
1228
+ raise ConnectionClosed
1229
+ data = self ._recv (size )
1230
+ return pickle .loads (data )
1231
+
1232
+ def poll (self , timeout = None ):
1233
+ rlist , wlist , xlist = select .select ([self .socket ], [], [], timeout )
1234
+ return bool (rlist )
1235
+
1236
+
1218
1237
def main_worker (args ):
1219
- tests = []
1220
- with open (args .tests_file ) as f :
1221
- for line in f :
1222
- tests .append (TestSpecifier .from_str (line .strip ()))
1223
-
1224
- data = []
1225
- if args .pipe_fd :
1226
- import multiprocessing .connection
1227
- conn = multiprocessing .connection .Connection (args .pipe_fd )
1228
-
1229
- def result_factory (suite ):
1230
- return PipeResult (suite , conn )
1231
- else :
1232
- def result_factory (suite ):
1233
- return SimpleResult (suite , data )
1238
+ with socket .create_connection (('localhost' , args .port )) as sock :
1239
+ conn = Connection (sock )
1234
1240
1235
- for test_suite in collect (tests , no_excludes = True ):
1236
- result = result_factory (test_suite )
1237
- result .failfast = args .failfast
1238
- test_suite .run (result )
1241
+ tests = conn .recv ()
1239
1242
1240
- if args . result_file :
1241
- with open ( args . result_file , 'wb' ) as f :
1242
- # noinspection PyTypeChecker
1243
- pickle . dump ( data , f )
1243
+ for test_suite in collect ( tests , no_excludes = True ) :
1244
+ result = ConnectionResult ( test_suite , conn )
1245
+ result . failfast = args . failfast
1246
+ test_suite . run ( result )
1244
1247
1245
1248
1246
1249
def main_merge_tags (args ):
@@ -1393,10 +1396,7 @@ def main():
1393
1396
# worker command declaration
1394
1397
worker_parser = subparsers .add_parser ('worker' , help = "Internal command for subprocess workers" )
1395
1398
worker_parser .set_defaults (main = main_worker )
1396
- group = worker_parser .add_mutually_exclusive_group ()
1397
- group .add_argument ('--pipe-fd' , type = int )
1398
- group .add_argument ('--result-file' , type = Path )
1399
- worker_parser .add_argument ('--tests-file' , type = Path , required = True )
1399
+ worker_parser .add_argument ('--port' , type = int )
1400
1400
worker_parser .add_argument ('--failfast' , action = 'store_true' )
1401
1401
1402
1402
# merge-tags-from-report command declaration
0 commit comments