@@ -2,7 +2,7 @@ module ParallelOperations
22
33using Distributed
44
5- import Base: reduce, sum, maximum, minimum
5+ import Base: reduce
66
77export
88 sendto, @sendto,
@@ -17,19 +17,24 @@ export
1717 allgather,
1818 allreduce,
1919
20- sum, allsum,
21- maximum, allmaximum,
22- minimum, allminimum
20+ allsum,
21+ allmaximum,
22+ allminimum
2323
2424# point-to-point
2525
2626function sendto(p:: Int , expr, data, mod:: Module = Main)
27- @async @spawnat(p, Core. eval(mod, Expr(:(= ), expr, data)))
27+ remotecall_fetch(p) do
28+ Core. eval(mod, Expr(:(= ), expr, data))
29+ end
2830end
2931
3032function sendto(p:: Int , mod:: Module = Main; args... )
31- data = Dict(nm => val for (nm, val) in args)
32- @async @spawnat(p, Core. eval(mod, Expr(:(= ), :parallel_data, data)))
33+ for (nm, val) in args
34+ remotecall_fetch(p) do
35+ Core. eval(mod, Expr(:(= ), nm, val))
36+ end
37+ end
3338end
3439
3540function sendto(p:: Int , f:: Function , expr, mod:: Module = Main; args = ())
@@ -61,7 +66,9 @@ macro sendto(p, expr, mod::Symbol = :Main)
6166end
6267
6368function getfrom(p:: Int , expr, mod:: Module = Main)
64- return fetch(@spawnat(p, Core. eval(mod, expr)))
69+ return remotecall_fetch(p) do
70+ Core. eval(mod, expr)
71+ end
6572end
6673
6774macro getfrom(p, obj, mod:: Symbol = :Main)
9299
93100function bcast(pids:: Array , f:: Function , expr, mod:: Module = Main; args... )
94101 @sync for p in pids
95- @async sendto(p, f, expr, mod; args... )
102+ sendto(p, f, expr, mod; args... )
96103 end
97104end
98105
99106function bcast(pids:: Array , f:: Function , mod:: Module = Main; args... )
100107 @sync for p in pids
101- @async sendto(p, f, mod; args... )
108+ sendto(p, f, mod; args... )
102109 end
103110end
104111
@@ -166,13 +173,22 @@ function allreduce(f::Function, pids::Array, src_expr, target_expr = src_expr, m
166173end
167174
168175# Commonly used functions
169- sum(pids:: Array , expr:: Union{Symbol, Expr} , mod:: Module = Main) = sum(gather(pids, expr, mod))
170- allsum(pids:: Array , src_expr:: Union{Symbol, Expr} , target_expr = src_expr, mod:: Module = Main) = bcast(pids, target_expr, sum(pids, src_expr, mod), mod)
176+ function _allsum(pids:: Array , expr:: Union{Symbol, Expr} , mod:: Module = Main)
177+ return Base. sum(gather(pids, expr, mod))
178+ end
171179
172- maximum(pids:: Array , expr, mod:: Module = Main) = maximum(gather(pids, expr, mod))
173- allmaximum(pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast(pids, target_expr, maximum(pids, src_expr, mod), mod)
180+ allsum(pids:: Array , src_expr:: Union{Symbol, Expr} , target_expr = src_expr, mod:: Module = Main) = bcast(pids, target_expr, _allsum(pids, src_expr, mod), mod)
181+
182+ function _allmaximum(pids:: Array , expr, mod:: Module = Main)
183+ return Base. maximum(gather(pids, expr, mod))
184+ end
185+
186+ allmaximum(pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast(pids, target_expr, _allmaximum(pids, src_expr, mod), mod)
187+
188+ function _allminimum(pids:: Array , expr, mod:: Module = Main)
189+ return Base. minimum(gather(pids, expr, mod))
190+ end
174191
175- minimum(pids:: Array , expr, mod:: Module = Main) = minimum(gather(pids, expr, mod))
176- allminimum(pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast(pids, target_expr, minimum(pids, src_expr, mod), mod)
192+ allminimum(pids:: Array , src_expr, target_expr = src_expr, mod:: Module = Main) = bcast(pids, target_expr, _allminimum(pids, src_expr, mod), mod)
177193
178194end
0 commit comments