Skip to content

Commit 08630e1

Browse files
committed
updated get_processor_id_from_split_array and get_index_in_split_array to use binary search
1 parent 6f0f5c9 commit 08630e1

File tree

1 file changed

+65
-123
lines changed

1 file changed

+65
-123
lines changed

src/ParallelUtilities.jl

Lines changed: 65 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -285,25 +285,20 @@ function _infullrange(val,t::Tuple)
285285
end
286286
_infullrange(::Tuple{},::Tuple{}) = true
287287

288-
struct OrderedTuple{T}
288+
# This struct is just a wrapper to flip the tuples before comparing
289+
struct LittleEndianTuple{T}
289290
t :: T
290291
end
291292

292-
function Base.:<=(a::OrderedTuple{T},b::OrderedTuple{T}) where {T}
293-
_le(reverse(a.t),reverse(b.t))
294-
end
295-
296-
function _le(t1::Tuple,t2::Tuple)
297-
first(t1) < first(t2) || ((first(t1) == first(t2)) & _le(Base.tail(t1),Base.tail(t2)))
298-
end
299-
_le(::Tuple{},::Tuple{}) = true
293+
Base.isless(a::LittleEndianTuple{T},b::LittleEndianTuple{T}) where {T} = reverse(a.t) < reverse(b.t)
294+
Base.isequal(a::LittleEndianTuple{T},b::LittleEndianTuple{T}) where {T} = a.t == b.t
300295

301296
function Base.in(val::T,p::ProductSplit{T}) where {T}
302297
_infullrange(val,p) || return false
303298

304-
val_ot = OrderedTuple(val)
305-
first_iter = OrderedTuple(p[1])
306-
last_iter = OrderedTuple(p[end])
299+
val_ot = LittleEndianTuple(val)
300+
first_iter = LittleEndianTuple(p[1])
301+
last_iter = LittleEndianTuple(p[end])
307302

308303
first_iter <= val_ot <= last_iter
309304
end
@@ -317,161 +312,108 @@ function worker_rank()
317312
myid()-minimum(workers())+1
318313
end
319314

320-
# function split_across_processors_iterators(arr₁::Base.Iterators.ProductIterator,num_procs,proc_id)
321-
322-
# @assert(proc_id<=num_procs,"processor rank has to be less than number of workers engaged")
323-
324-
# num_tasks = length(arr₁);
325-
326-
# num_tasks_per_process,num_tasks_leftover = div(num_tasks,num_procs),mod(num_tasks,num_procs)
327-
328-
# num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
329-
# task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover,proc_id-1) + 1;
330-
331-
# Iterators.take(Iterators.drop(arr₁,task_start-1),num_tasks_on_proc)
332-
# end
333-
334-
# function split_product_across_processors_iterators(arrs_tuple,num_procs,proc_id)
335-
# split_across_processors_iterators(Iterators.product(arrs_tuple...),num_procs,proc_id)
336-
# end
337-
338-
function split_across_processors(num_tasks::Integer,num_procs=nworkers(),proc_id=worker_rank())
339-
split_product_across_processors((1:num_tasks,),num_procs,proc_id)
315+
function split_across_processors(num_tasks::Integer,np=nworkers(),pid=worker_rank())
316+
split_product_across_processors((1:num_tasks,),np,pid)
340317
end
341318

342-
function split_product_across_processors(arrs_tuple::Tuple,
343-
num_procs::Integer=nworkers(),proc_id::Integer=worker_rank())
319+
function split_product_across_processors(iterators::Tuple,
320+
np::Integer=nworkers(),pid::Integer=worker_rank())
344321

345-
ProductSplit(arrs_tuple,num_procs,proc_id)
322+
ProductSplit(iterators,np,pid)
346323
end
347324

348-
function get_processor_id_from_split_array(arr₁::AbstractVector,arr₂::AbstractVector,
349-
(arr₁_value,arr₂_value)::Tuple,num_procs::Integer)
350-
# Find the closest match in arrays
351-
352-
if (arr₁_value arr₁) || (arr₂_value arr₂)
353-
return nothing # invalid
354-
end
325+
function get_processor_id_from_split_array(iterators::Tuple,val::Tuple,np::Int)
355326

356-
num_tasks = length(arr₁)*length(arr₂);
357-
358-
a1_match_index = searchsortedfirst(arr₁,arr₁_value)
359-
a2_match_index = searchsortedfirst(arr₂,arr₂_value)
360-
361-
num_tasks_per_process,num_tasks_leftover = div(num_tasks,num_procs),mod(num_tasks,num_procs)
327+
_infullrange(val,iterators) || return nothing
362328

363-
proc_id = 1
364-
num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
365-
total_tasks_till_proc_id = num_tasks_on_proc
329+
# We may carry out a binary search as the iterators are sorted
330+
left,right = 1,np
366331

367-
task_no = 0
368-
369-
for (ind2,a2) in enumerate(arr₂), (ind1,a1) in enumerate(arr₁)
370-
371-
task_no +=1
372-
if task_no > total_tasks_till_proc_id
373-
proc_id += 1
374-
num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
375-
total_tasks_till_proc_id += num_tasks_on_proc
376-
end
377-
378-
if ind2< a2_match_index
379-
continue
380-
end
332+
while left <= right
333+
mid = floor(Int,(left+right)/2)
334+
ps = ProductSplit(iterators,np,mid)
381335

382-
if (ind2 == a2_match_index) && (ind1 == a1_match_index)
383-
break
336+
if LittleEndianTuple(val) < LittleEndianTuple(first(ps))
337+
right = mid - 1
338+
elseif LittleEndianTuple(val) > LittleEndianTuple(last(ps))
339+
left = mid + 1
340+
else
341+
return mid
384342
end
385343
end
386344

387-
return proc_id
388-
end
389-
390-
function get_processor_id_from_split_array(iterators,val,num_procs)
391-
for proc_id in 1:num_procs
392-
tasks_on_proc = split_product_across_processors(iterators,num_procs,proc_id)
393-
if val tasks_on_proc
394-
return proc_id
395-
end
396-
end
397345
return nothing
398346
end
399347

400-
function get_processor_id_from_split_array(iter::ProductSplit{T},val::T) where {T}
401-
get_processor_id_from_split_array(iter.iterators_product,val,iter.num_procs)
402-
end
403-
404-
function get_processor_range_from_split_array(iter,vals,num_procs::Integer)
348+
# This function is necessary when you're changing np
349+
function get_processor_range_from_split_array(ps::ProductSplit,np_new::Int)
405350

406-
if isempty(vals)
351+
if length(ps)==0
407352
return 0:-1 # empty range
408353
end
409354

410-
first_task = first(vals)
411-
proc_id_start = get_processor_id_from_split_array(iter,first_task,num_procs)
412-
413-
last_task = first_task
414-
if length(vals) == 1
415-
return proc_id_start:proc_id_start
416-
end
417-
418-
for t in vals
419-
last_task = t
355+
pid_start = get_processor_id_from_split_array(ps.iterators,first(ps),np_new)
356+
if length(ps) == 1
357+
return pid_start:pid_start
420358
end
421359

422-
proc_id_end = get_processor_id_from_split_array(iter,last_task,num_procs)
423-
return proc_id_start:proc_id_end
360+
pid_end = get_processor_id_from_split_array(ps.iterators,last(ps),np_new)
361+
return pid_start:pid_end
424362
end
425363

426-
function get_processor_range_from_split_array(iter::ProductSplit{T},val::T) where {T}
427-
get_processor_range_from_split_array(iter.iterators_product,val,iterators.num_procs)
428-
end
364+
function get_index_in_split_array(ps::ProductSplit{T},val::T) where {T}
365+
# Can carry out a binary search
429366

430-
get_processor_range_from_split_array(arr₁::AbstractVector,arr₂::AbstractVector,
431-
vals,num_procs::Integer) =
432-
get_processor_range_from_split_array(Iterators.product(arr₁,arr₂),
433-
vals,num_procs)
367+
(length(ps) == 0 || val ps) && return nothing
434368

435-
function get_index_in_split_array(iter_section,val::Tuple)
436-
if isnothing(iter_section)
437-
return nothing
438-
end
439-
for (ind,val_ind) in enumerate(iter_section)
440-
if val_ind == val
441-
return ind
369+
left,right = 1,length(ps)
370+
371+
val == first(ps) && return left
372+
val == last(ps) && return right
373+
374+
while left <= right
375+
mid = floor(Int,(left+right)/2)
376+
val_mid = @inbounds ps[mid]
377+
378+
if LittleEndianTuple(val) < LittleEndianTuple(val_mid)
379+
right = mid - 1
380+
elseif LittleEndianTuple(val) > LittleEndianTuple(val_mid)
381+
left = mid + 1
382+
else
383+
return mid
442384
end
443385
end
444-
nothing
386+
387+
return nothing
445388
end
446389

447390
function procid_and_mode_index(arr₁::AbstractVector,arr₂::AbstractVector,
448-
(arr₁_value,arr₂_value)::Tuple,num_procs::Integer)
449-
proc_id_mode = get_processor_id_from_split_array(arr₁,arr₂,(arr₁_value,arr₂_value),num_procs)
450-
modes_in_procid_file = split_product_across_processors(arr₁,arr₂,num_procs,proc_id_mode)
391+
(arr₁_value,arr₂_value)::Tuple,np::Integer)
392+
pid_mode = get_processor_id_from_split_array(arr₁,arr₂,(arr₁_value,arr₂_value),np)
393+
modes_in_procid_file = split_product_across_processors(arr₁,arr₂,np,pid_mode)
451394
mode_index = get_index_in_split_array(modes_in_procid_file,(arr₁_value,arr₂_value))
452-
return proc_id_mode,mode_index
395+
return pid_mode,mode_index
453396
end
454397

455-
function procid_and_mode_index(iter,val::Tuple,num_procs::Integer)
456-
457-
proc_id_mode = get_processor_id_from_split_array(iter,val,num_procs)
458-
modes_in_procid_file = split_across_processors(iter,num_procs,proc_id_mode)
398+
function procid_and_mode_index(iterators::Tuple,val::Tuple,np::Integer)
399+
pid_mode = get_processor_id_from_split_array(iterators,val,np)
400+
modes_in_procid_file = split_across_processors(iterators,np,pid_mode)
459401
mode_index = get_index_in_split_array(modes_in_procid_file,val)
460-
return proc_id_mode,mode_index
402+
return pid_mode,mode_index
461403
end
462404

463405
function mode_index_in_file(arr₁::AbstractVector,arr₂::AbstractVector,
464-
(arr₁_value,arr₂_value)::Tuple,num_procs::Integer,proc_id_mode::Integer)
406+
(arr₁_value,arr₂_value)::Tuple,np::Integer,pid_mode::Integer)
465407

466-
modes_in_procid_file = split_product_across_processors(arr₁,arr₂,num_procs,proc_id_mode)
408+
modes_in_procid_file = split_product_across_processors(arr₁,arr₂,np,pid_mode)
467409
mode_index = get_index_in_split_array(modes_in_procid_file,(arr₁_value,arr₂_value))
468410
end
469411

470412
function procid_allmodes(arr₁::AbstractVector,arr₂::AbstractVector,
471-
iter,num_procs=nworkers_active(arr₁,arr₂))
413+
iter,np=nworkers_active(arr₁,arr₂))
472414
procid = zeros(Int64,length(iter))
473415
for (ind,mode) in enumerate(iter)
474-
procid[ind] = get_processor_id_from_split_array(arr₁,arr₂,mode,num_procs)
416+
procid[ind] = get_processor_id_from_split_array(arr₁,arr₂,mode,np)
475417
end
476418
return procid
477419
end

0 commit comments

Comments
 (0)