Skip to content

Commit 5c5991d

Browse files
authored
Merge pull request #236 from kaczmarj/enh/nproc
[ENH] set `n_proc` to max available by default in `ConcurrentFuturesWorker`
2 parents c9c967e + 7aef2fd commit 5c5991d

File tree

3 files changed

+58
-3
lines changed

3 files changed

+58
-3
lines changed

pydra/engine/helpers.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,3 +513,31 @@ def output_from_inputfields(output_spec, inputs):
513513
(field_name, attr.ib(type=File, metadata={"value": value}))
514514
)
515515
return output_spec
516+
517+
518+
def get_available_cpus():
519+
"""
520+
Return the number of CPUs available to the current process or, if that is not
521+
available, the total number of CPUs on the system.
522+
523+
Returns
524+
-------
525+
n_proc : :obj:`int`
526+
The number of available CPUs.
527+
"""
528+
# Will not work on some systems or if psutil is not installed.
529+
# See https://psutil.readthedocs.io/en/latest/#psutil.Process.cpu_affinity
530+
try:
531+
import psutil
532+
533+
return len(psutil.Process().cpu_affinity())
534+
except (AttributeError, ImportError, NotImplementedError):
535+
pass
536+
537+
# Not available on all systems, including macOS.
538+
# See https://docs.python.org/3/library/os.html#os.sched_getaffinity
539+
if hasattr(os, "sched_getaffinity"):
540+
return len(os.sched_getaffinity(0))
541+
542+
# Last resort
543+
return os.cpu_count()

pydra/engine/tests/test_helpers.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
1+
import os
12
from pathlib import Path
3+
import platform
24

35
import pytest
46
import cloudpickle as cp
57

68
from .utils import multiply
7-
from ..helpers import hash_value, hash_function, save, create_pyscript
9+
from ..helpers import (
10+
hash_value,
11+
hash_function,
12+
get_available_cpus,
13+
save,
14+
create_pyscript,
15+
)
816
from .. import helpers_file
917
from ..specs import File, Directory
1018

@@ -155,3 +163,22 @@ def test_hash_value_nested(tmpdir):
155163
[file_1, [file_2, file_3]], tp=File
156164
)
157165
assert hash_value(tmpdir, tp=Directory) == helpers_file.hash_dir(tmpdir)
166+
167+
168+
def test_get_available_cpus():
169+
assert get_available_cpus() > 0
170+
try:
171+
import psutil
172+
173+
has_psutil = True
174+
except ImportError:
175+
has_psutil = False
176+
177+
if hasattr(os, "sched_getaffinity"):
178+
assert get_available_cpus() == len(os.sched_getaffinity(0))
179+
180+
if has_psutil and platform.system().lower() != "darwin":
181+
assert get_available_cpus() == len(psutil.Process().cpu_affinity())
182+
183+
if platform.system().lower() == "darwin":
184+
assert get_available_cpus() == os.cpu_count()

pydra/engine/workers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import concurrent.futures as cf
99

10-
from .helpers import create_pyscript, read_and_display_async, save
10+
from .helpers import create_pyscript, get_available_cpus, read_and_display_async, save
1111

1212
import logging
1313

@@ -164,7 +164,7 @@ class ConcurrentFuturesWorker(Worker):
164164
def __init__(self, n_procs=None):
165165
"""Initialize Worker."""
166166
super(ConcurrentFuturesWorker, self).__init__()
167-
self.n_procs = n_procs
167+
self.n_procs = get_available_cpus() if n_procs is None else n_procs
168168
# added cpu_count to verify, remove once confident and let PPE handle
169169
self.pool = cf.ProcessPoolExecutor(self.n_procs)
170170
# self.loop = asyncio.get_event_loop()

0 commit comments

Comments
 (0)