|
18 | 18 | from ..utils.functional import maybe_add_argument |
19 | 19 | from ..utils.types import MapFunction, ReduceFunction, Seed, ensure_seed_sequence |
20 | 20 | from .backend import ParallelBackend, _maybe_init_parallel_backend |
21 | | -from .backends import JoblibParallelBackend, RayParallelBackend |
| 21 | +from .backends import JoblibParallelBackend |
22 | 22 | from .config import ParallelConfig |
23 | 23 |
|
24 | 24 | __all__ = ["MapReduceJob"] |
@@ -108,6 +108,15 @@ def __init__( |
108 | 108 | timeout: Optional[float] = None, |
109 | 109 | ): |
110 | 110 | parallel_backend = _maybe_init_parallel_backend(parallel_backend, config) |
| 111 | + |
| 112 | + if not isinstance(parallel_backend, JoblibParallelBackend): |
| 113 | + raise ValueError( |
| 114 | + f"Unexpected parallel backend {parallel_backend.__class__.__name__}. " |
| 115 | + "MapReduceJob only supports the use of JoblibParallelBackend " |
| 116 | + "with passing the specific" |
| 117 | + "joblib backend name using `joblib.parallel_config`. " |
| 118 | + ) |
| 119 | + |
111 | 120 | self.parallel_backend = parallel_backend |
112 | 121 |
|
113 | 122 | self.timeout = timeout |
@@ -140,16 +149,7 @@ def __call__( |
140 | 149 | """ |
141 | 150 | seed_seq = ensure_seed_sequence(seed) |
142 | 151 |
|
143 | | - if isinstance(self.parallel_backend, JoblibParallelBackend): |
144 | | - backend = "loky" |
145 | | - elif isinstance(self.parallel_backend, RayParallelBackend): |
146 | | - backend = "ray" |
147 | | - else: |
148 | | - raise ValueError( |
149 | | - f"Unexpected parallel backend {self.parallel_backend.__class__.__name__}" |
150 | | - ) |
151 | | - |
152 | | - with Parallel(backend=backend, prefer="processes") as parallel: |
| 152 | + with Parallel(prefer="processes") as parallel: |
153 | 153 | chunks = self._chunkify(self.inputs_, n_chunks=self.n_jobs) |
154 | 154 | map_results: List[R] = parallel( |
155 | 155 | delayed(self._map_func)( |
|
0 commit comments