import Base: launch, kill, manage, connect
export MPIWindowIOManager, launch, kill, manage, connect, @cluster

4+ """
5+ Stores the buffers needed for communication, in one instance per rank. Loop stops when the stop_condition is triggered
6+ """
mutable struct MPIWindowIOManager <: ClusterManager
    comm::MPI.Comm
    connection_windows::Vector{WindowIO}
    stdio_windows::Vector{WindowIO}
    workers_wait::Bool

    function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
        nb_procs = MPI.Comm_size(comm)
        connection_windows = Vector{WindowIO}(nb_procs)
        stdio_windows = Vector{WindowIO}(nb_procs)

        for i in 1:nb_procs
            connection_windows[i] = WindowIO(comm)
            stdio_windows[i] = WindowIO(comm)
        end

        # Make sure all windows are created before continuing
        MPI.Barrier(comm)

        return new(comm, connection_windows, stdio_windows, workers_wait)
    end
end
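
# Usage sketch (commented out; assumes MPI has already been initialized and that the
# constructor is called collectively on every rank — normally `start_window_worker`
# below builds the manager for you):
#
#   comm = MPI.COMM_WORLD
#   mgr = MPIWindowIOManager(comm, true)   # collective: creates one WindowIO pair per rank
#   # ... use the manager ...
#   closeall(mgr)                          # also collective, see below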

# Closes all local MPI Windows in a manager. Must be called collectively on all ranks
function closeall(manager::MPIWindowIOManager)
    for w in manager.connection_windows
        close(w)
    end
    for w in manager.stdio_windows
        close(w)
    end
end
39+
40+ function launch (mgr:: MPIWindowIOManager , params:: Dict ,
41+ instances:: Array , cond:: Condition )
42+ try
43+ nprocs = MPI. Comm_size (mgr. comm)
44+ for cnt in 1 : (nprocs- 1 )
45+ push! (instances, WorkerConfig ())
46+ end
47+ notify (cond)
48+ catch e
49+ println (" Error in MPI launch $e " )
50+ rethrow (e)
51+ end
52+ end

function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
    @spawnat pid notify(_stop_requested)
    Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED)
end

function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end

function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
    myrank = MPI.Comm_rank(mgr.comm)
    if myrank == 0
        proc_stdio = mgr.stdio_windows[pid]
        @schedule while !eof(proc_stdio)
            try
                println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
            catch e
            end
        end
    end
    return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1))
end

function redirect_to_mpi(s::WindowWriter)
    (rd, wr) = redirect_stdout()
    @schedule while !eof(rd) && isopen(s.winio)
        av = readline(rd)
        if isopen(s.winio)
            println(s, av)
            flush(s)
        end
    end
end

function checkworkers()
    for w in workers()
        if w != (@fetchfrom w myid())
            error("worker $w is not waiting")
        end
    end
end

function notify_workers()
    for w in workers()
        @spawnat(w, notify(_stop_requested))
    end
end

function wait_for_events()
    global _stop_requested
    wait(_stop_requested)
end

"""
Initialize the current process as a Julia parallel worker. Must be called on all ranks.
If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used.
"""
function start_window_worker(comm::Comm, workers_wait)
    rank = MPI.Comm_rank(comm)
    N = MPI.Comm_size(comm)

    manager = MPIWindowIOManager(comm, workers_wait)
    cookie = string(comm)[1:Base.Distributed.HDR_COOKIE_LEN]

    try
        if rank == 0
            Base.cluster_cookie(cookie)
            MPI.Barrier(comm)
            addprocs(manager)
            @assert nprocs() == N
            @assert nworkers() == (N == 1 ? 1 : N-1)

            if !workers_wait
                checkworkers()
                notify_workers()
            end
        else
            init_worker(cookie, manager)
            MPI.Barrier(comm)
            redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
            for i in vcat([1], (rank+2):N)
                # Receiving end of connections to all higher workers and master
                Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
            end

            global _stop_requested = Condition()
            wait_for_events()
        end
    catch e
        Base.display_error(STDERR, "exception $e on rank $rank", backtrace())
    end

    if workers_wait && rank != 0
        closeall(manager)
        MPI.Finalize()
        exit(0)
    end

    return manager
end
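
# Minimal driver sketch (commented out; the script name is hypothetical) for the default
# mode with workers_wait == true: every rank calls start_window_worker, worker ranks
# block inside it serving requests and exit once they are removed, and only rank 0
# returns to run the rest of the script. Launch with e.g. `mpirun -np 4 julia driver.jl`:
#
#   using MPI
#   MPI.Init()
#   manager = start_window_worker(MPI.COMM_WORLD, true)
#   # Only rank 0 reaches this point; normal Distributed calls work from here:
#   @everywhere println("hello from $(myid())")
#   stop_main_loop(manager)   # removes the workers, closes windows, finalizes MPI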

"""
Stop the manager. This closes all windows and calls MPI.Finalize on each rank that calls it.
"""
function stop_main_loop(manager::MPIWindowIOManager)
    if myid() != 1
        wait_for_events()
    else
        checkworkers()
        if nprocs() > 1
            rmprocs(workers())
        end
    end
    closeall(manager)
    MPI.Finalize()
end

"""
Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT,
since this will temporarily activate the worker event loops to listen for messages.
"""
macro cluster(expr)
    quote
        if myid() != 1
            wait_for_events()
        else
            $(esc(expr))
            notify_workers()
        end
    end
end
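
# Illustrative sketch (commented out; assumes MPI.Init() was already called and all ranks
# run the same script) of the MPI_WINDOW_NOWAIT style this macro targets: start_window_worker
# is called with workers_wait == false, so every rank keeps executing the script, and
# @cluster temporarily parks the workers in their event loop while rank 0 (Julia process 1)
# runs the block and then releases them again:
#
#   manager = start_window_worker(MPI.COMM_WORLD, false)
#   # ... per-rank (MPI-style) code runs here on every rank ...
#   @cluster begin
#       @everywhere println("hello from $(myid())")   # cluster-style code, driven by rank 0
#   end
#   # ... more per-rank code ...
#   stop_main_loop(manager)   # call on every rank to shut down cleanly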