Skip to content

Commit 16c1487

Browse files
committed
Dump status information on SIGTERM or SIGINT
1 parent 2e02467 commit 16c1487

File tree

1 file changed

+127
-73
lines changed
  • graalpython/com.oracle.graal.python.test/src

1 file changed

+127
-73
lines changed

graalpython/com.oracle.graal.python.test/src/runner.py

Lines changed: 127 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class Logger:
7575
report_incomplete = sys.stdout.isatty()
7676

7777
def __init__(self):
78-
self.lock = threading.Lock()
78+
self.lock = threading.RLock()
7979
self.incomplete_line = None
8080

8181
def log(self, msg='', incomplete=False):
@@ -92,7 +92,8 @@ def log(self, msg='', incomplete=False):
9292
self.incomplete_line = msg if incomplete else None
9393

9494

95-
log = Logger().log
95+
logger = Logger()
96+
log = logger.log
9697

9798

9899
class TestStatus(enum.StrEnum):
@@ -504,27 +505,105 @@ def run_tests(self, tests: list['TestSuite']):
504505
log(crash)
505506

506507
def run_partitions_in_subprocesses(self, executor, partitions: list[list['Test']]):
507-
futures = [
508-
executor.submit(self.run_in_subprocess_and_watch, partition)
509-
for partition in partitions
510-
]
508+
workers = [SubprocessWorker(self, partition) for i, partition in enumerate(partitions)]
509+
futures = [executor.submit(worker.run_in_subprocess_and_watch) for worker in workers]
510+
511+
def dump_worker_status():
512+
with logger.lock:
513+
for i, partition in enumerate(partitions):
514+
not_started = 0
515+
if futures[i].running():
516+
log(f"Worker on {workers[i].thread}: {workers[i].get_status()}")
517+
elif not futures[i].done():
518+
not_started += len(partition)
519+
if not_started:
520+
log(f"There are {not_started} tests not assigned to any worker")
521+
511522
try:
512-
concurrent.futures.wait(futures)
513-
for future in futures:
514-
future.result()
515-
except KeyboardInterrupt:
516-
self.stop_event.set()
517-
concurrent.futures.wait(futures)
518-
print("Interrupted!")
519-
sys.exit(1)
523+
def sigterm_handler(_signum, _frame):
524+
dump_worker_status()
525+
# noinspection PyUnresolvedReferences,PyProtectedMember
526+
os._exit(1)
520527

521-
def run_in_subprocess_and_watch(self, tests: list['Test']):
528+
prev_sigterm = signal.signal(signal.SIGTERM, sigterm_handler)
529+
except Exception:
530+
prev_sigterm = None
531+
532+
try:
533+
try:
534+
concurrent.futures.wait(futures)
535+
for future in futures:
536+
future.result()
537+
except KeyboardInterrupt:
538+
log("Received keyboard interrupt, stopping")
539+
dump_worker_status()
540+
self.stop_event.set()
541+
concurrent.futures.wait(futures)
542+
sys.exit(1)
543+
finally:
544+
if prev_sigterm:
545+
signal.signal(signal.SIGTERM, prev_sigterm)
546+
547+
548+
class SubprocessWorker:
549+
def __init__(self, runner: ParallelTestRunner, tests: list['Test']):
550+
self.runner = runner
551+
self.stop_event = runner.stop_event
552+
self.remaining_test_ids = [test.test_id for test in tests]
553+
self.tests_by_id = {test.test_id: test for test in tests}
554+
self.out_file: typing.TextIO | None = None
555+
self.last_started_test: Test | None = None
556+
self.last_started_time: float | None = None
557+
self.last_out_pos = 0
558+
self.process: subprocess.Popen | None = None
559+
self.thread = None
560+
561+
def process_event(self, event):
562+
match event['event']:
563+
case 'testStarted':
564+
self.remaining_test_ids.remove(event['test'])
565+
self.runner.report_start(event['test'])
566+
self.last_started_test = self.tests_by_id[event['test']]
567+
self.last_started_time = time.time()
568+
self.last_out_pos = event['out_pos']
569+
case 'testResult':
570+
out_end = event['out_pos']
571+
test_output = ''
572+
if self.last_out_pos != out_end:
573+
self.out_file.seek(self.last_out_pos)
574+
test_output = self.out_file.read(out_end - self.last_out_pos)
575+
result = TestResult(
576+
test_id=event['test'],
577+
status=event['status'],
578+
param=event.get('param'),
579+
output=test_output,
580+
duration=event.get('duration'),
581+
)
582+
self.runner.report_result(result)
583+
self.last_started_test = None
584+
self.last_started_time = None
585+
self.last_out_pos = event['out_pos']
586+
587+
def get_status(self):
588+
if not self.process:
589+
process_status = "not started"
590+
elif self.process.poll() is not None:
591+
process_status = f"exitted with code {self.process.returncode}"
592+
else:
593+
process_status = "running"
594+
595+
if self.last_started_test is not None:
596+
duration = time.time() - self.last_started_time
597+
test_status = f"executing {self.last_started_test} for {duration:.2f}s"
598+
else:
599+
test_status = "no current test"
600+
remaining = len(self.remaining_test_ids)
601+
return f"test: {test_status}; remaining: {remaining}; process status: {process_status}"
602+
603+
def run_in_subprocess_and_watch(self):
604+
self.thread = threading.current_thread()
522605
# noinspection PyUnresolvedReferences
523606
use_pipe = sys.platform != 'win32' and (not IS_GRAALPY or __graalpython__.posix_module_backend() == 'native')
524-
tests_by_id = {test.test_id: test for test in tests}
525-
remaining_test_ids = [test.test_id for test in tests]
526-
last_started_test: Test | None = None
527-
last_started_time: float | None = None
528607
with tempfile.TemporaryDirectory(prefix='graalpytest-') as tmp_dir:
529608
tmp_dir = Path(tmp_dir)
530609
env = os.environ.copy()
@@ -535,106 +614,79 @@ def run_in_subprocess_and_watch(self, tests: list['Test']):
535614
else:
536615
result_file = tmp_dir / 'result'
537616

538-
def process_event(event):
539-
nonlocal remaining_test_ids, last_started_test, last_started_time, last_out_pos
540-
match event['event']:
541-
case 'testStarted':
542-
remaining_test_ids.remove(event['test'])
543-
self.report_start(event['test'])
544-
last_started_test = tests_by_id[event['test']]
545-
last_started_time = time.time()
546-
last_out_pos = event['out_pos']
547-
case 'testResult':
548-
out_end = event['out_pos']
549-
test_output = ''
550-
if last_out_pos != out_end:
551-
out_file.seek(last_out_pos)
552-
test_output = out_file.read(out_end - last_out_pos)
553-
result = TestResult(
554-
test_id=event['test'],
555-
status=event['status'],
556-
param=event.get('param'),
557-
output=test_output,
558-
duration=event.get('duration'),
559-
)
560-
self.report_result(result)
561-
last_started_test = None
562-
last_started_time = None
563-
last_out_pos = event['out_pos']
564-
565-
while remaining_test_ids and not self.stop_event.is_set():
617+
while self.remaining_test_ids and not self.stop_event.is_set():
566618
with (
567-
open(tmp_dir / 'out', 'w+') as out_file,
619+
open(tmp_dir / 'out', 'w+') as self.out_file,
568620
open(tmp_dir / 'tests', 'w+') as tests_file,
569621
):
570-
last_out_pos = 0
622+
self.last_out_pos = 0
571623
cmd = [
572624
sys.executable,
573625
'-u',
574-
*self.subprocess_args,
626+
*self.runner.subprocess_args,
575627
__file__,
576628
'--tests-file', str(tests_file.name),
577629
]
578630
if use_pipe:
579631
cmd += ['--pipe-fd', str(child_pipe.fileno())]
580632
else:
581633
cmd += ['--result-file', str(result_file)]
582-
if self.failfast:
634+
if self.runner.failfast:
583635
cmd.append('--failfast')
584636
# We communicate the tests through a temp file to avoid running into too long commandlines on windows
585637
tests_file.seek(0)
586638
tests_file.truncate()
587-
tests_file.write('\n'.join(map(str, remaining_test_ids)))
639+
tests_file.write('\n'.join(map(str, self.remaining_test_ids)))
588640
tests_file.flush()
589641
popen_kwargs: dict = dict(
590-
stdout=out_file,
591-
stderr=out_file,
642+
stdout=self.out_file,
643+
stderr=self.out_file,
592644
env=env,
593645
)
594646
if use_pipe:
595647
popen_kwargs.update(pass_fds=[child_pipe.fileno()])
596-
process = subprocess.Popen(cmd, **popen_kwargs)
648+
self.process = subprocess.Popen(cmd, **popen_kwargs)
597649

598650
timed_out = False
599651

600652
if use_pipe:
601-
while process.poll() is None:
653+
while self.process.poll() is None:
602654
while pipe.poll(0.1):
603-
process_event(pipe.recv())
655+
self.process_event(pipe.recv())
604656
if self.stop_event.is_set():
605-
interrupt_process(process)
657+
interrupt_process(self.process)
606658
break
607-
if last_started_test is not None:
608-
timeout = last_started_test.test_file.test_config.per_test_timeout
609-
if time.time() - last_started_time >= timeout:
610-
interrupt_process(process)
659+
if self.last_started_test is not None:
660+
timeout = self.last_started_test.test_file.test_config.per_test_timeout
661+
if time.time() - self.last_started_time >= timeout:
662+
interrupt_process(self.process)
611663
timed_out = True
612664
# Drain the pipe
613665
while pipe.poll(0.1):
614666
pipe.recv()
615667
break
616668
try:
617-
returncode = process.wait(60)
669+
returncode = self.process.wait(60)
618670
except subprocess.TimeoutExpired:
619671
log("Warning: Worker didn't shutdown in a timely manner, interrupting it")
620-
interrupt_process(process)
672+
interrupt_process(self.process)
621673

622-
process.wait()
674+
self.process.wait()
623675

624676
if self.stop_event.is_set():
625677
return
626678
if use_pipe:
627679
while pipe.poll(0.1):
628-
process_event(pipe.recv())
680+
self.process_event(pipe.recv())
629681
else:
630682
with open(result_file, 'rb') as f:
631683
for file_event in pickle.load(f):
632-
process_event(file_event)
684+
self.process_event(file_event)
633685

634686
if returncode != 0 or timed_out:
635-
out_file.seek(last_out_pos)
636-
output = out_file.read()
637-
if last_started_test:
687+
self.out_file.seek(self.last_out_pos)
688+
output = self.out_file.read()
689+
if self.last_started_test:
638690
if timed_out:
639691
message = "Timed out"
640692
elif returncode >= 0:
@@ -645,16 +697,16 @@ def process_event(event):
645697
except ValueError:
646698
signal_name = str(-returncode)
647699
message = f"Test process killed by signal {signal_name}"
648-
self.report_result(TestResult(
649-
test_id=last_started_test.test_id,
700+
self.runner.report_result(TestResult(
701+
test_id=self.last_started_test.test_id,
650702
status=TestStatus.ERROR,
651703
param=message,
652704
output=output,
653705
))
654706
continue
655707
else:
656708
# Crashed outside of tests, don't retry
657-
self.crashes.append(output or 'Runner subprocess crashed')
709+
self.runner.crashes.append(output or 'Runner subprocess crashed')
658710
return
659711

660712

@@ -1114,6 +1166,8 @@ def main():
11141166
print(test)
11151167
return
11161168

1169+
log(f"Collected {sum(len(test_suite.collected_tests) for test_suite in tests)} tests")
1170+
11171171
if not tests:
11181172
sys.exit("No tests matched\n")
11191173

0 commit comments

Comments
 (0)