@@ -3,7 +3,7 @@ export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
 export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
 using Compat
 using Compat.Distributed
-import Compat.Sockets: connect, listenany, accept, getipaddr, IPv4
+import Compat.Sockets: connect, listenany, accept, IPv4, getsockname
 
 
 
@@ -42,6 +42,7 @@ mutable struct MPIManager <: ClusterManager
 
     # TCP Transport
     port::UInt16
+    ip::UInt32
     stdout_ios::Array
 
     # MPI transport
@@ -54,7 +55,7 @@ mutable struct MPIManager <: ClusterManager
     sending_done::Channel{Nothing}
     receiving_done::Channel{Nothing}
 
-    function MPIManager(; np::Integer = Sys.CPU_CORES,
+    function MPIManager(; np::Integer = Sys.CPU_THREADS,
                           mpirun_cmd::Cmd = `mpiexec -n $np`,
                           launch_timeout::Real = 60.0,
                           mode::TransportMode = MPI_ON_WORKERS)
@@ -86,13 +87,15 @@ mutable struct MPIManager <: ClusterManager
         if mode != MPI_TRANSPORT_ALL
             # Start a listener for capturing stdout from the workers
             port, server = listenany(11000)
+            ip = getsockname(server)[1].host
             @async begin
                 while true
                     sock = accept(server)
                     push!(mgr.stdout_ios, sock)
                 end
             end
             mgr.port = port
+            mgr.ip = ip
             mgr.stdout_ios = IO[]
         else
             mgr.rank2streams = Dict{Int,Tuple{IO,IO}}()
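(Note, not part of the patch: a minimal sketch of the `getsockname` pattern introduced above, assuming an IPv4 listener; the port hint 11000 and variable names are illustrative.)

    using Sockets
    # Bind an ephemeral listener as the constructor does, then read back the
    # address it is actually bound to instead of calling getipaddr().
    port, server = listenany(11000)
    ip = getsockname(server)[1]   # getsockname returns (address, port)
    @show port ip ip.host         # `.host` is the raw UInt32 stored in `mgr.ip` (IPv4 assumed)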
@@ -133,7 +136,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict,
         throw(ErrorException("Reuse of MPIManager is not allowed."))
     end
     cookie = string(":cookie_",Distributed.cluster_cookie())
-    setup_cmds = `using MPI\; MPI.setup_worker'('$(getipaddr().host),$(mgr.port),$cookie')'`
+    setup_cmds = `using MPI\; MPI.setup_worker'('$(mgr.ip),$(mgr.port),$cookie')'`
     mpi_cmd = `$(mgr.mpirun_cmd) $(params[:exename]) -e $(Base.shell_escape(setup_cmds))`
     open(detach(mpi_cmd))
     mgr.launched = true
@@ -151,7 +154,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict,
     end
 
     # Traverse all worker I/O streams and receive their MPI rank
-    configs = Array{WorkerConfig}(mgr.np)
+    configs = Array{WorkerConfig}(undef, mgr.np)
     @sync begin
         for io in mgr.stdout_ios
             @async let io=io
@@ -199,12 +202,12 @@ function setup_worker(host, port, cookie)
 
     # Hand over control to Base
     if cookie == nothing
-        Base.start_worker(io)
+        Distributed.start_worker(io)
     else
         if isa(cookie, Symbol)
             cookie = string(cookie)[8:end] # strip the leading "cookie_"
         end
-        Base.start_worker(io, cookie)
+        Distributed.start_worker(io, cookie)
     end
 end
 
@@ -279,8 +282,8 @@
 # case
 function start_send_event_loop(mgr::MPIManager, rank::Int)
     try
-        r_s = BufferStream()
-        w_s = BufferStream()
+        r_s = Base.BufferStream()
+        w_s = Base.BufferStream()
         mgr.rank2streams[rank] = (r_s, w_s)
 
         # TODO: There is one task per communication partner -- this can be
@@ -292,7 +295,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
         reqs = MPI.Request[]
         while !isready(mgr.initiate_shutdown)
             # When data are available, send them
-            while nb_available(w_s) > 0
+            while bytesavailable(w_s) > 0
                 data = take!(w_s.buffer)
                 push!(reqs, MPI.Isend(data, rank, 0, mgr.comm))
             end
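(Illustrative check, not from the patch: on Julia 0.7+ the pending-byte count on a `Base.BufferStream` is queried with `bytesavailable`, replacing the `nb_available` call removed in this hunk.)

    bs = Base.BufferStream()
    write(bs, UInt8[0x01, 0x02, 0x03])
    @assert bytesavailable(bs) == 3       # was nb_available(bs) before 0.7
    data = read(bs, bytesavailable(bs))   # drain the buffered bytes, as the send loop does via take!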
@@ -307,7 +310,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
         end
         (r_s, w_s)
     catch e
-        Base.show_backtrace(STDOUT, catch_backtrace())
+        Base.show_backtrace(stdout, catch_backtrace())
         println(e)
         rethrow(e)
     end
@@ -334,11 +337,15 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
         # Create manager object
         mgr = MPIManager(np=size-1, mode=mode)
         mgr.comm = comm
+        # Needed because of Julia commit https://github.com/JuliaLang/julia/commit/299300a409c35153a1fa235a05c3929726716600
+        if isdefined(Distributed, :init_multi)
+            Distributed.init_multi()
+        end
         # Send connection information to all workers
         # TODO: Use Bcast
         for j in 1:size-1
             cookie = VERSION >= v"0.5.0-dev+4047" ? Distributed.cluster_cookie() : nothing
-            MPI.send((getipaddr().host, mgr.port, cookie), j, 0, comm)
+            MPI.send((mgr.ip, mgr.port, cookie), j, 0, comm)
         end
         # Tell Base about the workers
         addprocs(mgr)
@@ -363,6 +370,9 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
 
         # Send the cookie over. Introduced in v"0.5.0-dev+4047". Irrelevant under MPI
         # transport, but need it to satisfy the changed protocol.
+        if isdefined(Distributed, :init_multi)
+            Distributed.init_multi()
+        end
         MPI.bcast(Distributed.cluster_cookie(), 0, comm)
         # Start event loop for the workers
         @async receive_event_loop(mgr)
@@ -376,7 +386,7 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
         mgr.comm = comm
         # Recv the cookie
         cookie = MPI.bcast(nothing, 0, comm)
-        Base.init_worker(cookie, mgr)
+        Distributed.init_worker(cookie, mgr)
         # Start a worker event loop
         receive_event_loop(mgr)
         MPI.Finalize()
@@ -394,7 +404,7 @@ function receive_event_loop(mgr::MPIManager)
         (hasdata, stat) = MPI.Iprobe(MPI.ANY_SOURCE, 0, mgr.comm)
         if hasdata
             count = Get_count(stat, UInt8)
-            buf = Array{UInt8}(count)
+            buf = Array{UInt8}(undef, count)
             from_rank = Get_source(stat)
             MPI.Recv!(buf, from_rank, 0, mgr.comm)
 
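(Illustrative, not from the patch: the uninitialized-array constructor spelling that this and the surrounding hunks migrate to.)

    n = 16
    buf = Array{UInt8}(undef, n)   # replaces Array{UInt8}(n), removed in Julia 1.0
    @assert length(buf) == n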
@@ -403,7 +413,7 @@ function receive_event_loop(mgr::MPIManager)
                 # This is the first time we communicate with this rank.
                 # Set up a new connection.
                 (r_s, w_s) = start_send_event_loop(mgr, from_rank)
-                Base.process_messages(r_s, w_s)
+                Distributed.process_messages(r_s, w_s)
                 num_send_loops += 1
             else
                 (r_s, w_s) = streams
@@ -459,7 +469,7 @@
 function mpi_do(mgr::MPIManager, expr)
     !mgr.initialized && wait(mgr.cond_initialized)
     jpids = keys(mgr.j2mpi)
-    refs = Array{Any}(length(jpids))
+    refs = Array{Any}(undef, length(jpids))
     for (i,p) in enumerate(Iterators.filter(x -> x != myid(), jpids))
         refs[i] = remotecall(expr, p)
     end
@@ -490,7 +500,7 @@
 macro mpi_do(mgr, expr)
     quote
         # Evaluate expression in Main module
-        thunk = () -> (eval(Main, $(Expr(:quote, expr))); nothing)
+        thunk = () -> (Core.eval(Main, $(Expr(:quote, expr))); nothing)
         mpi_do($(esc(mgr)), thunk)
     end
 end
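(For reference, a hedged usage sketch of the exported API touched by this diff, in the default MPI_ON_WORKERS mode; the worker count of 4 is an assumption.)

    using MPI, Distributed
    mgr = MPIManager(np=4)     # launches `mpiexec -n 4` with the setup_cmds shown above
    addprocs(mgr)              # registers the MPI ranks as Julia workers
    @mpi_do mgr begin
        comm = MPI.COMM_WORLD
        println("Hello from rank ", MPI.Comm_rank(comm), " of ", MPI.Comm_size(comm))
    end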