@@ -1,6 +1,9 @@
-import Base: launch, manage, kill, procs, connect
+import Base: kill
 export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
 export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
+using Compat
+using Compat.Distributed
+import Compat.Sockets: connect, listenany, accept, getipaddr, IPv4



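Note: the Compat, Compat.Distributed, and Compat.Sockets imports added above are what let the same source load on both Julia 0.6 and 0.7, where Distributed and Sockets moved out of Base into standard libraries. A minimal sketch of that pattern, separate from this diff:

    using Compat
    using Compat.Distributed               # Base.Distributed on 0.6, stdlib Distributed on 0.7
    import Compat.Sockets: listenany, accept, getipaddr

    # The same call resolves on either Julia version; listenany picks a free
    # port at or above the hint and returns (port, server).
    port, server = listenany(11000)
    println("listening on port ", port, " at ", getipaddr())
    close(server)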
@@ -47,9 +50,9 @@ mutable struct MPIManager <: ClusterManager
 
     # MPI_TRANSPORT_ALL
     comm::MPI.Comm
-    initiate_shutdown::Channel{Void}
-    sending_done::Channel{Void}
-    receiving_done::Channel{Void}
+    initiate_shutdown::Channel{Nothing}
+    sending_done::Channel{Nothing}
+    receiving_done::Channel{Nothing}
 
     function MPIManager(; np::Integer = Sys.CPU_CORES,
                           mpirun_cmd::Cmd = `mpiexec -n $np`,
@@ -83,7 +86,7 @@ mutable struct MPIManager <: ClusterManager
         if mode != MPI_TRANSPORT_ALL
             # Start a listener for capturing stdout from the workers
             port, server = listenany(11000)
-            @schedule begin
+            @async begin
                 while true
                     sock = accept(server)
                     push!(mgr.stdout_ios, sock)
@@ -98,12 +101,12 @@ mutable struct MPIManager <: ClusterManager
         end
 
         if mode == MPI_TRANSPORT_ALL
-            mgr.initiate_shutdown = Channel{Void}(1)
-            mgr.sending_done = Channel{Void}(np)
-            mgr.receiving_done = Channel{Void}(1)
+            mgr.initiate_shutdown = Channel{Nothing}(1)
+            mgr.sending_done = Channel{Nothing}(np)
+            mgr.receiving_done = Channel{Nothing}(1)
             global initiate_shutdown = mgr.initiate_shutdown
         end
-        mgr.initiate_shutdown = Channel{Void}(1)
+        mgr.initiate_shutdown = Channel{Nothing}(1)
         global initiate_shutdown = mgr.initiate_shutdown
 
         return mgr
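The Channel{Nothing} fields above act as shutdown flags for the transport event loops (see the isready(mgr.initiate_shutdown) checks further down). A standalone sketch of that pattern, with hypothetical names and the post-rename @async/Nothing spelling:

    # Hypothetical illustration of the shutdown-flag pattern; not part of the diff.
    initiate_shutdown = Channel{Nothing}(1)

    loop = @async begin
        while !isready(initiate_shutdown)   # poll the flag, as the event loops do
            sleep(0.1)                      # stand-in for the real send/receive work
        end
        println("event loop exiting")
    end

    put!(initiate_shutdown, nothing)        # request shutdown
    wait(loop)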
@@ -119,7 +122,7 @@
 # MPI_ON_WORKERS case
 
 # Launch a new worker, called from Base.addprocs
-function launch(mgr::MPIManager, params::Dict,
+function Distributed.launch(mgr::MPIManager, params::Dict,
                 instances::Array, cond::Condition)
     try
         if mgr.mode == MPI_ON_WORKERS
@@ -129,7 +132,7 @@ function launch(mgr::MPIManager, params::Dict,
                 println("Try again with a different instance of MPIManager.")
                 throw(ErrorException("Reuse of MPIManager is not allowed."))
             end
-            cookie = string(":cookie_",Base.cluster_cookie())
+            cookie = string(":cookie_",Distributed.cluster_cookie())
             setup_cmds = `using MPI\; MPI.setup_worker'('$(getipaddr().host),$(mgr.port),$cookie')'`
             mpi_cmd = `$(mgr.mpirun_cmd) $(params[:exename]) -e $(Base.shell_escape(setup_cmds))`
             open(detach(mpi_cmd))
@@ -156,7 +159,7 @@ function launch(mgr::MPIManager, params::Dict,
             config.io = io
             # Add config to the correct slot so that MPI ranks and
             # Julia pids are in the same order
-            rank = Base.deserialize(io)
+            rank = Compat.Serialization.deserialize(io)
             idx = mgr.mode == MPI_ON_WORKERS ? rank+1 : rank
             configs[idx] = config
         end
@@ -192,7 +195,7 @@ function setup_worker(host, port, cookie)
 
     # Send our MPI rank to the manager
     rank = MPI.Comm_rank(MPI.COMM_WORLD)
-    Base.serialize(io, rank)
+    Compat.Serialization.serialize(io, rank)
 
     # Hand over control to Base
     if cookie == nothing
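The rank handshake above (serialize on the worker, deserialize back in launch) now goes through Compat.Serialization, since serialize/deserialize left Base in 0.7. A minimal round trip over an IOBuffer standing in for the worker socket, assuming the Compat.Serialization shim:

    using Compat
    import Compat.Serialization: serialize, deserialize

    io = IOBuffer()          # stand-in for the worker <-> manager TCP stream
    serialize(io, 3)         # worker side: send its MPI rank
    seekstart(io)
    rank = deserialize(io)   # manager side: recover the rank
    @assert rank == 3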
@@ -206,7 +209,7 @@ function setup_worker(host, port, cookie)
 end
 
 # Manage a worker (e.g. register / deregister it)
-function manage(mgr::MPIManager, id::Integer, config::WorkerConfig, op::Symbol)
+function Distributed.manage(mgr::MPIManager, id::Integer, config::WorkerConfig, op::Symbol)
     if op == :register
         # Retrieve MPI rank from worker
         # TODO: Why is this necessary? The workers already sent their rank.
@@ -284,7 +287,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
     # quite expensive when there are many workers. Design something better.
     # For example, instead of maintaining two streams per worker, provide
     # only abstract functions to write to / read from these streams.
-    @schedule begin
+    @async begin
         rr = MPI.Comm_rank(mgr.comm)
         reqs = MPI.Request[]
         while !isready(mgr.initiate_shutdown)
@@ -334,7 +337,7 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
         # Send connection information to all workers
         # TODO: Use Bcast
         for j in 1:size-1
-            cookie = VERSION >= v"0.5.0-dev+4047" ? Base.cluster_cookie() : nothing
+            cookie = VERSION >= v"0.5.0-dev+4047" ? Distributed.cluster_cookie() : nothing
             MPI.send((getipaddr().host, mgr.port, cookie), j, 0, comm)
         end
         # Tell Base about the workers
@@ -360,9 +363,9 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
 
         # Send the cookie over. Introduced in v"0.5.0-dev+4047". Irrelevant under MPI
         # transport, but need it to satisfy the changed protocol.
-        MPI.bcast(Base.cluster_cookie(), 0, comm)
+        MPI.bcast(Distributed.cluster_cookie(), 0, comm)
         # Start event loop for the workers
-        @schedule receive_event_loop(mgr)
+        @async receive_event_loop(mgr)
         # Tell Base about the workers
         addprocs(mgr)
         return mgr
@@ -493,7 +496,7 @@ macro mpi_do(mgr, expr)
 end
 
 # All managed Julia processes
-procs(mgr::MPIManager) = sort(keys(mgr.j2mpi))
+Distributed.procs(mgr::MPIManager) = sort(keys(mgr.j2mpi))
 
 # All managed MPI ranks
 mpiprocs(mgr::MPIManager) = sort(keys(mgr.mpi2j))
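For reference, a sketch of a typical driver script for the default MPI_ON_WORKERS mode exported above; anything beyond the names in this diff (the worker count, MPI.Comm_size, the printed message) is illustrative:

    using MPI
    using Compat.Distributed    # for addprocs/procs on 0.7; Base suffices on 0.6

    # Spawn 4 MPI ranks via mpiexec and register them as Julia workers.
    mgr = MPIManager(np=4)
    addprocs(mgr)

    # Run a block on every MPI rank; each worker has MPI initialized.
    @mpi_do mgr begin
        comm = MPI.COMM_WORLD
        println("rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))")
    end

    println(procs(mgr))     # managed Julia pids
    println(mpiprocs(mgr))  # corresponding MPI ranks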