Skip to content

Commit c25e273

Browse files
authored
add _CloudPickleProcessPoolExecutor as default pool in FuturesExecutor
1 parent 66496b9 commit c25e273

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

src/coffea/processor/executor.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import concurrent.futures
22
import json
33
import math
4+
import multiprocessing
45
import os
56
import pickle
67
import time
@@ -500,6 +501,13 @@ def __call__(
500501
0,
501502
)
502503

504+
# this class changes the default pickler of ProcessPoolExecutor to the default cloudpickle.Pickler
505+
class _CloudPickleProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
506+
def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None):
507+
if mp_context is None:
508+
mp_context = multiprocessing.get_context()
509+
mp_context.reducer = cloudpickle.Pickler()
510+
super().__init__(max_workers=max_workers, mp_context=mp_context, initializer=initializer, initargs=initargs, max_tasks_per_child=max_tasks_per_child)
503511

504512
@dataclass
505513
class FuturesExecutor(ExecutorBase):
@@ -514,7 +522,7 @@ class FuturesExecutor(ExecutorBase):
514522
accumulator : Accumulatable
515523
An accumulator to collect the output of the function
516524
pool : concurrent.futures.Executor class or instance, optional
517-
The type of futures executor to use, defaults to ProcessPoolExecutor.
525+
The type of futures executor to use, defaults to _CloudPickleProcessPoolExecutor.
518526
You can pass an instance instead of a class to reuse an executor
519527
workers : int, optional
520528
Number of parallel processes for futures (default 1)
@@ -553,7 +561,7 @@ class FuturesExecutor(ExecutorBase):
553561

554562
pool: Union[
555563
Callable[..., concurrent.futures.Executor], concurrent.futures.Executor
556-
] = concurrent.futures.ProcessPoolExecutor # fmt: skip
564+
] = _CloudPickleProcessPoolExecutor # fmt: skip
557565
mergepool: Optional[
558566
Union[
559567
Callable[..., concurrent.futures.Executor],

0 commit comments

Comments
 (0)