Commit c639b6f

fixed bug in whichproc for empty iterators, updated readme
1 parent ca7ddc7 commit c639b6f

3 files changed, 77 additions and 42 deletions
README.md

Lines changed: 3 additions & 3 deletions
@@ -35,7 +35,7 @@ There are a total of 36 possible `(x,y,z)` combinations possible given these ran

 The package provides versions of `pmap` with an optional reduction. These differ from the one provided by `Distributed` in a few key aspects: firstly, the iterator product of the arguments is what is passed to the function, not the arguments elementwise, so the i-th task will be `Iterators.product(args...)[i]` and not `[x[i] for x in args]`. Specifically, the second set of parameters in the example above will be `(2,2,3)` and not `(2,3,4)`.

-Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Secondly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last processor. The tasks are also approximately evenly distributed across processors, assuming that the function takes an equal amount of time to run for each set of parameters. The function `pmapbatch_elementwise` is also exported that passes the elements to the function one-by-one as unwrapped tuples.
+Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Thirdly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last worker that has any tasks assigned to it. The tasks are also approximately evenly distributed across processors, assuming that the function takes an equal amount of time to run for each set of parameters. The function `pmapbatch_elementwise` is also exported; it passes the elements to the function one-by-one as unwrapped tuples. This is the same as `pmap` where each worker is assigned batches of approximately equal sizes taken from the iterator product.

 ### pmapbatch and pmapbatch_elementwise

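A minimal sketch of the ordering described in the README hunk above, written against `Base.Iterators` only rather than the package itself. The ranges are hypothetical: they are chosen so that the product has 36 elements and starts at `(1,2,3)`, which is consistent with the README text, but the README's actual ranges are not visible in this hunk.

```julia
# Hypothetical parameter ranges (an assumption, not taken from the README).
xrange, yrange, zrange = 1:3, 2:4, 3:6

# Iterator product, flattened in iteration order: the first index varies fastest.
product_params = vec(collect(Iterators.product(xrange, yrange, zrange)))

# Elementwise pairing, in the style of `[x[i] for x in args]`.
elementwise_params = collect(zip(xrange, yrange, zrange))

println(length(product_params))  # 36
println(product_params[2])       # (2, 2, 3)
println(elementwise_params[2])   # (2, 3, 4)
```
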
@@ -99,11 +99,11 @@ julia> pmapreduce(x->ones(2).*myid(),x->hcat(x...),1:nworkers())

 ## ProductSplit

-The package provides an iterator `ProductSplit` that lists that ranges of parameters that would be passed on to each core. This may be achieved using an
+In the above examples we have talked about the tasks being distributed approximately equally among the workers without going into details about the distribution, which is what we describe here. The package provides an iterator `ProductSplit` that lists the ranges of parameters that would be passed on to each core. This may equivalently be achieved using an

 ```Iterators.Take{Iterators.Drop{Iterators.ProductIterator}}```

-with appropriately chosen parameters, and in many ways a `ProductSplit` behaves similarly. However this iterator supports several extra features such as `O(1)` indexing, which eliminates the need to actually iterate over it in many scenarios.
+with appropriately chosen parameters, and in many ways a `ProductSplit` behaves similarly. However, a `ProductSplit` supports several extra features such as `O(1)` indexing, which eliminates the need to actually iterate over it in many scenarios.

 The signature of the constructor is

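The equivalence with a `Take{Drop{ProductIterator}}` mentioned in the README hunk above can be sketched with `Base.Iterators`. The helper names `batch_bounds` and `batch` are hypothetical, and the splitting rule (the first `rem(ntasks, np)` workers receive one extra task) is an assumption about what "approximately equally" means; the package may use a different rule.

```julia
# Hypothetical helper: first and last linear index of the tasks assigned to
# worker `p` out of `np`, assuming the first `rem(ntasks, np)` workers get one extra task.
function batch_bounds(ntasks::Int, np::Int, p::Int)
    q, r = divrem(ntasks, np)
    firstind = (p - 1) * q + min(p - 1, r) + 1
    lastind = p * q + min(p, r)
    return firstind, lastind
end

# Hypothetical helper: the batch for worker `p` as a lazy Take{Drop{ProductIterator}}.
function batch(iterators::Tuple, np::Int, p::Int)
    ntasks = prod(length, iterators)
    firstind, lastind = batch_bounds(ntasks, np, p)
    dropped = Iterators.drop(Iterators.product(iterators...), firstind - 1)
    return Iterators.take(dropped, lastind - firstind + 1)
end

iters = (1:3, 2:4, 3:6)
batches = [collect(batch(iters, 5, p)) for p in 1:5]
println(length.(batches))  # [8, 7, 7, 7, 7]
```

With this lazy wrapper, reaching a given element still requires iterating past the dropped ones, which is the cost that the `O(1)` indexing of a `ProductSplit` is meant to avoid.
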
src/ParallelUtilities.jl

Lines changed: 41 additions & 28 deletions
@@ -4,28 +4,28 @@ using Reexport
 @reexport using Distributed

 export ProductSplit,
-    evenlyscatterproduct,
-    whichproc,
-    newprocrange,
-    whichproc,
-    procidrange,
-    localindex,
-    procid_and_index,
-    extremadims,
-    extrema_commonlastdim,
-    workersactive,
-    nworkersactive,
-    workerrank,
-    nodenames,
-    gethostnames,
-    nprocs_node,
-    pmapbatch,
-    pmapbatch_elementwise,
-    pmapsum,
-    pmapsum_elementwise,
-    pmapreduce,
-    pmapreduce_commutative,
-    pmapreduce_commutative_elementwise
+    evenlyscatterproduct,
+    ntasks,
+    whichproc,
+    whichproc,
+    procrange_recast,
+    localindex,
+    procid_and_localindex,
+    extremadims,
+    extrema_commonlastdim,
+    workersactive,
+    nworkersactive,
+    workerrank,
+    nodenames,
+    gethostnames,
+    nprocs_node,
+    pmapbatch,
+    pmapbatch_elementwise,
+    pmapsum,
+    pmapsum_elementwise,
+    pmapreduce,
+    pmapreduce_commutative,
+    pmapreduce_commutative_elementwise

 # The fundamental iterator that behaves like an Iterator.Take{Iterator.Drop{Iterator.ProductIterator}}

@@ -407,13 +407,20 @@ function whichproc(iterators::Tuple,val::Tuple,np::Int)
     # We may carry out a binary search as the iterators are sorted
     left,right = 1,np

+    val_t = ReverseLexicographicTuple(val)
+
     while left <= right
         mid = floor(Int,(left+right)/2)
         ps = ProductSplit(iterators,np,mid)

-        if ReverseLexicographicTuple(val) < ReverseLexicographicTuple(first(ps))
+        # If np is greater than the number of tasks then it's possible
+        # that ps is empty. In this case the value must be somewhere in
+        # the previous workers. Otherwise each worker has some tasks and
+        # these are sorted, so carry out a binary search
+
+        if isempty(ps) || val_t < ReverseLexicographicTuple(first(ps))
             right = mid - 1
-        elseif ReverseLexicographicTuple(val) > ReverseLexicographicTuple(last(ps))
+        elseif val_t > ReverseLexicographicTuple(last(ps))
             left = mid + 1
         else
             return mid
@@ -423,8 +430,10 @@ function whichproc(iterators::Tuple,val::Tuple,np::Int)
     return nothing
 end

+whichproc(iterators::Tuple,::Nothing,np::Int) = nothing
+
 # This function is necessary when we're changing np
-function newprocrange(ps::ProductSplit,np_new::Int)
+function procrange_recast(ps::ProductSplit,np_new::Int)

     if isempty(ps)
         return 0:-1 # empty range
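
A simplified sketch of the `whichproc` binary search with the empty-batch guard, using integer task indices in place of parameter tuples. `owned_range` and `which_worker` are hypothetical stand-ins, not package functions, and the split rule is assumed to place any empty workers after all non-empty ones, which is what the new comment in the `whichproc` hunk relies on. In the package the guard also avoids comparing against `first(ps)`, which the tests in this commit expect to be `nothing` for an empty `ProductSplit`.

```julia
# Hypothetical stand-in for ProductSplit: worker `p` of `np` owns a contiguous,
# possibly empty, range of the task indices 1:ntasks (assumed split rule).
function owned_range(ntasks::Int, np::Int, p::Int)
    q, r = divrem(ntasks, np)
    return ((p - 1) * q + min(p - 1, r) + 1):(p * q + min(p, r))
end

# Binary search over worker ranks for the worker that owns `task`.
function which_worker(ntasks::Int, task::Int, np::Int)
    left, right = 1, np
    while left <= right
        mid = div(left + right, 2)
        r = owned_range(ntasks, np, mid)
        if isempty(r) || task < first(r)
            # Empty workers sit at the end, so the owner must have a lower rank.
            right = mid - 1
        elseif task > last(r)
            left = mid + 1
        else
            return mid
        end
    end
    return nothing
end

# Mirrors the `::Nothing` method added in this commit.
which_worker(ntasks::Int, ::Nothing, np::Int) = nothing

println(which_worker(2, 2, 5))        # 2
println(which_worker(2, 3, 5))        # nothing
println(which_worker(2, nothing, 5))  # nothing
```
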
@@ -450,13 +459,15 @@ function localindex(ps::ProductSplit{T},val::T) where {T}
     val == first(ps) && return left
     val == last(ps) && return right

+    val_t = ReverseLexicographicTuple(val)
+
     while left <= right
         mid = floor(Int,(left+right)/2)
         val_mid = @inbounds ps[mid]

-        if ReverseLexicographicTuple(val) < ReverseLexicographicTuple(val_mid)
+        if val_t < ReverseLexicographicTuple(val_mid)
             right = mid - 1
-        elseif ReverseLexicographicTuple(val) > ReverseLexicographicTuple(val_mid)
+        elseif val_t > ReverseLexicographicTuple(val_mid)
             left = mid + 1
         else
             return mid
@@ -466,12 +477,14 @@ function localindex(ps::ProductSplit{T},val::T) where {T}
     return nothing
 end

+localindex(::ProductSplit,::Nothing) = nothing
+
 function localindex(iterators::Tuple,val::Tuple,np::Integer,procid::Integer)
     ps = evenlyscatterproduct(iterators,np,procid)
     localindex(ps,val)
 end

-function procid_and_index(iterators::Tuple,val::Tuple,np::Integer)
+function procid_and_localindex(iterators::Tuple,val::Tuple,np::Integer)
     procid = whichproc(iterators,val,np)
     index = localindex(iterators,val,np,procid)
     return procid,index

test/runtests.jl

Lines changed: 33 additions & 11 deletions
@@ -34,8 +34,8 @@ end
         for np = 1:npmax, p = 1:np
             ps = ProductSplit(iters,np,p)
             @test collect(ps) == collect(split_product_across_processors_iterators(iters,np,p))
-            @test ParallelUtilities.ntasks(ps) == ntasks_total
-            @test ParallelUtilities.ntasks(ps.iterators) == ntasks_total
+            @test ntasks(ps) == ntasks_total
+            @test ntasks(ps.iterators) == ntasks_total
         end

         @test_throws ParallelUtilities.ProcessorNumberError ProductSplit(iters,npmax,npmax+1)
@@ -75,24 +75,36 @@ end
             ps = ProductSplit(iters,2,2)
             @test ps.firstind == div(length(iters[1]),2) + 1
             @test ps.lastind == length(iters[1])
+
+            for np in length(iters[1])+1:length(iters[1])+10,
+                p in length(iters[1])+1:np
+
+                ps = ProductSplit(iters,np,p)
+                @test ps.firstind == length(iters[1]) + 1
+                @test ps.lastind == length(iters[1])
+            end
         end
     end

     @testset "firstlast" begin
         @testset "first" begin
-            for iters in [(1:10,),(1:10,4:6),(1:10,4:6,1:4),(1:2:10,4:1:6)],np=1:10
+            for iters in [(1:10,),(1:10,4:6),(1:10,4:6,1:4),(1:2:10,4:1:6)],
+                np=1:10ntasks(iters)
+
                 ps = ProductSplit(iters,np,1)
-                @test first(ps) == map(first,iters)
+                @test first(ps) == ( isempty(ps) ? nothing : map(first,iters) )
             end

             iters = (1:1,)
             ps = ProductSplit(iters,2length(iters[1]),length(iters[1])+1) # must be empty
             @test first(ps) === nothing
         end
         @testset "last" begin
-            for iters in [(1:10,),(1:10,4:6),(1:10,4:6,1:4),(1:2:10,4:1:6)],np=1:10
+            for iters in [(1:10,),(1:10,4:6),(1:10,4:6,1:4),(1:2:10,4:1:6)],
+                np=1:10ntasks(iters)
+
                 ps = ProductSplit(iters,np,np)
-                @test last(ps) == map(last,iters)
+                @test last(ps) == ( isempty(ps) ? nothing : map(last,iters) )
             end

             iters = (1:1,)
@@ -172,35 +184,45 @@ end
         end
     end

-    @testset "whichproc + newprocrange" begin
+    @testset "whichproc + procrange_recast" begin
         np,proc_id = 5,3
         iters = (1:10,4:6,1:4)
         ps = ProductSplit(iters,np,proc_id)
         @test whichproc(iters,first(ps),1) == 1
         @test whichproc(iters,(100,100,100),1) === nothing
-        @test newprocrange(ps,1) == 1:1
+        @test procrange_recast(ps,1) == 1:1

-        for np_new in 1:np
+        iters = (1:1,2:2)
+        ps = ProductSplit(iters,np,proc_id)
+        @test whichproc(iters,first(ps),np) === nothing
+        @test whichproc(iters,nothing,np) === nothing
+
+        for np_new in 1:10ntasks(iters)
             for proc_id_new=1:np_new
                 ps_new = ProductSplit(iters,np_new,proc_id_new)
+
                 for val in ps_new
+                    # Should loop only if ps_new is non-empty
                     @test whichproc(iters,val,np_new) == proc_id_new
                 end
             end
             procid_new_first = whichproc(iters,first(ps),np_new)
             proc_new_last = whichproc(iters,last(ps),np_new)
-            @test newprocrange(ps,np_new) == procid_new_first:proc_new_last
+            @test procrange_recast(ps,np_new) == (isempty(ps) ? (0:-1) : (procid_new_first:proc_new_last))
         end
     end

     @testset "localindex" begin

         for iters in [(1:10,),(1:10,4:6),(1:10,4:6,1:4),(1:2:10,4:1:6)]
-            for np=1:10,proc_id=1:np
+            for np=1:10ntasks(iters),proc_id=1:np
                 ps = ProductSplit(iters,np,proc_id)
                 for (ind,val) in enumerate(ps)
                     @test localindex(ps,val) == ind
                 end
+                if isempty(ps)
+                    @test localindex(ps,first(ps)) === nothing
+                end
             end
         end
     end