|
1 | 1 | # ParallelUtilities.jl
|
2 |
| -Parallel mapreduce and other helpful functions for HPC |
| 2 | +Parallel mapreduce and other helpful functions for HPC, meant primarily for embarassingly parallel operations that often require one to split up a list of tasks into subsections that can be processed on individual cores. |
| 3 | + |
| 4 | +# Installation |
| 5 | + |
| 6 | +Install the package using |
| 7 | + |
| 8 | +```julia |
| 9 | +pkg> add https://github.com/jishnub/ParallelUtilities.jl.git |
| 10 | +julia> using ParallelUtilities |
| 11 | +``` |
| 12 | + |
| 13 | +# Usage |
| 14 | + |
| 15 | +The package splits up a collection of ranges into subparts of roughly equal length, so that all the cores are approximately equally loaded. This is best understood using an example: let's say that we have a function `f` that is defined as |
| 16 | + |
| 17 | +```julia |
| 18 | +julia> @everywhere begin |
| 19 | + f(x,y,z) = x+y+z |
| 20 | + end |
| 21 | +``` |
| 22 | + |
| 23 | +where each parameter takes up values in a range, and we would like to sample the entire parameter space. As an example, we choose the ranges to be |
| 24 | + |
| 25 | +```julia |
| 26 | +julia> xrange,yrange,zrange = 1:3,2:4,3:6 # ranges should be strictly increasing |
| 27 | +``` |
| 28 | + |
| 29 | +There are a total of 36 possible `(x,y,z)` combinations possible given these ranges. Let's say that we would like to split the evaluation of the function over 10 processors. We describe the simple way to evaluate this and then explain how this is achieved. |
| 30 | + |
| 31 | +## pmap-related functions |
| 32 | + |
| 33 | +The package provides versions of `pmap` with an optional reduction. These differ from the one provided by `Distributed` in a few key aspects: firstly, the iterator product of the argument is what is passed to the function and not the arguments by elementwise, so the i-th task will be `Iterator.product(args...)[i]` and not `[x[i] for x in args]`. Specifically the second set of parameters in the example above will be `(2,2,3)` and not `(2,3,4)`. |
| 34 | + |
| 35 | +Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Secondly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last processor. The tasks are also approximately evenly distributed across processors, assuming that the function takes an equal amount of time to run for each set of parameters. The function `pmapbatch_elementwise` is also exported that passes the elements to the function one-by-one as unwrapped tuples. |
| 36 | + |
| 37 | +### pmapbatch and pmapbatch_elementwise |
| 38 | + |
| 39 | +As an example we demonstrate how to evaluate the function `f` for the ranges of parameters listed above: |
| 40 | + |
| 41 | +```julia |
| 42 | +julia> p = pmapbatch_elementwise(f,(xrange,yrange,zrange)); |
| 43 | + |
| 44 | +julia> Tuple(p) |
| 45 | +(6, 7, 8, 7, 8, 9, 8, 9, 10, 7, 8, 9, 8, 9, 10, 9, 10, 11, 8, 9, 10, 9, 10, 11, 10, 11, 12, 9, 10, 11, 10, 11, 12, 11, 12, 13) |
| 46 | + |
| 47 | +# Check for correctness |
| 48 | +julia> p == map(f,vec(collect(Iterators.product(xrange,yrange,zrange)))) |
| 49 | +true |
| 50 | +``` |
| 51 | + |
| 52 | +There is also a function `pmapbatch` that deals with batches of parameters that are passed to each processor, and `pmap_elementwise` calls this function under the hood to process the parameters one by one. We may use this directly as well if we need the entire batch for some reason (eg. reading values off a disk, which needs to be done once for the entire set and not for every parameter). As an example we demonstrate how to obtain the same result as above using `pmapbatch`: |
| 53 | + |
| 54 | +```julia |
| 55 | +julia> p = pmapbatch(x->[f(i) for i in x],(xrange,yrange,zrange)); |
| 56 | + |
| 57 | +julia> Tuple(p) |
| 58 | +(6, 7, 8, 7, 8, 9, 8, 9, 10, 7, 8, 9, 8, 9, 10, 9, 10, 11, 8, 9, 10, 9, 10, 11, 10, 11, 12, 9, 10, 11, 10, 11, 12, 11, 12, 13) |
| 59 | +``` |
| 60 | + |
| 61 | +### pmapsum and pmapreduce |
| 62 | + |
| 63 | +Often a parallel execution is followed by a reduction (eg. a sum over the results). A reduction may be commutative (in which case the order of results do not matter), or non-commutative (in which the order does matter). There are two functions that are exported that carry out these tasks: `pmapreduce_commutative` and `pmapreduce`, where the former does not preserve ordering and the latter does. The former might be slightly faster as it does not have to sort the results to preserve ordering. For convenience, the package also provides the function `pmapsum` that chooses `sum` as the reduction operator. The map-reduce operation is similar in many ways to the distributed `for` loop provided by julia, but the main difference is that the reduction operation is not binary for the functions in this package (eg. we need `sum` and not `(+)`to add the results). There is also the difference as above that the function gets the parameters in batches, with functions having the suffix `_elementwise` taking on parameters individually as unwrapped tuples as above. The function `pmapreduce` does not take on parameters elementwise at this point, although this might be implemented in the future. |
| 64 | + |
| 65 | +As an example, to sum up a list of numbers in parallel we may call |
| 66 | +```julia |
| 67 | +julia> pmapsum_elementwise(identity,1:1000) |
| 68 | +500500 |
| 69 | +``` |
| 70 | + |
| 71 | +Here the mapped function is taken to by `identity` which just returns its argument. To sum the squares of the numbers in a list we may use |
| 72 | + |
| 73 | +```julia |
| 74 | +julia> pmapsum_elementwise(x->x^2,1:1000) |
| 75 | +333833500 |
| 76 | +``` |
| 77 | + |
| 78 | +We may choose an arbitrary reduction operator in the function `pmapreduce` and `pmapreduce_commutative`, and the elementwise function `pmapreduce_commutative_elementwise`. The reductions are performed locally on each node before the results are subsequently collected and reduced on the calling host. |
| 79 | + |
| 80 | +```julia |
| 81 | +# Compute 1^2 * 2^2 * 3^2 in parallel |
| 82 | +julia> pmapreduce_commutative_elementwise(x->x^2,prod,1:3) |
| 83 | +36 |
| 84 | +``` |
| 85 | + |
| 86 | +The function `pmapreduce` sorts the results obtained from each processor, so it is useful for concatenations. |
| 87 | + |
| 88 | +```julia |
| 89 | +# We perform the job on 2 workers |
| 90 | +# The signature is pmapreduce(fmap,freduce,iterable) |
| 91 | +julia> pmapreduce(x->ones(2).*myid(),x->hcat(x...),1:nworkers()) |
| 92 | +2×2 Array{Float64,2}: |
| 93 | + 2.0 3.0 |
| 94 | + 2.0 3.0 |
| 95 | +``` |
| 96 | + |
| 97 | +## ProductSplit |
| 98 | + |
| 99 | +The package provides an iterator `ProductSplit` that lists that ranges of parameters that would be passed on to each core. This may be achieved using an |
| 100 | + |
| 101 | +```Iterator.Take{Iterator.Drop{Iterator.ProductIterator}}``` |
| 102 | + |
| 103 | +with appropriately chosen parameters, and in many ways a `ProductSplit` behaves similarly. However this iterator supports several extra features such as `O(1)` indexing, which eliminates the need to actually iterate over it in many scenarios. |
| 104 | + |
| 105 | +The signature of the constructor is |
| 106 | + |
| 107 | +```julia |
| 108 | +ProductSplit(tuple_of_ranges,number_of_processors,processor_rank) |
| 109 | +``` |
| 110 | + |
| 111 | +where `processor_rank` takes up values in `1:number_of_processors`. Note that this is different from MPI where the rank starts from 0. For example, we check the tasks that are passed on to the processor number 4: |
| 112 | + |
| 113 | +```julia |
| 114 | +julia> ps = ProductSplit((xrange,yrange,zrange),10,4) |
| 115 | +ProductSplit{Tuple{Int64,Int64,Int64},3,UnitRange{Int64}}((1:3, 2:4, 3:5), (0, 3, 9), 10, 4, 10, 12) |
| 116 | + |
| 117 | +julia> collect(ps) |
| 118 | +4-element Array{Tuple{Int64,Int64,Int64},1}: |
| 119 | + (1, 3, 4) |
| 120 | + (2, 3, 4) |
| 121 | + (3, 3, 4) |
| 122 | + (1, 4, 4) |
| 123 | +``` |
| 124 | + |
| 125 | +where the object loops over values of `(x,y,z)`, and the values are sorted in reverse lexicographic order (the last index increases the slowest while the first index increases the fastest). The ranges roll over as expected. The tasks are evenly distributed with the remainders being split among the first few processors. In this example the first six processors receive 4 tasks each and the last four receive 3 each. We can see this by evaluating the length of the `ProductSplit` operator on each processor |
| 126 | + |
| 127 | +```julia |
| 128 | +julia> Tuple(length(ProductSplit((xrange,yrange,zrange),10,i)) for i=1:10) |
| 129 | +(4, 4, 4, 4, 4, 4, 3, 3, 3, 3) |
| 130 | +``` |
| 131 | + |
| 132 | +The object can be generated through the function `evenlyscatterproduct` using the same signature |
| 133 | + |
| 134 | +```julia |
| 135 | +julia> evenlyscatterproduct((xrange,yrange,zrange),10,4) |
| 136 | +ProductSplit{Tuple{Int64,Int64,Int64},3,UnitRange{Int64}}((1:3, 2:4, 3:6), (0, 3, 9), 10, 4, 13, 16) |
| 137 | +``` |
| 138 | + |
| 139 | +### Indexing |
| 140 | + |
| 141 | +The iterator supports fast indexing |
| 142 | +```julia |
| 143 | +julia> ps[3] |
| 144 | +(3, 3, 4) |
| 145 | + |
| 146 | +julia> @btime $ps[3] |
| 147 | + 9.493 ns (0 allocations: 0 bytes) |
| 148 | +(3, 3, 4) |
| 149 | +``` |
| 150 | + |
| 151 | +This is useful if we have a large number of parameters to analyze on each processor. |
| 152 | + |
| 153 | +```julia |
| 154 | +julia> xrange_long,yrange_long,zrange_long = 1:3000,1:3000,1:3000 |
| 155 | +(1:3000, 1:3000, 1:3000) |
| 156 | + |
| 157 | +julia> params_long = (xrange_long,yrange_long,zrange_long); |
| 158 | + |
| 159 | +julia> ps_long = ProductSplit(params_long,10,4) |
| 160 | +ProductSplit{Tuple{Int64,Int64,Int64},3,UnitRange{Int64}}((1:3000, 1:3000, 1:3000), (0, 3000, 9000000), 10, 4, 8100000001, 10800000000) |
| 161 | + |
| 162 | +julia> length(ps_long) |
| 163 | +2700000000 |
| 164 | + |
| 165 | +julia> @btime length($ps_long) # this is fast |
| 166 | + 0.034 ns (0 allocations: 0 bytes) |
| 167 | +2700000000 |
| 168 | + |
| 169 | +julia> @btime $ps_long[1000000] # also fast, does not iterate |
| 170 | + 32.530 ns (0 allocations: 0 bytes) |
| 171 | +(1000, 334, 901) |
| 172 | + |
| 173 | +julia> @btime first($ps_long) |
| 174 | + 31.854 ns (0 allocations: 0 bytes) |
| 175 | +(1, 1, 901) |
| 176 | + |
| 177 | +julia> @btime last($ps_long) |
| 178 | + 31.603 ns (0 allocations: 0 bytes) |
| 179 | +(3000, 3000, 1200) |
| 180 | +``` |
| 181 | + |
| 182 | +We may also compute the index of a particular set of parameters in the iterator. This is somewhat slower and is evaluated in `O(log(n))` time using a binary search. Whether or not the value exists in the list may however be evaluated in `O(1)` time. |
| 183 | + |
| 184 | +```julia |
| 185 | +julia> val = (3,3,4) |
| 186 | +(3, 3, 4) |
| 187 | + |
| 188 | +julia> val in ps |
| 189 | +true |
| 190 | + |
| 191 | +julia> localindex(ps,val) |
| 192 | +3 |
| 193 | + |
| 194 | +julia> val=(10,2,901); |
| 195 | + |
| 196 | +julia> @btime $val in $ps_long |
| 197 | + 67.824 ns (0 allocations: 0 bytes) |
| 198 | +true |
| 199 | + |
| 200 | +julia> @btime localindex($ps_long,$val) |
| 201 | + 1.036 μs (0 allocations: 0 bytes) |
| 202 | +3010 |
| 203 | +``` |
| 204 | + |
| 205 | +Another useful function is `whichproc` that returns the rank of the processor a specific set of parameters will be on, given the total number of processors. This is also computed using a binary search. |
| 206 | + |
| 207 | +```julia |
| 208 | +julia> whichproc(params_long,val,10) |
| 209 | +4 |
| 210 | + |
| 211 | +julia> @btime whichproc($params_long,$val,10) |
| 212 | + 1.264 μs (14 allocations: 448 bytes) |
| 213 | +4 |
| 214 | +``` |
| 215 | + |
| 216 | +### Extrema |
| 217 | + |
| 218 | +We can compute the ranges of each variable on any processor in `O(1)` time. |
| 219 | + |
| 220 | +```julia |
| 221 | +julia> extrema(ps,2) # extrema of the second parameter on this processor |
| 222 | +(3, 4) |
| 223 | + |
| 224 | +julia> Tuple(extrema(ps,i) for i in 1:3) |
| 225 | +((1, 3), (3, 4), (4, 4)) |
| 226 | + |
| 227 | +# Minimum and maximum work similarly |
| 228 | + |
| 229 | +julia> (minimum(ps,2),maximum(ps,2)) |
| 230 | +(3, 4) |
| 231 | + |
| 232 | +julia> @btime extrema($ps_long,2) |
| 233 | + 52.813 ns (0 allocations: 0 bytes) |
| 234 | +(1, 3000) |
| 235 | +``` |
0 commit comments