Commit 7e1e216

Merge branch 'develop' into refactor/tmcs
2 parents 1114805 + c56450a commit 7e1e216

File tree

10 files changed (+120 / -28 lines)

CHANGELOG.md

Lines changed: 10 additions & 5 deletions
@@ -5,10 +5,15 @@
 - Generalised stopping criteria for valuation algorithms. Improved classes
   `ValuationResult` and `Status` with more operations. Some minor issues fixed.
   [PR #252](https://github.com/appliedAI-Initiative/pyDVL/pull/250)
-- Splitting of problem preparation and solution in Least Core computation.
+- Fixed a bug whereby `compute_shapley_values` would only spawn one process when
+  using `n_jobs=-1` and Monte Carlo methods.
+  [PR #270](https://github.com/appliedAI-Initiative/pyDVL/pull/270)
+- Bugfix in `RayParallelBackend`: wrong semantics for `kwargs`.
+  [PR #268](https://github.com/appliedAI-Initiative/pyDVL/pull/268)
+- Splitting of problem preparation and solution in Least-Core computation.
   Umbrella function for LC methods.
-  [PR #257](https://github.com/appliedAI-Initiative/pyDVL/pull/257)
-- Operations on ValuationResults and Statuses and cleanup
+  [PR #257](https://github.com/appliedAI-Initiative/pyDVL/pull/257)
+- Operations on `ValuationResult` and `Status` and some cleanup
   [PR #248](https://github.com/appliedAI-Initiative/pyDVL/pull/248)
 - **Bug fix and minor improvements**: Fixes bug in TMCS with remote Ray cluster,
   raises an error for dummy sequential parallel backend with TMCS, clones model
@@ -28,8 +33,8 @@
 - **Breaking change:** Introduces a class ValuationResult to gather and inspect
   results from all valuation algorithms
   [PR #214](https://github.com/appliedAI-Initiative/pyDVL/pull/214)
-- Fixes bug in Influence calculation with multi-dimensional input and adds
-  new example notebook
+- Fixes bug in Influence calculation with multidimensional input and adds new
+  example notebook
   [PR #195](https://github.com/appliedAI-Initiative/pyDVL/pull/195)
 - **Breaking change**: Passes the input to `MapReduceJob` at initialization,
   removes `chunkify_inputs` argument from `MapReduceJob`, removes `n_runs`

src/pydvl/utils/dataset.py

Lines changed: 1 addition & 1 deletion
@@ -524,7 +524,7 @@ def load_spotify_dataset(
     if file_path.exists():
         data = pd.read_csv(file_path)
     else:
-        url = "https://github.com/appliedAI-Initiative/pyDVL/blob/develop/data/top_hits_spotify_dataset.csv"
+        url = "https://raw.githubusercontent.com/appliedAI-Initiative/pyDVL/develop/data/top_hits_spotify_dataset.csv"
        data = pd.read_csv(url)
        data.to_csv(file_path, index=False)
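
For context, the fix swaps GitHub's HTML "blob" page for the raw file endpoint. A minimal illustration of the difference (not part of the diff; it assumes network access):

import pandas as pd

# The old URL points at the GitHub HTML page wrapping the file, so read_csv
# would try to parse markup instead of the dataset.
blob_url = "https://github.com/appliedAI-Initiative/pyDVL/blob/develop/data/top_hits_spotify_dataset.csv"

# raw.githubusercontent.com serves the plain CSV contents, which is what
# pandas expects.
raw_url = "https://raw.githubusercontent.com/appliedAI-Initiative/pyDVL/develop/data/top_hits_spotify_dataset.csv"

data = pd.read_csv(raw_url)
print(data.head())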

src/pydvl/utils/parallel/actor.py

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ class RayActorWrapper:
     def __init__(self, actor_class: Type, config: ParallelConfig, *args, **kwargs):
         parallel_backend = cast(RayParallelBackend, init_parallel_backend(config))
         remote_cls = parallel_backend.wrap(actor_class)
-        self.actor_handle = remote_cls.remote(*args, **kwargs)
+        self.actor_handle = remote_cls(*args, **kwargs)

         def remote_caller(method_name: str):
             # Wrapper for remote class' methods to mimic local calls

src/pydvl/utils/parallel/backend.py

Lines changed: 52 additions & 11 deletions
@@ -1,18 +1,26 @@
-import functools
 import os
 from abc import ABCMeta, abstractmethod
 from dataclasses import asdict
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+    Type,
+    TypeVar,
+    Union,
+)

 import ray
 from ray import ObjectRef
 from ray.remote_function import RemoteFunction

 from ..config import ParallelConfig

-__all__ = [
-    "init_parallel_backend",
-]
+__all__ = ["init_parallel_backend", "effective_n_jobs", "available_cpus"]

 T = TypeVar("T")

@@ -63,7 +71,7 @@ def put(self, v: Any, *args, **kwargs) -> Any:
         ...

     @abstractmethod
-    def wrap(self, *args, **kwargs) -> Any:
+    def wrap(self, fun: Callable, **kwargs) -> Callable:
         ...

     @abstractmethod
@@ -104,9 +112,11 @@ def get(self, v: Any, *args, **kwargs):
     def put(self, v: Any, *args, **kwargs) -> Any:
         return v

-    def wrap(self, *args, **kwargs) -> Any:
-        assert len(args) == 1
-        return functools.partial(args[0], **kwargs)
+    def wrap(self, fun: Callable, **kwargs) -> Callable:
+        """Wraps a function for sequential execution.
+
+        This is a noop and kwargs are ignored."""
+        return fun

     def wait(self, v: Any, *args, **kwargs) -> Tuple[list, list]:
         return v, []
@@ -151,8 +161,17 @@ def put(self, v: T, *args, **kwargs) -> Union["ObjectRef[T]", T]:
         except TypeError:
             return v  # type: ignore

-    def wrap(self, *args, **kwargs) -> RemoteFunction:
-        return ray.remote(*args, **kwargs)  # type: ignore
+    def wrap(self, fun: Callable, **kwargs) -> Callable:
+        """Wraps a function as a ray remote.
+
+        :param fun: the function to wrap
+        :param kwargs: keyword arguments to pass to @ray.remote
+
+        :return: The `.remote` method of the ray `RemoteFunction`.
+        """
+        if len(kwargs) > 1:
+            return ray.remote(**kwargs)(fun).remote  # type: ignore
+        return ray.remote(fun).remote  # type: ignore

     def wait(
         self,
@@ -213,3 +232,25 @@ def available_cpus() -> int:
     if system() != "Linux":
         return os.cpu_count() or 1
     return len(os.sched_getaffinity(0))
+
+
+def effective_n_jobs(n_jobs: int, config: ParallelConfig = ParallelConfig()) -> int:
+    """Returns the effective number of jobs.
+
+    This number may vary depending on the parallel backend and the resources
+    available.
+
+    :param n_jobs: the number of jobs requested. If -1, the number of available
+        CPUs is returned.
+    :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with
+        cluster address, number of cpus, etc.
+    :return: the effective number of jobs, guaranteed to be >= 1.
+    :raises RuntimeError: if the effective number of jobs returned by the backend
+        is < 1.
+    """
+    parallel_backend = init_parallel_backend(config)
+    if (eff_n_jobs := parallel_backend.effective_n_jobs(n_jobs)) < 1:
+        raise RuntimeError(
+            f"Invalid number of jobs {eff_n_jobs} obtained from parallel backend {config.backend}"
+        )
+    return eff_n_jobs
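
To make the new contract concrete, here is a minimal usage sketch (not part of the diff; it assumes a local Ray installation and uses an illustrative `square` function). `wrap` now always returns something directly callable: for Ray it is the `.remote` method of the remote function, for the sequential backend the function itself. `effective_n_jobs` resolves `n_jobs=-1` against the resources the configured backend reports.

from pydvl.utils.config import ParallelConfig
from pydvl.utils.parallel import init_parallel_backend
from pydvl.utils.parallel.backend import effective_n_jobs

config = ParallelConfig(backend="ray")  # or backend="sequential"
parallel_backend = init_parallel_backend(config)


def square(x: int) -> int:
    return x * x


# The wrapped object is called directly; callers no longer need
# getattr(f, "remote", f).
wrapped = parallel_backend.wrap(square, num_cpus=1)
result = parallel_backend.get(wrapped(parallel_backend.put(3)))
assert result == 9

# -1 resolves to however many CPUs the backend reports; the result is
# guaranteed to be >= 1.
print(effective_n_jobs(-1, config))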

src/pydvl/utils/parallel/map_reduce.py

Lines changed: 11 additions & 2 deletions
@@ -219,10 +219,19 @@ def reduce(self, chunks: List["ObjectRef[R]"]) -> R:
         return result  # type: ignore

     def _wrap_function(self, func: Callable, **kwargs) -> Callable:
-        remote_func = self.parallel_backend.wrap(
+        """Wraps a function with a timeout and remote arguments and puts it on
+        the remote backend.
+
+        :param func: Function to wrap
+        :param kwargs: Additional keyword arguments to pass to the backend
+            wrapper. These are *not* arguments for the wrapped function.
+        :return: Remote function that can be called with the same arguments as
+            the wrapped function. Depending on the backend, this may simply be
+            the function itself.
+        """
+        return self.parallel_backend.wrap(
             _wrap_func_with_remote_args(func, timeout=self.timeout), **kwargs
         )
-        return getattr(remote_func, "remote", remote_func)  # type: ignore

     def _backpressure(
         self, jobs: List[ObjectRef], n_dispatched: int, n_finished: int

src/pydvl/value/least_core/montecarlo.py

Lines changed: 2 additions & 1 deletion
@@ -7,6 +7,7 @@
 from pydvl.utils.config import ParallelConfig
 from pydvl.utils.numeric import random_powerset
 from pydvl.utils.parallel import MapReduceJob
+from pydvl.utils.parallel.backend import effective_n_jobs
 from pydvl.utils.progress import maybe_progress
 from pydvl.utils.utility import Utility
 from pydvl.value.least_core.common import LeastCoreProblem, lc_solve_problem
@@ -136,7 +137,7 @@ def mclc_prepare_problem(
     )
     n_iterations = 2**n

-    iterations_per_job = max(1, n_iterations // n_jobs)
+    iterations_per_job = max(1, n_iterations // effective_n_jobs(n_jobs, config))

     map_reduce_job: MapReduceJob["Utility", "LeastCoreProblem"] = MapReduceJob(
         inputs=u,
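
The reason for going through `effective_n_jobs` here (and in `group_testing_shapley` in `src/pydvl/value/shapley/gt.py` below) is Python's floor division when callers pass `n_jobs=-1` to mean "use all CPUs": dividing by the raw value collapses the chunk size. A small illustration with made-up numbers:

n_iterations, n_jobs = 1024, -1

# Old behaviour: floor division by a negative number yields a negative value,
# and max() clamps the chunk size to 1.
print(max(1, n_iterations // n_jobs))  # 1024 // -1 == -1024, so this prints 1

# New behaviour: -1 is first resolved to the actual worker count (say 8 CPUs),
# giving a sensible number of iterations per job.
print(max(1, n_iterations // 8))       # 128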

src/pydvl/value/shapley/gt.py

Lines changed: 2 additions & 1 deletion
@@ -25,6 +25,7 @@

 from pydvl.utils import MapReduceJob, ParallelConfig, Utility, maybe_progress
 from pydvl.utils.numeric import random_subset_of_size
+from pydvl.utils.parallel.backend import effective_n_jobs
 from pydvl.utils.status import Status
 from pydvl.value import ValuationResult

@@ -237,7 +238,7 @@ def group_testing_shapley(
         f"ε={eps:.02f} guarantee at .95 probability"
     )

-    iterations_per_job = max(1, n_iterations // n_jobs)
+    iterations_per_job = max(1, n_iterations // effective_n_jobs(n_jobs, config))

     def reducer(
         results_it: Iterable[Tuple[NDArray, NDArray]]

tests/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -189,7 +189,7 @@ def seed_numpy(seed=42):
     np.random.seed(seed)


-@pytest.fixture
+@pytest.fixture(scope="session")
 def num_workers():
     return max(1, available_cpus() - 1)

tests/utils/conftest.py

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@


 @pytest.fixture(scope="module", params=["sequential", "ray-local", "ray-external"])
-def parallel_config(request):
+def parallel_config(request, num_workers):
     if request.param == "sequential":
         yield ParallelConfig(backend=request.param)
     elif request.param == "ray-local":
@@ -17,7 +17,7 @@ def parallel_config(request):
         cluster = Cluster(
             initialize_head=True,
             head_node_args={
-                "num_cpus": 4,
+                "num_cpus": num_workers,
             },
         )
         yield ParallelConfig(backend="ray", address=cluster.address)

tests/utils/test_parallel.py

Lines changed: 38 additions & 3 deletions
@@ -1,15 +1,17 @@
 import operator
+import os
+import time
 from functools import partial, reduce

 import numpy as np
 import pytest

 from pydvl.utils.parallel import MapReduceJob, init_parallel_backend
-from pydvl.utils.parallel.backend import available_cpus
+from pydvl.utils.parallel.backend import available_cpus, effective_n_jobs
 from pydvl.utils.parallel.map_reduce import _get_value


-def test_effective_n_jobs(parallel_config):
+def test_effective_n_jobs(parallel_config, num_workers):
     parallel_backend = init_parallel_backend(parallel_config)
     if parallel_config.backend == "sequential":
         assert parallel_backend.effective_n_jobs(1) == 1
@@ -21,7 +23,16 @@ def test_effective_n_jobs(parallel_config):
         if parallel_config.address is None:
             assert parallel_backend.effective_n_jobs(-1) == available_cpus()
         else:
-            assert parallel_backend.effective_n_jobs(-1) == 4
+            assert parallel_backend.effective_n_jobs(-1) == num_workers
+
+    for n_jobs in [-1, 1, 2]:
+        assert parallel_backend.effective_n_jobs(n_jobs) == effective_n_jobs(
+            n_jobs, parallel_config
+        )
+        assert effective_n_jobs(n_jobs, parallel_config) > 0
+
+    with pytest.raises(ValueError):
+        parallel_backend.effective_n_jobs(0)


 @pytest.fixture()
@@ -188,3 +199,27 @@ def test_map_reduce_get_value(x, expected_x, parallel_config):
     parallel_backend = init_parallel_backend(parallel_config)
     x_id = parallel_backend.put(x)
     assert np.all(_get_value(x_id) == expected_x)
+
+
+def test_wrap_function(parallel_config, num_workers):
+    def fun(x, **kwargs):
+        return dict(x=x * x, **kwargs)
+
+    parallel_backend = init_parallel_backend(parallel_config)
+    # Try two kwargs for @ray.remote. Should be ignored in the sequential backend
+    wrapped_func = parallel_backend.wrap(fun, num_cpus=1, max_calls=1)
+    x = parallel_backend.put(2)
+    ret = parallel_backend.get(wrapped_func(x))
+
+    assert ret["x"] == 4
+    assert len(ret) == 1  # Ensure that kwargs are not passed to the function
+
+    if parallel_config.backend != "sequential":
+        # Test that the function is executed in different processes
+        def get_pid():
+            time.sleep(2)  # FIXME: waiting less means fewer processes are used?!
+            return os.getpid()
+
+        wrapped_func = parallel_backend.wrap(get_pid, num_cpus=1)
+        pids = parallel_backend.get([wrapped_func() for _ in range(num_workers)])
+        assert len(set(pids)) == num_workers
