Skip to content

Commit 17d2622

Browse files
committed
add method pydra.helpers.get_available_cpus() to return n cpus
This method will attempt to return the number of cpus available to the current process. If those methods do not work, then `os.cpu_count` is used, which returns the number of CPUs on the machine (could be more than the cpus in the current process).
1 parent c9c967e commit 17d2622

File tree

3 files changed

+53
-3
lines changed

3 files changed

+53
-3
lines changed

pydra/engine/helpers.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,3 +513,31 @@ def output_from_inputfields(output_spec, inputs):
513513
(field_name, attr.ib(type=File, metadata={"value": value}))
514514
)
515515
return output_spec
516+
517+
518+
def get_available_cpus():
519+
"""
520+
Return the number of CPUs available to the current process or, if that is not
521+
available, the total number of CPUs on the system.
522+
523+
Returns
524+
-------
525+
n_proc : :obj:`int`
526+
The number of available CPUs.
527+
"""
528+
# Will not work on some systems or if psutil is not installed.
529+
# See https://psutil.readthedocs.io/en/latest/#psutil.Process.cpu_affinity
530+
try:
531+
import psutil
532+
533+
return len(psutil.Process().cpu_affinity())
534+
except (AttributeError, ImportError, NotImplementedError):
535+
pass
536+
537+
# Not available on all systems, including macOS.
538+
# See https://docs.python.org/3/library/os.html#os.sched_getaffinity
539+
if hasattr(os, "sched_getaffinity"):
540+
return len(os.sched_getaffinity(0))
541+
542+
# Last resort
543+
return os.cpu_count()

pydra/engine/tests/test_helpers.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1+
import os
12
from pathlib import Path
23

34
import pytest
45
import cloudpickle as cp
56

67
from .utils import multiply
7-
from ..helpers import hash_value, hash_function, save, create_pyscript
8+
from ..helpers import (
9+
hash_value,
10+
hash_function,
11+
get_available_cpus,
12+
save,
13+
create_pyscript,
14+
)
815
from .. import helpers_file
916
from ..specs import File, Directory
1017

@@ -155,3 +162,18 @@ def test_hash_value_nested(tmpdir):
155162
[file_1, [file_2, file_3]], tp=File
156163
)
157164
assert hash_value(tmpdir, tp=Directory) == helpers_file.hash_dir(tmpdir)
165+
166+
167+
def test_get_available_cpus():
168+
assert get_available_cpus() > 0
169+
try:
170+
import psutil
171+
172+
has_psutil = True
173+
assert get_available_cpus() == len(psutil.Process().cpu_affinity())
174+
except ImportError:
175+
has_psutil = False
176+
if hasattr(os, "sched_getaffinity"):
177+
assert get_available_cpus() == len(os.sched_getaffinity(0))
178+
elif not has_psutil:
179+
assert get_available_cpus() == os.cpu_count()

pydra/engine/workers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import concurrent.futures as cf
99

10-
from .helpers import create_pyscript, read_and_display_async, save
10+
from .helpers import create_pyscript, get_available_cpus, read_and_display_async, save
1111

1212
import logging
1313

@@ -164,7 +164,7 @@ class ConcurrentFuturesWorker(Worker):
164164
def __init__(self, n_procs=None):
165165
"""Initialize Worker."""
166166
super(ConcurrentFuturesWorker, self).__init__()
167-
self.n_procs = n_procs
167+
self.n_procs = get_available_cpus() if n_procs is None else n_procs
168168
# added cpu_count to verify, remove once confident and let PPE handle
169169
self.pool = cf.ProcessPoolExecutor(self.n_procs)
170170
# self.loop = asyncio.get_event_loop()

0 commit comments

Comments
 (0)