Skip to content

Commit 5a921c5

Browse files
committed
test commit
1 parent e7e30f7 commit 5a921c5

File tree

1 file changed

+11
-6
lines changed

1 file changed

+11
-6
lines changed

src/ParallelUtilities.jl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -240,18 +240,23 @@ function pmapsum(f::Function,iterable,args...;kwargs...)
240240

241241
futures = pmap_onebatch_per_worker(f,iterable,args...;kwargs...)
242242

243-
# Final sum across all nodes
244-
# sum(fetch(f) for f in futures)
245-
@fetchfrom first(procs_used) @distributed (+) for f in futures
246-
fetch(f)
243+
# final sum to be run on the first worker
244+
function final_sum(futures)
245+
s = fetch(first(futures))
246+
@sync for f in futures[2:end]
247+
@async s += fetch(f)
248+
end
249+
return s
247250
end
248251

252+
@fetchfrom first(procs_used) final_sum(futures)
249253
end
250254

251-
function pmap_onebatch_per_worker(f::Function,iterable,args...;num_workers=nothing,kwargs...)
255+
function pmap_onebatch_per_worker(f::Function,iterable,args...;kwargs...)
252256

253257
procs_used = workers_active(iterable)
254-
if !isnothing(num_workers) && num_workers<=length(procs_used)
258+
num_workers = get(kwargs,:num_workers,length(procs_used))
259+
if num_workers<length(procs_used)
255260
procs_used = procs_used[1:num_workers]
256261
end
257262
num_workers = length(procs_used)

0 commit comments

Comments
 (0)