Skip to content

Commit 0434ddd

Browse files
authored
Merge pull request #434 from medecau/multiprocessing-fold
refactor parallel.fold
2 parents 42494fd + ac68944 commit 0434ddd

File tree

3 files changed

+21
-2
lines changed

3 files changed

+21
-2
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ dist/
44
*.egg-info/
55
bench/shakespeare.txt
66
.coverage
7+
8+
\.tox/

toolz/sandbox/parallel.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
1+
import functools
12
from toolz.itertoolz import partition_all
23
from toolz.compatibility import reduce, map
34
from toolz.utils import no_default
45

56

7+
def _reduce(func, seq, initial=None):
8+
if initial is None:
9+
return functools.reduce(func, seq)
10+
else:
11+
return functools.reduce(func, seq, initial)
12+
13+
614
def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
715
"""
816
Reduce without guarantee of ordered reduction.
@@ -43,16 +51,22 @@ def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None):
4351
>>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
4452
10
4553
"""
54+
assert chunksize > 1
55+
4656
if combine is None:
4757
combine = binop
4858

4959
chunks = partition_all(chunksize, seq)
5060

5161
# Evaluate sequence in chunks via map
5262
if default == no_default:
53-
results = map(lambda chunk: reduce(binop, chunk), chunks)
63+
results = map(
64+
functools.partial(_reduce, binop),
65+
chunks)
5466
else:
55-
results = map(lambda chunk: reduce(binop, chunk, default), chunks)
67+
results = map(
68+
functools.partial(_reduce, binop, initial=default),
69+
chunks)
5670

5771
results = list(results) # TODO: Support complete laziness
5872

toolz/sandbox/tests/test_parallel.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22
from toolz import reduce
33
from operator import add
44
from pickle import dumps, loads
5+
from multiprocessing import Pool
6+
57

68
# is comparison will fail between this and no_default
79
no_default2 = loads(dumps('__no__default__'))
810

911

1012
def test_fold():
1113
assert fold(add, range(10), 0) == reduce(add, range(10), 0)
14+
assert fold(add, range(10), 0, map=Pool().map) == reduce(add, range(10), 0)
1215
assert fold(add, range(10), 0, chunksize=2) == reduce(add, range(10), 0)
1316
assert fold(add, range(10)) == fold(add, range(10), 0)
1417

0 commit comments

Comments
 (0)