Skip to content

Commit fcb5165

Browse files
authored
Merge pull request #270 from appliedAI-Initiative/fix/249-negative-njobs
Fix #249: allow negative n_jobs
2 parents 7390dd0 + f4c2a46 commit fcb5165

File tree

6 files changed

+44
-13
lines changed

6 files changed

+44
-13
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## Unreleased
44

5+
- Fixed a bug whereby `compute_shapley_values` would only spawn one process when
6+
using `n_jobs=-1` and Monte Carlo methods.
7+
[PR #270](https://github.com/appliedAI-Initiative/pyDVL/pull/270)
58
- Bugfix in `RayParallelBackend`: wrong semantics for `kwargs`.
69
[PR #268](https://github.com/appliedAI-Initiative/pyDVL/pull/268)
710
- Splitting of problem preparation and solution in Least-Core computation.

src/pydvl/utils/parallel/backend.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020

2121
from ..config import ParallelConfig
2222

23-
__all__ = [
24-
"init_parallel_backend",
25-
]
23+
__all__ = ["init_parallel_backend", "effective_n_jobs", "available_cpus"]
2624

2725
T = TypeVar("T")
2826

@@ -234,3 +232,25 @@ def available_cpus() -> int:
234232
if system() != "Linux":
235233
return os.cpu_count() or 1
236234
return len(os.sched_getaffinity(0))
235+
236+
237+
def effective_n_jobs(n_jobs: int, config: ParallelConfig = ParallelConfig()) -> int:
238+
"""Returns the effective number of jobs.
239+
240+
This number may vary depending on the parallel backend and the resources
241+
available.
242+
243+
:param n_jobs: the number of jobs requested. If -1, the number of available
244+
CPUs is returned.
245+
:param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with
246+
cluster address, number of cpus, etc.
247+
:return: the effective number of jobs, guaranteed to be >= 1.
248+
:raises RuntimeError: if the effective number of jobs returned by the backend
249+
is < 1.
250+
"""
251+
parallel_backend = init_parallel_backend(config)
252+
if (eff_n_jobs := parallel_backend.effective_n_jobs(n_jobs)) < 1:
253+
raise RuntimeError(
254+
f"Invalid number of jobs {eff_n_jobs} obtained from parallel backend {config.backend}"
255+
)
256+
return eff_n_jobs

src/pydvl/value/least_core/montecarlo.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
import warnings
3-
from typing import Iterable, Optional, Tuple
3+
from typing import Iterable, Optional
44

55
import numpy as np
6-
from numpy.typing import NDArray
76

87
from pydvl.utils.config import ParallelConfig
98
from pydvl.utils.numeric import random_powerset
109
from pydvl.utils.parallel import MapReduceJob
10+
from pydvl.utils.parallel.backend import effective_n_jobs
1111
from pydvl.utils.progress import maybe_progress
1212
from pydvl.utils.utility import Utility
1313
from pydvl.value.least_core.common import LeastCoreProblem, lc_solve_problem
@@ -145,16 +145,13 @@ def mclc_prepare_problem(
145145
)
146146
n_iterations = 2**n
147147

148-
iterations_per_job = max(1, n_iterations // n_jobs)
148+
iterations_per_job = max(1, n_iterations // effective_n_jobs(n_jobs, config))
149149

150150
map_reduce_job: MapReduceJob["Utility", "LeastCoreProblem"] = MapReduceJob(
151151
inputs=u,
152152
map_func=_montecarlo_least_core,
153153
reduce_func=_reduce_func,
154-
map_kwargs=dict(
155-
n_iterations=iterations_per_job,
156-
progress=progress,
157-
),
154+
map_kwargs=dict(n_iterations=iterations_per_job, progress=progress),
158155
n_jobs=n_jobs,
159156
config=config,
160157
)

src/pydvl/value/shapley/gt.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
from pydvl.utils import MapReduceJob, ParallelConfig, Utility, maybe_progress
2727
from pydvl.utils.numeric import random_subset_of_size
28+
from pydvl.utils.parallel.backend import effective_n_jobs
2829
from pydvl.utils.status import Status
2930
from pydvl.value import ValuationResult
3031

@@ -237,7 +238,7 @@ def group_testing_shapley(
237238
f"ε={eps:.02f} guarantee at .95 probability"
238239
)
239240

240-
iterations_per_job = max(1, n_iterations // n_jobs)
241+
iterations_per_job = max(1, n_iterations // effective_n_jobs(n_jobs, config))
241242

242243
def reducer(
243244
results_it: Iterable[Tuple[NDArray, NDArray]]

src/pydvl/value/shapley/montecarlo.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from ...utils.config import ParallelConfig
4545
from ...utils.numeric import random_powerset, running_moments
4646
from ...utils.parallel import MapReduceJob, init_parallel_backend
47+
from ...utils.parallel.backend import effective_n_jobs
4748
from ...utils.progress import maybe_progress
4849
from ...utils.status import Status
4950
from ...utils.utility import Utility
@@ -224,7 +225,7 @@ def permutation_montecarlo_shapley(
224225
:param progress: Whether to display progress bars for each job.
225226
:return: Object with the data values.
226227
"""
227-
iterations_per_job = max(1, n_iterations // n_jobs)
228+
iterations_per_job = max(1, n_iterations // effective_n_jobs(n_jobs, config))
228229

229230
map_reduce_job: MapReduceJob[Utility, "NDArray"] = MapReduceJob(
230231
u,

tests/utils/test_parallel.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import pytest
88

99
from pydvl.utils.parallel import MapReduceJob, init_parallel_backend
10-
from pydvl.utils.parallel.backend import available_cpus
10+
from pydvl.utils.parallel.backend import available_cpus, effective_n_jobs
1111
from pydvl.utils.parallel.map_reduce import _get_value
1212

1313

@@ -25,6 +25,15 @@ def test_effective_n_jobs(parallel_config, num_workers):
2525
else:
2626
assert parallel_backend.effective_n_jobs(-1) == num_workers
2727

28+
for n_jobs in [-1, 1, 2]:
29+
assert parallel_backend.effective_n_jobs(n_jobs) == effective_n_jobs(
30+
n_jobs, parallel_config
31+
)
32+
assert effective_n_jobs(n_jobs, parallel_config) > 0
33+
34+
with pytest.raises(ValueError):
35+
parallel_backend.effective_n_jobs(0)
36+
2837

2938
@pytest.fixture()
3039
def map_reduce_job_and_parameters(parallel_config, n_jobs, request):

0 commit comments

Comments
 (0)