@@ -41,7 +41,6 @@
import enum
import fnmatch
import json
- import math
import os
import pickle
import platform
@@ -60,6 +59,7 @@
import typing
import unittest
import unittest.loader
+ import urllib.request
from abc import abstractmethod
from collections import defaultdict
from dataclasses import dataclass, field
@@ -81,6 +81,18 @@
CURRENT_PLATFORM = f'{sys.platform}-{platform.machine()}'
CURRENT_PLATFORM_KEYS = frozenset({CURRENT_PLATFORM})

+ RUNNER_ENV = {}
+ DISABLE_JIT_ENV = {'GRAAL_PYTHON_VM_ARGS': '--experimental-options --engine.Compilation=false'}
+
+ # We leave the JIT enabled for the tests themselves, but disable it for subprocesses
+ # noinspection PyUnresolvedReferences
+ if IS_GRAALPY and __graalpython__.is_native and 'GRAAL_PYTHON_VM_ARGS' not in os.environ:
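+     # Probe with a harmless '--version' run first, and only adopt the JIT-disabling env if this binary accepts the options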
+     try:
+         subprocess.check_output([sys.executable, '--version'], env={**os.environ, **DISABLE_JIT_ENV})
+         RUNNER_ENV = DISABLE_JIT_ENV
+     except subprocess.CalledProcessError:
+         pass
+


class Logger:
    report_incomplete = sys.stdout.isatty()
@@ -335,16 +347,16 @@ def __init__(self, *, failfast: bool, report_durations: int | None):
        self.total_duration = 0.0

    @staticmethod
-     def report_start(test_id: TestId):
-         log(f"{test_id} ... ", incomplete=True)
+     def report_start(test_id: TestId, prefix=''):
+         log(f"{prefix}{test_id} ... ", incomplete=True)

-     def report_result(self, result: TestResult):
+     def report_result(self, result: TestResult, prefix=''):
        self.results.append(result)
        message = f"{result.test_id} ... {result.status}"
        if result.status == TestStatus.SKIPPED and result.param:
-             message = f"{message} {result.param!r}"
+             message = f"{prefix}{message} {result.param!r}"
        else:
-             message = f"{message} ({result.duration:.2f}s)"
+             message = f"{prefix}{message} ({result.duration:.2f}s)"
        log(message)

    def tests_failed(self):
@@ -532,10 +544,10 @@ def __init__(self, *, num_processes, subprocess_args, separate_workers, timeout_
        self.crashes = []
        self.default_test_timeout = 600

-     def report_result(self, result: TestResult):
+     def report_result(self, result: TestResult, prefix=''):
        if self.failfast and result.status in FAILED_STATES:
            self.stop_event.set()
-         super().report_result(result)
+         super().report_result(result, prefix=prefix)

    def tests_failed(self):
        return super().tests_failed() or bool(self.crashes)
@@ -550,10 +562,36 @@ def partition_tests_into_processes(self, suites: list['TestSuite']) -> list[list
            lambda suite: suite.test_file.config.new_worker_per_file,
        )
        partitions = [suite.collected_tests for suite in per_file_suites]
-         per_partition = int(math.ceil(len(unpartitioned) / max(1, self.num_processes)))
-         while unpartitioned:
-             partitions.append([test for suite in unpartitioned[:per_partition] for test in suite.collected_tests])
-             unpartitioned = unpartitioned[per_partition:]
+
+         # Use recorded timings, if available, to balance the unpartitioned suites across processes
+         timings = {}
+         if unpartitioned and self.num_processes:
+             configdir = unpartitioned[0].test_file.config.configdir if unpartitioned else None
+             if configdir:
+                 timing_path = configdir / f"timings-{sys.platform.lower()}.json"
+                 if timing_path.exists():
+                     with open(timing_path, "r", encoding="utf-8") as f:
+                         timings = json.load(f)
+
+             timed_files = []
+             for suite in unpartitioned:
+                 file_path = str(suite.test_file.path).replace("\\", "/")
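+                 # Files with no recorded timing are assumed to take 20s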
+                 total = timings.get(file_path, 20.0)
+                 timed_files.append((total, suite))
+
+             # Sort descending by expected time
+             timed_files.sort(reverse=True, key=lambda x: x[0])
+
+             # Greedily assign to balance by timing sum
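+             # (the longest-processing-time heuristic: each file goes to the currently least-loaded process)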
+             process_loads = [[] for _ in range(self.num_processes)]
+             process_times = [0.0] * self.num_processes
+             for t, suite in timed_files:
+                 i = process_times.index(min(process_times))
+                 process_loads[i].append(suite)
+                 process_times[i] += t
+             for group in process_loads:
+                 partitions.append([test for suite in group for test in suite.collected_tests])
+
        return partitions

    def run_tests(self, tests: list['TestSuite']):
@@ -582,7 +620,7 @@ def run_tests(self, tests: list['TestSuite']):
            log(crash)

    def run_partitions_in_subprocesses(self, executor, partitions: list[list['Test']]):
-         workers = [SubprocessWorker(self, partition) for i, partition in enumerate(partitions)]
+         workers = [SubprocessWorker(i, self, partition) for i, partition in enumerate(partitions)]
        futures = [executor.submit(worker.run_in_subprocess_and_watch) for worker in workers]

        def dump_worker_status():
@@ -626,7 +664,8 @@ def sigterm_handler(_signum, _frame):


class SubprocessWorker:
-     def __init__(self, runner: ParallelTestRunner, tests: list['Test']):
+     def __init__(self, worker_id: int, runner: ParallelTestRunner, tests: list['Test']):
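+         # Tag prepended to this worker's log lines so interleaved parallel output can be attributed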
+         self.prefix = f'[worker-{worker_id + 1}] '
        self.runner = runner
        self.stop_event = runner.stop_event
        self.lock = threading.RLock()
@@ -649,7 +688,7 @@ def process_event(self, event):
        except ValueError:
            # It executed something we didn't ask for. Not sure why this happens
            log(f'WARNING: unexpected test started {test_id}')
-         self.runner.report_start(test_id)
+         self.runner.report_start(test_id, prefix=self.prefix)
        with self.lock:
            self.last_started_test_id = test_id
            self.last_started_time = time.time()
@@ -668,7 +707,7 @@ def process_event(self, event):
                output=test_output,
                duration=event.get('duration'),
            )
-             self.runner.report_result(result)
+             self.runner.report_result(result, prefix=self.prefix)
            with self.lock:
                self.last_started_test_id = None
                self.last_started_time = time.time()  # Starts timeout for the following teardown/setup
@@ -820,7 +859,7 @@ def run_in_subprocess_and_watch(self):
                param=message,
                output=output,
                duration=(time.time() - self.last_started_time),
-             ))
+             ), prefix=self.prefix)
            if blame_id is not self.last_started_test_id:
                # If we're here, it means we didn't know exactly which test we were executing, we were
                # somewhere in between
@@ -899,6 +938,7 @@ def parse_config(cls, config_path: Path):
        if config_tags_dir := settings.get('tags_dir'):
            tags_dir = (config_path.parent / config_tags_dir).resolve()
        # Temporary hack for Bytecode DSL development in master branch:
+         # noinspection PyUnresolvedReferences
        if IS_GRAALPY and getattr(__graalpython__, 'is_bytecode_dsl_interpreter', False) and tags_dir:
            new_tags_dir = (config_path.parent / (config_tags_dir + '_bytecode_dsl')).resolve()
            if new_tags_dir.exists():
@@ -972,6 +1012,7 @@ class TestSuite:
    collected_tests: list['Test']

    def run(self, result):
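+         # Apply RUNNER_ENV (e.g. the JIT-disabling VM args probed at module level) so subprocesses spawned by the tests inherit it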
+         os.environ.update(RUNNER_ENV)
        saved_path = sys.path[:]
        sys.path[:] = self.pythonpath
        try:
@@ -1299,6 +1340,31 @@ def get_bool_env(name: str):
    return os.environ.get(name, '').lower() in ('true', '1')


+ def main_extract_test_timings(args):
+     """
+     Fetches a test log from the given URL, extracts per-file test timings, and writes the output as JSON.
+     """
+
+     # Download the log file
+     with urllib.request.urlopen(args.url) as response:
+         log_content = response.read().decode("utf-8", errors="replace")
+
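+     # Matches the runner's own result lines, e.g. "path/to/test_file.py::test_name ... ok (1.23s)"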
+     pattern = re.compile(
+         r"^(?P<path>[^\s:]+)::\S+ +\.\.\. (?:ok|FAIL|ERROR|SKIPPED|expected failure|unexpected success|\S+) \((?P<time>[\d.]+)s\)",
+         re.MULTILINE,
+     )
+
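+     # Sum the per-test durations into one total per file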
+     timings = {}
+     for match in pattern.finditer(log_content):
+         raw_path = match.group("path").replace("\\", "/")
+         t = float(match.group("time"))
+         timings.setdefault(raw_path, 0.0)
+         timings[raw_path] += t
+
+     with open(args.output, "w", encoding="utf-8") as f:
+         json.dump(timings, f, indent=2, sort_keys=True)
+
+
def main():
    is_mx_graalpytest = get_bool_env('MX_GRAALPYTEST')
    parent_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
@@ -1428,6 +1494,20 @@ def main():
    merge_tags_parser.add_argument('report_path')

    # run the appropriate command
+
+     # extract-test-timings command declaration
+     extract_parser = subparsers.add_parser(
+         "extract-test-timings",
+         help="Extract per-file test timings from a test log URL and write them as JSON"
+     )
+     extract_parser.add_argument(
+         "url", help="URL of the test log file"
+     )
+     extract_parser.add_argument(
+         "output", help="Output JSON file for per-file timings"
+     )
+     extract_parser.set_defaults(main=main_extract_test_timings)
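+     # Note: partition_tests_into_processes reads these timings from <configdir>/timings-<sys.platform>.json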
+
    args = parent_parser.parse_args()
    args.main(args)
