```diff
 module ParallelUtilities
 
-using Reexport
+using Reexport,TimerOutputs
 @reexport using Distributed
 
 export split_across_processors,split_product_across_processors,
 get_processor_id_from_split_array,procid_allmodes,mode_index_in_file,
 get_processor_range_from_split_array,workers_active,nworkers_active,worker_rank,
 get_index_in_split_array,procid_and_mode_index,extrema_from_split_array,
-pmapsum,sum_at_node,pmap_onebatch_per_worker,moderanges_common_lastarray,
+pmapsum,pmapsum_timed,sum_at_node,pmap_onebatch_per_worker,moderanges_common_lastarray,
 get_nodes,get_hostnames,get_nprocs_node
 
 function worker_rank()
@@ -250,10 +250,32 @@ function pmapsum(f::Function,iterable,args...;kwargs...)
         end
         return s
     end
-
     @fetchfrom first(procs_used) final_sum(futures)
 end
 
+function pmapsum_timed(f::Function,iterable,args...;kwargs...)
+
+    procs_used = workers_active(iterable)
+
+    futures = pmap_onebatch_per_worker(f,iterable,args...;kwargs...)
+
+    timer = TimerOutput()
+    function final_sum(futures,timer)
+        @timeit timer "fetch" s = fetch(first(futures))
+        @sync for f in futures[2:end]
+            @async begin
+                @timeit timer "fetch" t_i = fetch(f)
+                s += t_i
+            end
+        end
+        return s,timer
+    end
+
+    s,timer = @fetchfrom first(procs_used) final_sum(futures,timer)
+    println(timer)
+    return s
+end
+
 function pmap_onebatch_per_worker(f::Function,iterable,args...;kwargs...)
 
     procs_used = workers_active(iterable)
```
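For context, here is a minimal usage sketch of the new `pmapsum_timed` (not part of the commit). It assumes the module is on the load path of every worker and that `pmap_onebatch_per_worker`, as its name suggests, calls `f` once per worker on that worker's share of the iterable; the worker function `f` below is purely illustrative.

```julia
# Hypothetical demo of pmapsum_timed; f and the two-process setup are
# illustrative assumptions, not part of ParallelUtilities itself.
using Distributed
addprocs(2)                     # two local workers for the demo

@everywhere using ParallelUtilities

# Each worker returns a partial sum of squares over its batch;
# pmapsum_timed adds the partials and prints the TimerOutputs table
# of per-future fetch times before returning the total.
@everywhere f(batch) = sum(m -> m^2, batch)

total = pmapsum_timed(f, 1:1000)
@assert total == sum(m -> m^2, 1:1000)
```

Two design points are worth noting in the diff itself: the `@sync for ... @async` loop in `final_sum` issues all the remaining `fetch` calls concurrently, so the "fetch" rows in the timer reflect overlapping waits rather than a serial chain; and because `final_sum` runs via `@fetchfrom` on `first(procs_used)`, the partial results are combined on a worker and only the final total (plus the timer) travels back to the calling process.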