
Commit 4fa4d32

catch remote errors
1 parent d8fddd9 commit 4fa4d32

File tree

4 files changed: +280 -167 lines

Project.toml

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 name = "ParallelUtilities"
 uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
 authors = ["Jishnu Bhattacharya <[email protected]>"]
-version = "0.1.1"
+version = "0.2.0"

 [deps]
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"

README.md

Lines changed: 14 additions & 1 deletion
@@ -36,7 +36,7 @@ There are a total of 36 possible `(x,y,z)` combinations possible given these ran

 The package provides versions of `pmap` with an optional reduction. These differ from the one provided by `Distributed` in a few key aspects: firstly, the iterator product of the argument is what is passed to the function and not the arguments by elementwise, so the i-th task will be `Iterator.product(args...)[i]` and not `[x[i] for x in args]`. Specifically the second set of parameters in the example above will be `(2,2,3)` and not `(2,3,4)`.

-Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Secondly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last worker that has any tasks assigned to it. The tasks are also approximately evenly distributed across processors, assuming that the function takes an equal amount of time to run for each set of parameters. The function `pmapbatch_elementwise` is also exported that passes the elements to the function one-by-one as unwrapped tuples. This is the same as `pmap` where each worker is assigned batches of approximately equal sizes taken from the iterator product.
+Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Thirdly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last worker that has any tasks assigned to it. The tasks are also approximately evenly distributed across processors, assuming that the function takes an equal amount of time to run for each set of parameters. The function `pmapbatch_elementwise` is also exported that passes the elements to the function one-by-one as unwrapped tuples. This produces the same result as `pmap` where each worker is assigned batches of approximately equal sizes taken from the iterator product.

 ### pmapbatch and pmapbatch_elementwise

@@ -51,6 +51,19 @@ julia> Tuple(p)
 # Check for correctness
 julia> p == map(f,vec(collect(Iterators.product(xrange,yrange,zrange))))
 true
+
+# pmapbatch_elementwise produces the same result as pmap, although the internals are different
+julia> pmapbatch_elementwise(x->x^2,1:3)
+3-element Array{Int64,1}:
+ 1
+ 4
+ 9
+
+julia> pmap(x->x^2,1:3)
+3-element Array{Int64,1}:
+ 1
+ 4
+ 9
 ```

 There is also a function `pmapbatch` that deals with batches of parameters that are passed to each processor, and `pmap_elementwise` calls this function under the hood to process the parameters one by one. We may use this directly as well if we need the entire batch for some reason (eg. reading values off a disk, which needs to be done once for the entire set and not for every parameter). As an example we demonstrate how to obtain the same result as above using `pmapbatch`:
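
Stepping back to the first point in the README excerpt above — the mapped function receives elements of the iterator product of the ranges, not the ranges zipped elementwise — the following minimal sketch illustrates that ordering in plain Julia. The ranges are invented for illustration (they are not the README's 36-combination example) and no ParallelUtilities functions are called.

```julia
# Hypothetical ranges chosen only to illustrate the ordering; 2*3*4 = 24 combinations.
xrange, yrange, zrange = 1:2, 2:4, 3:6

# The i-th task corresponds to the i-th element of the iterator product ...
params = vec(collect(Iterators.product(xrange, yrange, zrange)))
params[2]                            # (2, 2, 3): the first range varies fastest

# ... and not to the i-th element of each range taken together, which is what
# a zipped, elementwise pmap over the ranges would see.
(xrange[2], yrange[2], zrange[2])    # (2, 3, 4)
```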

src/ParallelUtilities.jl

Lines changed: 130 additions & 85 deletions
@@ -543,99 +543,119 @@ nprocs_node(procs_used::Vector{<:Integer} = workers()) = nprocs_node(gethostname
 # pmapsum and pmapreduce
 ############################################################################################

+function throwRemoteException(e::Exception)
+    c = CapturedException(e,catch_backtrace())
+    throw(RemoteException(c))
+end
+
 # This function does not sort the values, so it might be faster
-function pmapreduce_commutative(::Type{T},fmap::Function,freduce::Function,
-    iterators::Tuple,args...;kwargs...) where {T}
+function pmapreduce_commutative(fmap::Function,freduce::Function,iterators::Tuple,args...;kwargs...)

     procs_used = workersactive(iterators)

     num_workers = length(procs_used);
     hostnames = gethostnames(procs_used);
     nodes = nodenames(hostnames);
-    procid_rank0_on_node = [procs_used[findfirst(isequal(node),hostnames)] for node in nodes];
+    procid_rank1_on_node = [procs_used[findfirst(isequal(node),hostnames)] for node in nodes];

     nprocs_node_dict = nprocs_node(procs_used)
-    node_channels = Dict(node=>RemoteChannel(()->Channel{T}(nprocs_node_dict[node]),procid_node)
-        for (node,procid_node) in zip(nodes,procid_rank0_on_node))
+    node_channels = Dict(
+        node=>RemoteChannel(()->Channel{Any}(nprocs_node_dict[node]),procid_node)
+        for (node,procid_node) in zip(nodes,procid_rank1_on_node))
+
+    # Worker at which the final reduction takes place
+    p_final = first(procid_rank1_on_node)
+
+    finalnode_reducechannel = RemoteChannel(()->Channel{Any}(length(procid_rank1_on_node)),p_final)

-    # Worker at which final reduction takes place
-    p_final = first(procid_rank0_on_node)
+    Ntasks_total = num_workers + length(procid_rank1_on_node) + 1

-    sum_channel = RemoteChannel(()->Channel{T}(length(procid_rank0_on_node)),p_final)
-    result = nothing
+    result_channel = RemoteChannel(()->Channel{Any}(1))

     # Run the function on each processor and compute the reduction at each node
     @sync for (rank,(p,node)) in enumerate(zip(procs_used,hostnames))
         @async begin

-            node_remotechannel = node_channels[node]
+            eachnode_reducechannel = node_channels[node]
+
             np_node = nprocs_node_dict[node]

             iterable_on_proc = evenlyscatterproduct(iterators,num_workers,rank)
-            @spawnat p put!(node_remotechannel,
-                fmap(iterable_on_proc,args...;kwargs...))
-
-            @async if p in procid_rank0_on_node
-                f = @spawnat p put!(sum_channel,
-                    freduce(take!(node_remotechannel) for i=1:np_node))
-                wait(f)
-                @spawnat p finalize(node_remotechannel)
+
+            @spawnat p begin
+                try
+                    res = fmap(iterable_on_proc,args...;kwargs...)
+                    put!(eachnode_reducechannel,res)
+                catch e
+                    throwRemoteException(e)
+                finally
+                    if p ∉ procid_rank1_on_node
+                        finalize(eachnode_reducechannel)
+                    end
+                end
+            end
+
+            @async if p in procid_rank1_on_node
+                @spawnat p begin
+                    try
+                        res = freduce(take!(eachnode_reducechannel) for i=1:np_node)
+                        put!(finalnode_reducechannel,res)
+                    catch e
+                        throwRemoteException(e)
+                    finally
+                        finalize(eachnode_reducechannel)
+                        if p != p_final
+                            finalize(finalnode_reducechannel)
+                        end
+                    end
+                end
             end

-            @async if p==p_final
-                result = @fetchfrom p_final freduce(take!(sum_channel)
-                    for i=1:length(procid_rank0_on_node))
-                @spawnat p finalize(sum_channel)
+            @async if p == p_final
+                @spawnat p begin
+                    try
+                        res = freduce(take!(finalnode_reducechannel)
+                            for i=1:length(procid_rank1_on_node))
+
+                        put!(result_channel,res)
+                    catch e
+                        throwRemoteException(e)
+                    finally
+                        finalize(finalnode_reducechannel)
+
+                        if p != result_channel.where
+                            finalize(result_channel)
+                        end
+                    end
+                end
             end
         end
     end

-    return result
-end
-
-function pmapreduce_commutative(::Type{T},fmap::Function,freduce::Function,
-    itp::Iterators.ProductIterator,args...;kwargs...) where {T}
-
-    pmapreduce_commutative(T,fmap,freduce,itp.iterators,args...;kwargs...)
+    take!(result_channel)
 end

-function pmapreduce_commutative(::Type{T},fmap::Function,freduce::Function,
-    iterable,args...;kwargs...) where {T}
+function pmapreduce_commutative(fmap::Function,freduce::Function,
+    itp::Iterators.ProductIterator,args...;kwargs...)

-    pmapreduce_commutative(T,fmap,freduce,(iterable,),args...;kwargs...)
+    pmapreduce_commutative(fmap,freduce,itp.iterators,args...;kwargs...)
 end

 function pmapreduce_commutative(fmap::Function,freduce::Function,iterable,args...;kwargs...)
-    pmapreduce_commutative(Any,fmap,freduce,iterable,args...;kwargs...)
+    pmapreduce_commutative(fmap,freduce,(iterable,),args...;kwargs...)
 end

-function pmapreduce_commutative_elementwise(::Type{T},fmap::Function,freduce::Function,
-    iterable,args...;kwargs...) where {T}
-
-    pmapreduce_commutative(T,plist->freduce(asyncmap(x->fmap(x...,args...;kwargs...),plist)),
+function pmapreduce_commutative_elementwise(fmap::Function,freduce::Function,iterable,args...;kwargs...)
+    pmapreduce_commutative(plist->freduce(asyncmap(x->fmap(x...,args...;kwargs...),plist)),
         freduce,iterable,args...;kwargs...)
 end

-function pmapreduce_commutative_elementwise(fmap::Function,freduce::Function,
-    iterable,args...;kwargs...)
-
-    pmapreduce_commutative_elementwise(Any,fmap,freduce,iterable,args...;kwargs...)
-end
-
 function pmapsum(fmap::Function,iterable,args...;kwargs...)
     pmapreduce_commutative(fmap,sum,iterable,args...;kwargs...)
 end

-function pmapsum(::Type{T},fmap::Function,iterable,args...;kwargs...) where {T}
-    pmapreduce_commutative(T,fmap,sum,iterable,args...;kwargs...)
-end
-
 function pmapsum_elementwise(fmap::Function,iterable,args...;kwargs...)
-    pmapsum_elementwise(Any,fmap,iterable,args...;kwargs...)
-end
-
-function pmapsum_elementwise(::Type{T},fmap::Function,iterable,args...;kwargs...) where {T}
-    pmapsum(T,plist->sum(asyncmap(x->fmap(x...,args...;kwargs...),plist)),iterable)
+    pmapsum(plist->sum(asyncmap(x->fmap(x...,args...;kwargs...),plist)),iterable)
 end

 # Store the processor id with the value
@@ -644,72 +664,97 @@ struct pval{T}
     parent :: T
 end

-function pmapreduce(::Type{T},fmap::Function,freduce::Function,
-    iterable::Tuple,args...;kwargs...) where {T}
+function pmapreduce(fmap::Function,freduce::Function,iterable::Tuple,args...;kwargs...)

     procs_used = workersactive(iterable)

     num_workers = length(procs_used);
     hostnames = gethostnames(procs_used);
     nodes = nodenames(hostnames);
-    procid_rank0_on_node = [procs_used[findfirst(isequal(node),hostnames)] for node in nodes];
+    procid_rank1_on_node = [procs_used[findfirst(isequal(node),hostnames)] for node in nodes];

     nprocs_node_dict = nprocs_node(procs_used)
-    node_channels = Dict(node=>RemoteChannel(()->Channel{T}(nprocs_node_dict[node]),procid_node)
-        for (node,procid_node) in zip(nodes,procid_rank0_on_node))
+    node_channels = Dict(
+        node=>RemoteChannel(()->Channel{Any}(nprocs_node_dict[node]),procid_node)
+        for (node,procid_node) in zip(nodes,procid_rank1_on_node))

-    # Worker at which final reduction takes place
-    p_final = first(procid_rank0_on_node)
+    # Worker at which the final reduction takes place
+    p_final = first(procid_rank1_on_node)

-    reduce_channel = RemoteChannel(()->Channel{T}(length(procid_rank0_on_node)),p_final)
-    result = nothing
+    finalnode_reducechannel = RemoteChannel(()->Channel{Any}(length(procid_rank1_on_node)),p_final)
+
+    result_channel = RemoteChannel(()->Channel{Any}(1))

     # Run the function on each processor and compute the sum at each node
     @sync for (rank,(p,node)) in enumerate(zip(procs_used,hostnames))
         @async begin

-            node_remotechannel = node_channels[node]
+            eachnode_reducechannel = node_channels[node]
+
             np_node = nprocs_node_dict[node]

             iterable_on_proc = evenlyscatterproduct(iterable,num_workers,rank)
-            @spawnat p put!(node_remotechannel,
-                pval(p,fmap(iterable_on_proc,args...;kwargs...)))
-
-            @async if p in procid_rank0_on_node
-                f = @spawnat p begin
-                    vals = [take!(node_remotechannel) for i=1:np_node ]
-                    sort!(vals,by=x->x.p)
-                    put!(reduce_channel,pval(p,freduce(v.parent for v in vals)) )
+            @spawnat p begin
+                try
+                    res = pval(p,fmap(iterable_on_proc,args...;kwargs...))
+                    put!(eachnode_reducechannel,res)
+                catch e
+                    throwRemoteException(e)
+                finally
+                    if p ∉ procid_rank1_on_node
+                        finalize(eachnode_reducechannel)
+                    end
+                end
+            end
+
+            @async if p in procid_rank1_on_node
+                @spawnat p begin
+                    try
+                        vals = [take!(eachnode_reducechannel) for i=1:np_node]
+                        sort!(vals,by=x->x.p)
+                        res = pval(p,freduce(v.parent for v in vals))
+                        put!(finalnode_reducechannel,res)
+                    catch e
+                        throwRemoteException(e)
+                    finally
+                        finalize(eachnode_reducechannel)
+                        if p != p_final
+                            finalize(finalnode_reducechannel)
+                        end
+                    end
                 end
-                wait(f)
-                @spawnat p finalize(node_remotechannel)
             end

-            @async if p==p_final
-                result = @fetchfrom p_final begin
-                    vals = [take!(reduce_channel) for i=1:length(procid_rank0_on_node)]
-                    sort!(vals,by=x->x.p)
-                    freduce(v.parent for v in vals)
+            @async if p == p_final
+                @spawnat p begin
+                    try
+                        vals = [take!(finalnode_reducechannel) for i=1:length(procid_rank1_on_node)]
+                        sort!(vals,by=x->x.p)
+                        res = freduce(v.parent for v in vals)
+                        put!(result_channel,res)
+                    catch e
+                        throwRemoteException(e)
+                    finally
+                        finalize(finalnode_reducechannel)
+                        if p != result_channel.where
+                            finalize(result_channel)
+                        end
+                    end
                 end
-                @spawnat p finalize(reduce_channel)
             end
         end
     end

-    return result
-end
-
-function pmapreduce(fmap::Function,freduce::Function,iterable::Tuple,args...;kwargs...)
-    pmapreduce(Any,fmap,freduce,iterable,args...;kwargs...)
+    take!(result_channel)
 end

 function pmapreduce(fmap::Function,freduce::Function,
     itp::Iterators.ProductIterator,args...;kwargs...)
-    pmapreduce(Any,fmap,freduce,itp.iterators,args...;kwargs...)
+    pmapreduce(fmap,freduce,itp.iterators,args...;kwargs...)
 end

 function pmapreduce(fmap::Function,freduce::Function,iterable,args...;kwargs...)
-    pmapreduce(Any,fmap,freduce,(iterable,),args...;kwargs...)
+    pmapreduce(fmap,freduce,(iterable,),args...;kwargs...)
 end

 ############################################################################################
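
The heart of this commit is the error handling wrapped around every remotely spawned block: `throwRemoteException` captures whatever the worker throws together with its backtrace in a `CapturedException` and rethrows it as a `RemoteException`, while each `finally` clause releases the channels so no other task is left blocked. The sketch below reproduces that pattern in isolation using only `Distributed` primitives and hypothetical names (`throw_as_remote`, `run_remote`); it is not the package's exported API.

```julia
using Distributed

# Same pattern as throwRemoteException above: capture the backtrace and rethrow
# as a RemoteException so the failure crosses process boundaries intact.
throw_as_remote(e::Exception) = throw(RemoteException(CapturedException(e, catch_backtrace())))

results = RemoteChannel(() -> Channel{Any}(1))

# Stand-in for the per-worker @spawnat blocks: the map step throws, the error is
# re-raised as a RemoteException, and the channel is finalized either way so no
# task blocks forever on take!(results). Spawned on pid 1 so the sketch runs
# without addprocs; the package spawns on each worker p.
run_remote = @spawnat 1 begin
    try
        put!(results, error("remote map failed"))
    catch e
        throw_as_remote(e)
    finally
        finalize(results)
    end
end

# fetch(run_remote) now surfaces a RemoteException carrying the original error and
# backtrace, instead of the caller hanging on a channel that is never filled.
```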

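Aside from the error handling, both rewritten functions keep the same two-level reduction: each worker writes its map result to a `RemoteChannel` hosted on the first worker of its node, those rank-1 workers reduce their node's values and forward them to a channel on `p_final`, and the final result is placed on `result_channel`, which the caller now simply `take!`s instead of reading a captured `result` variable. In the ordered `pmapreduce`, each partial result is additionally tagged as `pval(p, value)` and sorted by `p` before reducing, so the reduction sees results in worker-rank order. The following is a minimal single-process sketch of that channel topology, assuming toy `fmap`/`freduce` functions; it does not call the package.

```julia
using Distributed

# Toy map and reduction standing in for the user-supplied fmap/freduce.
fmap(xs)   = sum(x^2 for x in xs)
freduce(v) = sum(v)

# One channel per node (sized to the number of workers on it) and one channel
# for the cross-node reduction; here both live on the local process so the
# sketch runs without addprocs.
eachnode_reducechannel  = RemoteChannel(() -> Channel{Any}(2))
finalnode_reducechannel = RemoteChannel(() -> Channel{Any}(1))

# Two "workers" on the same node push their partial map results.
put!(eachnode_reducechannel, fmap(1:3))   # 1 + 4 + 9   = 14
put!(eachnode_reducechannel, fmap(4:6))   # 16 + 25 + 36 = 77

# The node's rank-1 worker reduces its node's values and forwards the result.
put!(finalnode_reducechannel, freduce(take!(eachnode_reducechannel) for _ in 1:2))

# The final worker reduces across nodes (only one node here) and the caller takes it.
take!(finalnode_reducechannel)   # 91
```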