llvm/utils/lit/lit/run.py (63 changes: 53 additions & 10 deletions)
@@ -7,6 +7,14 @@
 import lit.util
 import lit.worker
 
+# Windows has a limit of 60 workers per pool.
+# This is defined in the multiprocessing module implementation.
+# See: https://github.com/python/cpython/blob/6bc65c30ff1fd0b581a2c93416496fc720bc442c/Lib/concurrent/futures/process.py#L669-L672
+WINDOWS_MAX_WORKERS_PER_POOL = 60
+
+
+def _ceilDiv(a, b):
+    return (a + b - 1) // b
 
 class MaxFailuresError(Exception):
     pass
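For reference, a minimal standalone sketch of the arithmetic the new helper enables; the worker counts used below are hypothetical examples and the snippet is illustrative only, not part of the patch:

```python
# Illustrative sketch mirroring the new constant and helper.
WINDOWS_MAX_WORKERS_PER_POOL = 60

def _ceilDiv(a, b):
    # Integer ceiling division: smallest integer >= a / b for positive b.
    return (a + b - 1) // b

for workers in (8, 60, 61, 128):
    pools = max(1, _ceilDiv(workers, WINDOWS_MAX_WORKERS_PER_POOL))
    print(workers, "workers ->", pools, "pool(s)")
# 8 -> 1, 60 -> 1, 61 -> 2, 128 -> 3
```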
@@ -72,25 +80,60 @@ def _execute(self, deadline):
             if v is not None
         }
 
-        pool = multiprocessing.Pool(
-            self.workers, lit.worker.initialize, (self.lit_config, semaphores)
+        # Windows has a limit of 60 workers per pool, so we need to use multiple pools
+        # if we have more workers requested than the limit.
+        max_workers_per_pool = (
+            WINDOWS_MAX_WORKERS_PER_POOL if os.name == "nt" else self.workers
         )
+        num_pools = max(1, _ceilDiv(self.workers, max_workers_per_pool))
 
-        async_results = [
-            pool.apply_async(
-                lit.worker.execute, args=[test], callback=self.progress_callback
+        # Distribute self.workers across num_pools as evenly as possible
+        workers_per_pool_list = [self.workers // num_pools] * num_pools
+        for pool_idx in range(self.workers % num_pools):
+            workers_per_pool_list[pool_idx] += 1
+
+        if num_pools > 1:
+            self.lit_config.note(
+                "Using %d pools balancing %d workers total distributed as %s (Windows worker limit workaround)"
+                % (num_pools, self.workers, workers_per_pool_list)
             )
-            for test in self.tests
-        ]
-        pool.close()
 
+        # Create multiple pools
+        pools = []
+        for pool_size in workers_per_pool_list:
+            pool = multiprocessing.Pool(
+                pool_size, lit.worker.initialize, (self.lit_config, semaphores)
+            )
+            pools.append(pool)
+
+        # Distribute tests across pools
+        tests_per_pool = _ceilDiv(len(self.tests), num_pools)
+        async_results = []
+
+        for pool_idx, pool in enumerate(pools):
+            start_idx = pool_idx * tests_per_pool
+            end_idx = min(start_idx + tests_per_pool, len(self.tests))
+            for test in self.tests[start_idx:end_idx]:
+                ar = pool.apply_async(
+                    lit.worker.execute, args=[test], callback=self.progress_callback
+                )
+                async_results.append(ar)
+
+        # Close all pools
+        for pool in pools:
+            pool.close()
+
         try:
             self._wait_for(async_results, deadline)
         except:
-            pool.terminate()
+            # Terminate all pools on exception
+            for pool in pools:
+                pool.terminate()
             raise
         finally:
-            pool.join()
+            # Join all pools
+            for pool in pools:
+                pool.join()
 
     def _wait_for(self, async_results, deadline):
         timeout = deadline - time.time()
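To make the distribution above concrete, here is a hedged standalone sketch under assumed inputs (128 requested workers and 10 tests, both hypothetical); the names mirror the patch, but this is illustrative only, not lit code:

```python
# Standalone sketch of the worker/test distribution in _execute above.
WINDOWS_MAX_WORKERS_PER_POOL = 60

def _ceilDiv(a, b):
    return (a + b - 1) // b

workers, tests = 128, list(range(10))  # hypothetical inputs
num_pools = max(1, _ceilDiv(workers, WINDOWS_MAX_WORKERS_PER_POOL))  # 3

# Spread the workers as evenly as possible: 128 -> [43, 43, 42].
workers_per_pool = [workers // num_pools] * num_pools
for i in range(workers % num_pools):
    workers_per_pool[i] += 1

# Chunk the tests contiguously, ceil(10 / 3) = 4 per pool.
tests_per_pool = _ceilDiv(len(tests), num_pools)
chunks = [tests[i * tests_per_pool:(i + 1) * tests_per_pool] for i in range(num_pools)]

assert sum(workers_per_pool) == workers
assert all(n <= WINDOWS_MAX_WORKERS_PER_POOL for n in workers_per_pool)
print(workers_per_pool)  # [43, 43, 42]
print(chunks)            # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Note that the tests are chunked contiguously, so each pool receives one consecutive slice of the test list.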
llvm/utils/lit/lit/util.py (5 changes: 0 additions & 5 deletions)
@@ -113,11 +113,6 @@ def usable_core_count():
     except AttributeError:
         n = os.cpu_count() or 1
 
-    # On Windows with more than 60 processes, multiprocessing's call to
-    # _winapi.WaitForMultipleObjects() prints an error and lit hangs.
-    if platform.system() == "Windows":
-        return min(n, 60)
-
     return n
 
 def abs_path_preserve_drive(path):
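Since the per-pool cap now lives in run.py, usable_core_count() no longer clamps its result to 60 on Windows. A hedged before/after sketch; the two functions below are illustrative recreations, not lit code, and the 96-core host is hypothetical:

```python
def usable_core_count_before(n, system):
    # Old behavior removed above: hard cap on Windows to dodge the
    # WaitForMultipleObjects 60-handle limit.
    if system == "Windows":
        return min(n, 60)
    return n

def usable_core_count_after(n, system):
    # New behavior: return the full count everywhere; run.py now splits
    # the workers into pools of at most 60 on Windows.
    return n

print(usable_core_count_before(96, "Windows"))  # 60
print(usable_core_count_after(96, "Windows"))   # 96
```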