
Commit b573b38

Fix reduction order in pmapreduce
Reduction is performed using an OrderedBinaryTree. Also fix pval to use ranks instead of worker ids. Together, these two changes ensure that the order of results is preserved.
1 parent d1c8f18 commit b573b38
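For context on why the reduction order matters: with a non-commutative reducer, combining per-rank results in a different order gives a different answer. The sketch below is illustrative only, using plain `hcat` over stand-in per-rank outputs rather than the package's internal `OrderedBinaryTree`:

```julia
# Stand-in for the map outputs of ranks 1..4; rank r yields the vector [r, r].
results_by_rank = [fill(r, 2) for r in 1:4]

# With a non-commutative reducer such as hcat, the combined result depends
# on the order in which the per-rank results are concatenated.
hcat(results_by_rank...)           # columns in rank order: 1 2 3 4
hcat(reverse(results_by_rank)...)  # columns in reversed order: 4 3 2 1
```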

File tree

9 files changed: +1635 -1235 lines


Project.toml

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 name = "ParallelUtilities"
 uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
 authors = ["Jishnu Bhattacharya <[email protected]>"]
-version = "0.6.1"
+version = "0.7.0"
 
 [deps]
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"

README.md

Lines changed: 32 additions & 11 deletions
@@ -27,7 +27,6 @@ julia> using ParallelUtilities
 * `evenlyscatterproduct`
 * `nworkersactive`
 * `workersactive`
-* `workerrank`
 * `whichproc`
 * `procrange_recast`
 * `localindex`
@@ -106,7 +105,7 @@ The first six processors receive 4 tuples of parameters each and the final four
 
 The package provides versions of `pmap` with an optional reduction. These differ from the one provided by `Distributed` in a few key aspects: firstly, the iterator product of the arguments is what is passed to the function, not the arguments elementwise, so the i-th task will be `Iterators.product(args...)[i]` and not `[x[i] for x in args]`. Specifically, the second set of parameters in the example above will be `(2,2,3)` and not `(2,3,4)`.
 
-Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Thirdly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last worker that has any tasks assigned to it. The tasks are also approximately evenly distributed across processors, assuming that the function takes an equal amount of time to run for each set of parameters. The function `pmapbatch_elementwise` is also exported that passes the elements to the function one-by-one as unwrapped tuples. This produces the same result as `pmap` where each worker is assigned batches of approximately equal sizes taken from the iterator product.
+Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Thirdly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last active worker. The tasks are also approximately evenly distributed across processors. The function `pmapbatch_elementwise` is also exported, which passes the elements to the function one by one as unwrapped tuples. This produces the same result as `pmap` where each worker is assigned batches of approximately equal sizes taken from the iterator product.
 
 ### pmapbatch and pmapbatch_elementwise
 
@@ -147,7 +146,7 @@ julia> Tuple(p)
 
 ### pmapsum and pmapreduce
 
-Often a parallel execution is followed by a reduction (eg. a sum over the results). A reduction may be commutative (in which case the order of results do not matter), or non-commutative (in which the order does matter). There are two functions that are exported that carry out these tasks: `pmapreduce_commutative` and `pmapreduce`, where the former does not preserve ordering and the latter does. The former might be slightly faster as it does not have to sort the results to preserve ordering. For convenience, the package also provides the function `pmapsum` that chooses `sum` as the reduction operator. The map-reduce operation is similar in many ways to the distributed `for` loop provided by julia, but the main difference is that the reduction operation is not binary for the functions in this package (eg. we need `sum` and not `(+)`to add the results). There is also the difference as above that the function gets the parameters in batches, with functions having the suffix `_elementwise` taking on parameters individually as unwrapped tuples as above. The function `pmapreduce` does not take on parameters elementwise at this point, although this might be implemented in the future.
+Often a parallel execution is followed by a reduction (e.g. a sum over the results). A reduction may be commutative (in which case the order of results does not matter) or non-commutative (in which case the order does matter). There are two exported functions that carry out these tasks: `pmapreduce_commutative` and `pmapreduce`, where the former does not preserve ordering and the latter does. For convenience, the package also provides the function `pmapsum`, which chooses `sum` as the reduction operator. The map-reduce operation is similar in many ways to the distributed `for` loop provided by Julia, but the main difference is that the reduction operation is not binary for the functions in this package (e.g. we need `sum` and not `(+)` to add the results). As above, the function receives the parameters in batches, while functions having the suffix `_elementwise` take parameters individually as unwrapped tuples. The function `pmapreduce` does not take parameters elementwise at this point, although this might be implemented in the future.
 
 As an example, to sum up a list of numbers in parallel we may call
 ```julia
@@ -185,7 +184,9 @@ julia> pmapreduce(x->ones(2).*myid(),x->hcat(x...),1:nworkers())
  2.0  3.0
 ```
 
-The functions `pmapreduce` produces the same result as `pmapreduce_commutative` if the reduction operator is commutative (ie. the order of results received from the children workers does not matter). The function `pmapreduce_commutative` might be faster as it does not sort the results received from the workers before reduction. This is what is used by the function `pmapsum` that chooses the reduction operator to be a sum.
+The function `pmapreduce` produces the same result as `pmapreduce_commutative` if the reduction operator is commutative (i.e. the order of results received from the child workers does not matter).
+
+The function `pmapsum` sets the reduction operator to be a sum.
 
 ```julia
 julia> sum(workers())
@@ -198,14 +199,16 @@ julia> pmapsum(x->ones(2).*myid(),1:nworkers())
  5.0
 ```
 
-It is possible to specify the return types of the map and reduce operations in these functions. If they are not specified they are inferred using `Base.return_types`. To specify the return types use the following variants:
+It is possible to specify the return types of the map and reduce operations in these functions. To specify the return types use the following variants:
 
 ```julia
+# Signature is pmapreduce(fmap,Tmap,freduce,Treduce,iterators)
 julia> pmapreduce(x->ones(2).*myid(),Vector{Float64},x->hcat(x...),Matrix{Float64},1:nworkers())
 2×2 Array{Float64,2}:
  2.0  3.0
  2.0  3.0
 
+# Signature is pmapsum(fmap,Tmap,iterators)
 julia> pmapsum(x->ones(2).*myid(),Vector{Float64},1:nworkers())
 2-element Array{Float64,1}:
  5.0
@@ -228,16 +231,34 @@ ERROR: On worker 2:
 InexactError: Int64(0.7742577217010362)
 ```
 
-There might be instances where a type inference is not desirable, eg. if the functions return outputs having different types for different parameter values. In such a case type inference may be turned off by specifying the keyword argument `infer_types = false`, eg as
+### Progress bar
+
+The progress of the map-reduce operation may be tracked by setting the keyword argument `showprogress` to `true`. This might be useful in case certain workers have a heavier load than others.
 
 ```julia
-julia> pmapsum(x->ones(2).*myid(),1:nworkers(),infer_types = false)
-2-element Array{Float64,1}:
- 5.0
- 5.0
+# Running on 8 workers, artificially induce load using sleep
+julia> pmapreduce(x->(sleep(myid());myid()),x->hcat(x...),1:nworkers(),showprogress=true)
+Progress in pmapreduce : 100%|██████████████████████████████████████████████████| Time: 0:00:09
+  map: 8
+  reduce: 8
+1×8 Array{Int64,2}:
+ 2  3  4  5  6  7  8  9
+
+julia> pmapreduce(x->(sleep(myid());myid()),x->hcat(x...),1:nworkers(),showprogress=true,progressdesc="Progress : ")
+Progress : 100%|████████████████████████████████████████████████████████████████| Time: 0:00:09
+  map: 8
+  reduce: 8
+1×8 Array{Int64,2}:
+ 2  3  4  5  6  7  8  9
 ```
 
-Note that the keyword argument `infer_types` can not be used if the return types are specified while calling the function.
+Note that this does not track the progress of the individual maps; it merely tracks how many have completed.
+
+### Why two mapreduce functions?
+
+The two separate functions `pmapreduce` and `pmapreduce_commutative` exist for historical reasons. They use different binary tree structures for reduction. The commutative one might be removed in the future in favour of `pmapreduce`.
+
+
 
 ## ProductSplit
 