
Commit fbbc7ec
Merge pull request #105 from JuliaParallel/eschnett/cman4
Restructure MPI cluster manager
2 parents eae5e9e + 20cee8d

File tree: 7 files changed, +509 −257 lines

examples/05-juliacman.jl
Lines changed: 8 additions & 7 deletions

@@ -1,35 +1,36 @@
+# Note: Run this script without using `mpirun`
+
 using MPI

-manager=MPIManager(np=4)
+manager = MPIManager(np=4)
 addprocs(manager)

 println("Added procs $(procs())")

 println("Running 01-hello as part of a Julia cluster")
-@mpi_do manager (include("01-hello-impl.jl"); do_hello())
+@mpi_do manager (include("01-hello-impl.jl"); do_hello())

-#Interspersed julia parallel call
+# Interspersed julia parallel call
 nheads = @parallel (+) for i=1:10^8
     Int(rand(Bool))
 end
 println("@parallel nheads $nheads")

 println("Running 02-broadcast as part of a Julia cluster")
-@mpi_do manager (include("02-broadcast-impl.jl"); do_broadcast())
+@mpi_do manager (include("02-broadcast-impl.jl"); do_broadcast())

 M = [rand(10,10) for i=1:10]
 pmap(svd, M)
 println("pmap successful")

 println("Running 03-reduce as part of a Julia cluster")
-@mpi_do manager (include("03-reduce-impl.jl"); do_reduce())
+@mpi_do manager (include("03-reduce-impl.jl"); do_reduce())

 pids = [remotecall_fetch(myid, p) for p in workers()]
 println("julia pids $pids")

 println("Running 04-sendrecv as part of a Julia cluster")
-@mpi_do manager (include("04-sendrecv-impl.jl"); do_sendrecv())
+@mpi_do manager (include("04-sendrecv-impl.jl"); do_sendrecv())

 println("Exiting")
 exit()
-
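For context, the restructured examples/05-juliacman.jl drives everything from a plain `julia` session: the MPIManager launches the MPI ranks itself, which is why the new header comment says not to use `mpirun`. Below is a minimal sketch of that workflow, assuming the MPIManager / addprocs / @mpi_do API exercised in the diff above; the worker count and printed messages are illustrative only.

# Sketch: MPIManager-driven cluster, run with `julia`, not under `mpirun`.
# Assumes the MPIManager / addprocs / @mpi_do API used in the example above;
# np=2 and the messages are illustrative only.
using MPI

manager = MPIManager(np=2)   # the manager spawns 2 MPI worker processes
addprocs(manager)            # and attaches them as ordinary Julia workers

# SPMD-style execution on every MPI rank via the manager
@mpi_do manager println("rank $(MPI.Comm_rank(MPI.COMM_WORLD)) of $(MPI.Comm_size(MPI.COMM_WORLD))")

# The same processes also serve regular Julia parallel calls
pids = [remotecall_fetch(myid, p) for p in workers()]
println("julia pids $pids")

exit()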

examples/06-cman-transport.jl
Lines changed: 42 additions & 42 deletions

@@ -1,59 +1,59 @@
 using MPI

-comm, comm_size, rank = MPI.init_mpi()
+MPI.Init()
+rank = MPI.Comm_rank(MPI.COMM_WORLD)
+size = MPI.Comm_size(MPI.COMM_WORLD)

 include("01-hello-impl.jl")
 include("02-broadcast-impl.jl")
 include("03-reduce-impl.jl")
 include("04-sendrecv-impl.jl")

 if length(ARGS) == 0
-    print("Please specify a transport option to use [MPI|TCP]\n")
-    exit()
+    println("Please specify a transport option to use [MPI|TCP]")
+    MPI.Finalize()
+    exit(1)
 elseif ARGS[1] == "TCP"
-    manager = MPI.start(TCP_TRANSPORT_ALL) # does not return on worker
+    manager = MPI.start_main_loop(TCP_TRANSPORT_ALL) # does not return on worker
 elseif ARGS[1] == "MPI"
-    manager = MPI.start(MPI_TRANSPORT_ALL) # does not return on worker
+    manager = MPI.start_main_loop(MPI_TRANSPORT_ALL) # does not return on worker
 else
-    print("Valid transport options are [MPI|TCP]\n")
-    exit()
+    println("Valid transport options are [MPI|TCP]")
+    MPI.Finalize()
+    exit(1)
 end

-if rank == 0
-    nloops = 10^2
-    function foo(n)
-        a=ones(n)
-        remotecall_fetch(x->x, 2, a);
+# Check whether a worker accidentally returned
+@assert rank == 0

-        @elapsed for i in 1:nloops
-            remotecall_fetch(x->x, 2, a)
-        end
-    end
-
-    n=10^3
-    foo(1)
-    t=foo(n)
-    println("$t seconds for $nloops loops of send-recv of array size $n")
-
-    n=10^6
-    foo(1)
-    t=foo(n)
-    println("$t seconds for $nloops loops of send-recv of array size $n")
-
-
-    print("EXAMPLE: HELLO\n")
-    @mpi_do manager do_hello()
-    print("EXAMPLE: BROADCAST\n")
-    @mpi_do manager do_broadcast()
-    print("EXAMPLE: REDUCE\n")
-    @mpi_do manager do_reduce()
-    print("EXAMPLE: SENDRECV\n")
-    @mpi_do manager do_sendrecv()
-
-    # Abscence of a MPI Finalize causes the cluster to hang - don't yet know why
-    if ARGS[1] == "TCP"
-        @mpi_do manager MPI.Finalize()
-    elseif ARGS[1] == "MPI"
-        @everywhere (MPI.Finalize(); exit())
+nloops = 10^2
+function foo(n)
+    a=ones(n)
+    remotecall_fetch(x->x, mod1(2, size), a);
+    @elapsed for i in 1:nloops
+        remotecall_fetch(x->x, mod1(2, size), a)
     end
 end
+
+n=10^3
+foo(1)
+t=foo(n)
+println("$t seconds for $nloops loops of send-recv of array size $n")
+
+n=10^6
+foo(1)
+t=foo(n)
+println("$t seconds for $nloops loops of send-recv of array size $n")
+
+# We cannot run these examples since they use MPI.Barrier and other blocking
+# communication, disabling our event loop
+# print("EXAMPLE: HELLO\n")
+# @mpi_do manager do_hello()
+# print("EXAMPLE: BROADCAST\n")
+# @mpi_do manager do_broadcast()
+# print("EXAMPLE: REDUCE\n")
+# @mpi_do manager do_reduce()
+# print("EXAMPLE: SENDRECV\n")
+# @mpi_do manager do_sendrecv()
+
+MPI.stop_main_loop(manager)
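The transport example goes the other way around: the script is launched under `mpirun`, every rank calls MPI.Init, and MPI.start_main_loop turns the non-zero ranks into workers while only rank 0 returns and acts as the Julia master; MPI.stop_main_loop(manager) ends the run. Below is a minimal sketch of the MPI_TRANSPORT_ALL path, assuming the start_main_loop / stop_main_loop API shown in the diff above; the array size and launch command are illustrative only.

# Sketch: cluster over MPI transport, launched with e.g. `mpirun -np 4 julia script.jl`.
# Assumes MPI.start_main_loop / MPI.stop_main_loop as used in the example above.
using MPI

MPI.Init()
rank = MPI.Comm_rank(MPI.COMM_WORLD)
size = MPI.Comm_size(MPI.COMM_WORLD)

# Does not return on worker ranks; only rank 0 continues past this call.
manager = MPI.start_main_loop(MPI_TRANSPORT_ALL)
@assert rank == 0

# Rank 0 now drives the remaining ranks as ordinary Julia workers.
a = ones(10^3)
t = @elapsed remotecall_fetch(x->x, mod1(2, size), a)
println("$t seconds for one send-recv of array size $(length(a))")

# Tear the cluster down, as in the example above.
MPI.stop_main_loop(manager)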
