Skip to content

Commit c398221

Browse files
authored
Add SegmentedOrderedBinaryTrees (#1)
* Implement SegmentedOrderedBinaryTree Asynchronous createbranchchannel for OrderedBinaryTree Remove type specialization in createbranchchannels * add tests * Asynchronous remote tests
1 parent 0c9626c commit c398221

File tree

7 files changed

+2752
-1957
lines changed

7 files changed

+2752
-1957
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name = "ParallelUtilities"
22
uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
33
authors = ["Jishnu Bhattacharya <[email protected]>"]
4-
version = "0.7.1"
4+
version = "0.7.2"
55

66
[deps]
77
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"

src/errors.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ struct ProcessorNumberError <: Exception
33
np :: Int
44
end
55
function Base.showerror(io::IO,err::ProcessorNumberError)
6-
print(io,"processor id $(err.p) does not line in the range $(1:err.np)")
6+
print(io,"processor id $(err.p) does not lie in the range $(1:err.np)")
77
end
88

99
struct DecreasingIteratorError <: Exception

src/mapreduce.jl

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,33 @@ function reducedvalue(freduce::Function,rank,
9090

9191
N = nchildren(pipe)
9292
leftchild = N > 0
93-
vals = Vector{Tred}(undef,N+1)
93+
selfvalpresent = rank > 0
94+
vals = Vector{Tred}(undef,N + selfvalpresent)
9495
@sync begin
9596
@async begin
96-
selfval = take!(pipe.selfchannels.out)::Tmap
97-
selfvalred = freduce((value(selfval),))
98-
ind = 1 + leftchild
99-
v = pval(rank,selfvalred)
100-
vals[ind] = v
97+
if selfvalpresent
98+
selfval = take!(pipe.selfchannels.out)::Tmap
99+
selfvalred = freduce((value(selfval),))
100+
pv = pval(rank,selfvalred)
101+
ind = selfvalpresent + leftchild
102+
vals[ind] = pv
103+
end
101104
end
102-
@async for i=2:N+1
103-
pv = take!(pipe.childrenchannels.out) :: Tred
104-
shift = pv.rank > rank ? 1 : -1
105-
ind = shift + leftchild + 1
106-
vals[ind] = pv
105+
@async begin
106+
if selfvalpresent
107+
for i=1:N
108+
pv = take!(pipe.childrenchannels.out) :: Tred
109+
shift = pv.rank > rank ? 1 : -1
110+
ind = shift + leftchild + 1
111+
vals[ind] = pv
112+
end
113+
else
114+
for i=1:N
115+
pv = take!(pipe.childrenchannels.out) :: Tred
116+
vals[i] = pv
117+
end
118+
sort!(vals,by=pv->pv.rank)
119+
end
107120
end
108121
end
109122

@@ -125,7 +138,7 @@ function reduceTreeNode(freduce::Function,rank,pipe::BranchChannel{Tmap,Tred},
125138
anyerr = take!(pipe.selfchannels.err)
126139
else
127140
anyerr = false
128-
end
141+
end
129142
anyerr = anyerr ||
130143
any(take!(pipe.childrenchannels.err) for i=1:nchildren(pipe))
131144

@@ -542,7 +555,7 @@ function pmapreduce(fmap::Function,Tmap::Type,freduce::Function,Tred::Type,
542555
iterators::Tuple,args...;kwargs...)
543556

544557
tree,branches = createbranchchannels(pval{Tmap},pval{Tred},
545-
iterators,OrderedBinaryTree)
558+
iterators, SegmentedOrderedBinaryTree)
546559
pmapreduceworkers(fmap,freduce,iterators,tree,
547560
branches,Sorted(),args...;kwargs...)
548561
end

0 commit comments

Comments
 (0)