|
7 | 7 | import lit.util |
8 | 8 | import lit.worker |
9 | 9 |
|
# On Windows, the Python worker-pool implementation caps how many workers a
# single pool may have, so lit must shard work across several pools.
# See: https://github.com/python/cpython/blob/6bc65c30ff1fd0b581a2c93416496fc720bc442c/Lib/concurrent/futures/process.py#L669-L672
# NOTE(review): the linked CPython lines live in concurrent.futures
# (_MAX_WINDOWS_WORKERS = 61), not multiprocessing; 60 here is presumably a
# deliberately conservative cap — confirm against the referenced source.
WINDOWS_MAX_WORKERS_PER_POOL = 60
| 14 | + |
| 15 | + |
| 16 | +def _ceilDiv(a, b): |
| 17 | + return (a + b - 1) // b |
10 | 18 |
|
class MaxFailuresError(Exception):
    """Raised when the maximum number of allowed test failures is reached."""
@@ -73,37 +81,39 @@ def _execute(self, deadline): |
73 | 81 | } |
74 | 82 |
|
75 | 83 | # Windows has a limit of 60 workers per pool, so we need to use multiple pools |
76 | | - # if we have more than 60 workers requested |
77 | | - max_workers_per_pool = 60 if os.name == "nt" else self.workers |
78 | | - num_pools = max( |
79 | | - 1, (self.workers + max_workers_per_pool - 1) // max_workers_per_pool |
| 84 | + # if we have more workers requested than the limit. |
| 85 | + max_workers_per_pool = ( |
| 86 | + WINDOWS_MAX_WORKERS_PER_POOL if os.name == "nt" else self.workers |
80 | 87 | ) |
81 | | - workers_per_pool = min(self.workers, max_workers_per_pool) |
| 88 | + num_pools = max(1, _ceilDiv(self.workers, max_workers_per_pool)) |
| 89 | + |
| 90 | + # Distribute self.workers across num_pools as evenly as possible |
| 91 | + workers_per_pool_list = [self.workers // num_pools] * num_pools |
| 92 | + for pool_idx in range(self.workers % num_pools): |
| 93 | + workers_per_pool_list[pool_idx] += 1 |
82 | 94 |
|
83 | 95 | if num_pools > 1: |
84 | 96 | self.lit_config.note( |
85 | | - "Using %d pools with %d workers each (Windows worker limit workaround)" |
86 | | - % (num_pools, workers_per_pool) |
| 97 | + "Using %d pools balancing %d workers total distributed as %s (Windows worker limit workaround)" |
| 98 | + % (num_pools, self.workers, workers_per_pool_list) |
87 | 99 | ) |
88 | 100 |
|
89 | 101 | # Create multiple pools |
90 | 102 | pools = [] |
91 | | - for i in range(num_pools): |
| 103 | + for pool_size in workers_per_pool_list: |
92 | 104 | pool = multiprocessing.Pool( |
93 | | - workers_per_pool, lit.worker.initialize, (self.lit_config, semaphores) |
| 105 | + pool_size, lit.worker.initialize, (self.lit_config, semaphores) |
94 | 106 | ) |
95 | 107 | pools.append(pool) |
96 | 108 |
|
97 | 109 | # Distribute tests across pools |
98 | | - tests_per_pool = (len(self.tests) + num_pools - 1) // num_pools |
| 110 | + tests_per_pool = _ceilDiv(len(self.tests), num_pools) |
99 | 111 | async_results = [] |
100 | 112 |
|
101 | 113 | for pool_idx, pool in enumerate(pools): |
102 | 114 | start_idx = pool_idx * tests_per_pool |
103 | 115 | end_idx = min(start_idx + tests_per_pool, len(self.tests)) |
104 | | - pool_tests = self.tests[start_idx:end_idx] |
105 | | - |
106 | | - for test in pool_tests: |
| 116 | + for test in self.tests[start_idx:end_idx]: |
107 | 117 | ar = pool.apply_async( |
108 | 118 | lit.worker.execute, args=[test], callback=self.progress_callback |
109 | 119 | ) |
|
0 commit comments