Skip to content

Commit d8411bc

Browse files
committed
ProductSplit supports indexing, iteration and minimum, maximum, extrema
1 parent d028237 commit d8411bc

File tree

5 files changed

+394
-7
lines changed

5 files changed

+394
-7
lines changed

.travis.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Documentation: http://docs.travis-ci.com/user/languages/julia/
2+
language: julia
3+
os:
4+
- linux
5+
- osx
6+
julia:
7+
- 1.0
8+
- nightly
9+
matrix:
10+
allow_failures:
11+
- julia: nightly
12+
fast_finish: true
13+
notifications:
14+
email: false
15+
after_success:
16+
- julia -e 'using Pkg; Pkg.add("Coverage"); using Coverage; Codecov.submit(process_folder())'

Project.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ version = "0.1.0"
66
[deps]
77
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
88
Reexport = "189a3867-3050-52da-a836-e630ba90ab69"
9-
TimerOutputs = "a759f4b9-e2f1-59dc-863e-4aeb61b1ea8f"
9+
10+
[compat]
11+
Reexport = "0.2"

src/ParallelUtilities.jl

Lines changed: 285 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,285 @@
11
module ParallelUtilities
22

3-
using Reexport,TimerOutputs
3+
using Reexport
44
@reexport using Distributed
55

6-
export split_across_processors,split_product_across_processors,
6+
export ProductSplit,split_across_processors,split_product_across_processors,
77
get_processor_id_from_split_array,procid_allmodes,mode_index_in_file,
88
get_processor_range_from_split_array,workers_active,nworkers_active,worker_rank,
99
get_index_in_split_array,procid_and_mode_index,extrema_from_split_array,
1010
pmapsum,pmapreduce,pmap_onebatch_per_worker,moderanges_common_lastarray,
1111
get_nodes,get_hostnames,get_nprocs_node
1212

13+
# The fundamental iterator that behaves like an Iterator.ProductIterator
14+
15+
struct ProcessorNumberError <: Exception
16+
p :: Int
17+
np :: Int
18+
end
19+
20+
function Base.showerror(io::IO,p::ProcessorNumberError)
21+
print(io,"Processor id $(p.p) does not line in the range $(1:p.np)")
22+
end
23+
24+
struct DecreasingIteratorError <: Exception
25+
end
26+
27+
function Base.showerror(io::IO,p::DecreasingIteratorError)
28+
print(io,"All the iterators need to be strictly increasing")
29+
end
30+
31+
struct ProductSplit{T,N,Q}
32+
iterators :: NTuple{N,Q}
33+
togglelevels :: NTuple{N,Int}
34+
np :: Int
35+
p :: Int
36+
firstind :: Int
37+
lastind :: Int
38+
39+
function ProductSplit(iterators::NTuple{N,Q},togglelevels::NTuple{N,Int},
40+
np::Int,p::Int,firstind::Int,lastind::Int) where {N,Q<:AbstractRange}
41+
42+
1 <= p <= np || throw(ProcessorNumberError(p,np))
43+
T = NTuple{N,eltype(Q)}
44+
45+
# Check to make sure that all the iterators are increasing
46+
all(x->step(x)>0,iterators) || throw(DecreasingIteratorError())
47+
48+
new{T,N,Q}(iterators,togglelevels,np,p,firstind,lastind)
49+
end
50+
end
51+
Base.eltype(::ProductSplit{T}) where {T} = T
52+
53+
function _cumprod(len)
54+
(0,_cumprod(first(len),Base.tail(len))...)
55+
end
56+
57+
_cumprod(::Int,::Tuple{}) = ()
58+
function _cumprod(n::Int,tl::Tuple)
59+
(n,_cumprod(n*first(tl),Base.tail(tl))...)
60+
end
61+
62+
function ProductSplit(iterators::NTuple{N,Q},np,p) where {N,Q}
63+
T = NTuple{N,eltype(Q)}
64+
len = Base.Iterators._prod_size(iterators)
65+
Nel = prod(len)
66+
togglelevels = _cumprod(len)
67+
d,r = divrem(Nel,np)
68+
firstind = d*(p-1) + min(r,p-1) + 1
69+
lastind = d*p + min(r,p)
70+
ProductSplit(iterators,togglelevels,np,p,firstind,lastind)
71+
end
72+
73+
@inline Base.@propagate_inbounds Base.first(p::ProductSplit) =
74+
_first(p.iterators,childindex(p,p.firstind)...)
75+
76+
@inline function _first(t::Tuple,ind::Int,rest::Int...)
77+
@boundscheck (1 <= ind <= length(first(t))) || throw(BoundsError(first(t),ind))
78+
(@inbounds first(t)[ind],_first(Base.tail(t),rest...)...)
79+
end
80+
@inline _first(::Tuple{},rest...) = ()
81+
82+
@inline Base.length(p::ProductSplit) = p.lastind - p.firstind + 1
83+
@inline Base.lastindex(p::ProductSplit) = p.lastind - p.firstind + 1
84+
85+
@inline function childindex(p::ProductSplit,ind::Int)
86+
tl = reverse(Base.tail(p.togglelevels))
87+
reverse(childindex(tl,ind))
88+
end
89+
90+
@inline function childindex(tl::Tuple,ind::Int)
91+
t = first(tl)
92+
k = div(ind-1,t)
93+
(k+1,childindex(Base.tail(tl),ind-k*t)...)
94+
end
95+
96+
# First iterator gets the final remainder
97+
@inline childindex(::Tuple{},ind::Int) = (ind,)
98+
99+
@inline childindexshifted(p::ProductSplit,ind::Int) = childindex(p, (ind - 1) + p.firstind)
100+
101+
@inline Base.@propagate_inbounds function Base.getindex(p::ProductSplit,ind::Int)
102+
_getindex(p,childindexshifted(p, ind)...)
103+
end
104+
# This needs to be a separate function to deal with the case of a single child iterator, in which case
105+
# it's not clear if the single index is for the ProductSplit or the child iterator
106+
107+
# This method asserts that the number of indices are correct
108+
@inline Base.@propagate_inbounds function _getindex(p::ProductSplit{<:Any,N},
109+
inds::Vararg{Int,N}) where {N}
110+
111+
_getindex(p.iterators,inds...)
112+
end
113+
114+
@inline function _getindex(p::Tuple,ind::Int,rest::Int...)
115+
@boundscheck (1 <= ind <= length(first(p))) || throw(BoundsError(first(p),ind))
116+
(@inbounds first(p)[ind],_getindex(Base.tail(p),rest...)...)
117+
end
118+
@inline _getindex(::Tuple{},rest::Int...) = ()
119+
120+
function Base.iterate(p::ProductSplit,state=(first(p),1))
121+
el,n = state
122+
123+
if n > length(p)
124+
return nothing
125+
elseif n == length(p)
126+
# In this case the next value doesn't matter, so just return something arbitary
127+
return (el,(p[1],n+1))
128+
end
129+
130+
(el,(p[n+1],n+1))
131+
end
132+
133+
@inline Base.@propagate_inbounds function _firstlastdim(p::ProductSplit{<:Any,N},dim::Int,
134+
firstindchild::Tuple=childindex(p,p.firstind),
135+
lastindchild::Tuple=childindex(p,p.lastind)) where {N}
136+
137+
_firstlastdim(p.iterators,dim,firstindchild,lastindchild)
138+
end
139+
140+
@inline function _firstlastdim(iterators::NTuple{N,<:Any},dim::Int,
141+
firstindchild::Tuple,lastindchild::Tuple) where {N}
142+
143+
@boundscheck (1 <= dim <= N) || throw(BoundsError(iterators,dim))
144+
145+
iter = @inbounds iterators[dim]
146+
147+
fic = @inbounds firstindchild[dim]
148+
lic = @inbounds lastindchild[dim]
149+
150+
first_iter = @inbounds iter[fic]
151+
last_iter = @inbounds iter[lic]
152+
153+
(first_iter,last_iter)
154+
end
155+
156+
function _checkrollover(p::ProductSplit{<:Any,N},dim::Int,
157+
firstindchild::Tuple=childindex(p,p.firstind),
158+
lastindchild::Tuple=childindex(p,p.lastind)) where {N}
159+
160+
_checkrollover(p.iterators,dim,firstindchild,lastindchild)
161+
end
162+
163+
function _checkrollover(t::NTuple{N,<:Any},dim::Int,
164+
firstindchild::Tuple,lastindchild::Tuple) where {N}
165+
166+
if dim > 0
167+
return _checkrollover(Base.tail(t),dim-1,Base.tail(firstindchild),Base.tail(lastindchild))
168+
end
169+
170+
!_checknorollover(reverse(t),reverse(firstindchild),reverse(lastindchild))
171+
end
172+
173+
function _checknorollover(t,firstindchild,lastindchild)
174+
iter = first(t)
175+
first_iter = iter[first(firstindchild)]
176+
last_iter = iter[first(lastindchild)]
177+
178+
(last_iter == first_iter) &
179+
_checknorollover(Base.tail(t),Base.tail(firstindchild),Base.tail(lastindchild))
180+
end
181+
_checknorollover(::Tuple{},::Tuple{},::Tuple{}) = true
182+
183+
@inline function Base.maximum(p::ProductSplit{<:Any,1},dim::Int=1)
184+
@boundscheck (dim > 1) && throw(BoundsError(p.iterators,dim))
185+
lastindchild = childindex(p,p.lastind)
186+
@inbounds lic_dim = lastindchild[1]
187+
@inbounds iter = p.iterators[1]
188+
iter[lic_dim]
189+
end
190+
191+
@inline function Base.maximum(p::ProductSplit{<:Any,N},dim::Int) where {N}
192+
193+
@boundscheck (1 <= dim <= N) || throw(BoundsError(p.iterators,dim))
194+
195+
firstindchild = childindex(p,p.firstind)
196+
lastindchild = childindex(p,p.lastind)
197+
198+
@inbounds first_iter,last_iter = _firstlastdim(p,dim,firstindchild,lastindchild)
199+
200+
v = last_iter
201+
202+
# The last index will not roll over so this can be handled easily
203+
if dim == N
204+
return v
205+
end
206+
207+
if _checkrollover(p,dim,firstindchild,lastindchild)
208+
iter = @inbounds p.iterators[dim]
209+
v = maximum(iter)
210+
end
211+
212+
return v
213+
end
214+
215+
@inline function Base.minimum(p::ProductSplit{<:Any,1},dim::Int=1)
216+
@boundscheck (dim > 1) && throw(BoundsError(p.iterators,dim))
217+
firstindchild = childindex(p,p.firstind)
218+
@inbounds fic_dim = firstindchild[1]
219+
@inbounds iter = p.iterators[1]
220+
iter[fic_dim]
221+
end
222+
223+
@inline function Base.minimum(p::ProductSplit{<:Any,N},dim::Int) where {N}
224+
225+
@boundscheck (1 <= dim <= N) || throw(BoundsError(p.iterators,dim))
226+
227+
firstindchild = childindex(p,p.firstind)
228+
lastindchild = childindex(p,p.lastind)
229+
230+
@inbounds first_iter,last_iter = _firstlastdim(p,dim,firstindchild,lastindchild)
231+
232+
v = first_iter
233+
234+
# The last index will not roll over so this can be handled easily
235+
if dim == N
236+
return v
237+
end
238+
239+
if _checkrollover(p,dim,firstindchild,lastindchild)
240+
iter = @inbounds p.iterators[dim]
241+
v = minimum(iter)
242+
end
243+
244+
return v
245+
end
246+
247+
@inline function Base.extrema(p::ProductSplit{<:Any,1},dim::Int=1)
248+
@boundscheck (dim > 1) && throw(BoundsError(p.iterators,dim))
249+
firstindchild = childindex(p,p.firstind)
250+
lastindchild = childindex(p,p.lastind)
251+
@inbounds fic_dim = firstindchild[1]
252+
@inbounds lic_dim = lastindchild[1]
253+
@inbounds iter = p.iterators[1]
254+
255+
(iter[fic_dim],iter[lic_dim])
256+
end
257+
258+
@inline function Base.extrema(p::ProductSplit{<:Any,N},dim::Int) where {N}
259+
260+
@boundscheck (1 <= dim <= N) || throw(BoundsError(p.iterators,dim))
261+
262+
firstindchild = childindex(p,p.firstind)
263+
lastindchild = childindex(p,p.lastind)
264+
265+
@inbounds first_iter,last_iter = _firstlastdim(p,dim,firstindchild,lastindchild)
266+
267+
v = (first_iter,last_iter)
268+
# The last index will not roll over so this can be handled easily
269+
if dim == N
270+
return v
271+
end
272+
273+
if _checkrollover(p,dim,firstindchild,lastindchild)
274+
iter = @inbounds p.iterators[dim]
275+
v = extrema(iter)
276+
end
277+
278+
return v
279+
end
280+
281+
###################################################################################################
282+
13283
function worker_rank()
14284
if nworkers()==1
15285
return 1
@@ -30,7 +300,7 @@ function split_across_processors(num_tasks::Integer,num_procs=nworkers(),proc_id
30300
return task_start:(task_start+num_tasks_on_proc-1)
31301
end
32302

33-
function split_across_processors(arr₁,num_procs=nworkers(),proc_id=worker_rank())
303+
function split_across_processors(arr₁::Base.Iterators.ProductIterator,num_procs=nworkers(),proc_id=worker_rank())
34304

35305
@assert(proc_id<=num_procs,"processor rank has to be less than number of workers engaged")
36306

@@ -39,7 +309,7 @@ function split_across_processors(arr₁,num_procs=nworkers(),proc_id=worker_rank
39309
num_tasks_per_process,num_tasks_leftover = div(num_tasks,num_procs),mod(num_tasks,num_procs)
40310

41311
num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
42-
task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover+1,proc_id);
312+
task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover,proc_id-1) + 1;
43313

44314
Iterators.take(Iterators.drop(arr₁,task_start-1),num_tasks_on_proc)
45315
end
@@ -50,7 +320,8 @@ function split_product_across_processors(arr₁::AbstractVector,arr₂::Abstract
50320
split_across_processors(Iterators.product(arr₁,arr₂),num_procs,proc_id)
51321
end
52322

53-
function split_product_across_processors(arrs_tuple,num_procs::Integer=nworkers(),proc_id::Integer=worker_rank())
323+
function split_product_across_processors(arrs_tuple::NTuple,
324+
num_procs::Integer=nworkers(),proc_id::Integer=worker_rank())
54325
return split_across_processors(Iterators.product(arrs_tuple...),num_procs,proc_id)
55326
end
56327

@@ -103,7 +374,11 @@ function get_processor_id_from_split_array(iter,val,num_procs)
103374
return proc_id
104375
end
105376
end
106-
return 0
377+
return nothing
378+
end
379+
380+
function get_processor_id_from_split_array(iter::ProductSplit{T},val::T) where {T}
381+
get_processor_id_from_split_array(iter.iterators_product,val,iter.num_procs)
107382
end
108383

109384
function get_processor_range_from_split_array(iter,vals,num_procs::Integer)
@@ -128,6 +403,10 @@ function get_processor_range_from_split_array(iter,vals,num_procs::Integer)
128403
return proc_id_start:proc_id_end
129404
end
130405

406+
function get_processor_range_from_split_array(iter::ProductSplit{T},val::T) where {T}
407+
get_processor_range_from_split_array(iter.iterators_product,val,iterators.num_procs)
408+
end
409+
131410
get_processor_range_from_split_array(arr₁::AbstractVector,arr₂::AbstractVector,
132411
vals,num_procs::Integer) =
133412
get_processor_range_from_split_array(Iterators.product(arr₁,arr₂),

test/Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[deps]
2+
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

0 commit comments

Comments
 (0)