|
 import lit.util
 import lit.worker

+# Windows has a limit of 60 workers per pool.
+# The limit is documented in the CPython concurrent.futures implementation.
+# See: https://github.com/python/cpython/blob/6bc65c30ff1fd0b581a2c93416496fc720bc442c/Lib/concurrent/futures/process.py#L669-L672
+WINDOWS_MAX_WORKERS_PER_POOL = 60
+
+
+def _ceilDiv(a, b):
+    return (a + b - 1) // b

 class MaxFailuresError(Exception):
     pass
@@ -72,25 +80,65 @@ def _execute(self, deadline): |
             if v is not None
         }

-        pool = multiprocessing.Pool(
-            self.workers, lit.worker.initialize, (self.lit_config, semaphores)
+        # Windows has a limit of 60 workers per pool, so we need to use multiple pools
+        # when more workers are requested than the limit allows.
+        # The limit can be overridden with the LIT_WINDOWS_MAX_WORKERS_PER_POOL environment variable.
+        max_workers_per_pool = (
+            WINDOWS_MAX_WORKERS_PER_POOL if os.name == "nt" else self.workers
+        )
+        max_workers_per_pool = int(
+            os.getenv("LIT_WINDOWS_MAX_WORKERS_PER_POOL", max_workers_per_pool)
         )

-        async_results = [
-            pool.apply_async(
-                lit.worker.execute, args=[test], callback=self.progress_callback
+        num_pools = max(1, _ceilDiv(self.workers, max_workers_per_pool))
+
+        # Distribute self.workers across num_pools as evenly as possible
+        workers_per_pool_list = [self.workers // num_pools] * num_pools
+        for pool_idx in range(self.workers % num_pools):
+            workers_per_pool_list[pool_idx] += 1
+
+        if num_pools > 1:
+            self.lit_config.note(
+                "Using %d worker pools (%d workers total, distributed as %s) to work around the Windows per-pool worker limit"
+                % (num_pools, self.workers, workers_per_pool_list)
             )
-            for test in self.tests
-        ]
-        pool.close()
+
+        # Create multiple pools
+        pools = []
+        for pool_size in workers_per_pool_list:
+            pool = multiprocessing.Pool(
+                pool_size, lit.worker.initialize, (self.lit_config, semaphores)
+            )
+            pools.append(pool)
+
+        # Distribute tests across pools
+        tests_per_pool = _ceilDiv(len(self.tests), num_pools)
+        async_results = []
+
+        for pool_idx, pool in enumerate(pools):
+            start_idx = pool_idx * tests_per_pool
+            end_idx = min(start_idx + tests_per_pool, len(self.tests))
+            for test in self.tests[start_idx:end_idx]:
+                ar = pool.apply_async(
+                    lit.worker.execute, args=[test], callback=self.progress_callback
+                )
+                async_results.append(ar)
+
+        # Close all pools
+        for pool in pools:
+            pool.close()

         try:
             self._wait_for(async_results, deadline)
         except:
-            pool.terminate()
+            # Terminate all pools on exception
+            for pool in pools:
+                pool.terminate()
             raise
         finally:
-            pool.join()
+            # Join all pools
+            for pool in pools:
+                pool.join()

     def _wait_for(self, async_results, deadline):
         timeout = deadline - time.time()
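
Not part of the patch: a minimal standalone sketch of the pool-splitting arithmetic the hunk above introduces, assuming the same _ceilDiv helper; plan_pools is a hypothetical name used only for illustration. With 150 requested workers, the 60-per-pool cap yields 3 pools with the workers split as [50, 50, 50].

def _ceilDiv(a, b):
    # Ceiling division: smallest integer >= a / b for positive b.
    return (a + b - 1) // b


def plan_pools(workers, num_tests, max_workers_per_pool=60):
    # Number of pools needed so that no pool exceeds the per-pool cap.
    num_pools = max(1, _ceilDiv(workers, max_workers_per_pool))
    # Spread the requested workers as evenly as possible across the pools.
    workers_per_pool = [workers // num_pools] * num_pools
    for pool_idx in range(workers % num_pools):
        workers_per_pool[pool_idx] += 1
    # Tests are handed out in contiguous chunks of this size, one chunk per pool.
    tests_per_pool = _ceilDiv(num_tests, num_pools)
    return num_pools, workers_per_pool, tests_per_pool


# Example: 150 workers and 1000 tests -> (3, [50, 50, 50], 334)
print(plan_pools(150, 1000))

Chunking tests contiguously keeps the change simple; when the test count does not divide evenly, the last pool simply receives a smaller chunk.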
|