import Base: launch, kill, manage, connect
export MPIWindowIOManager, launch, kill, manage, connect, @cluster
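
# NOTE: this code targets Julia 0.6-era APIs (@schedule, STDERR, Base.Distributed);
# on Julia 1.x the rough equivalents are @async, stderr and the Distributed stdlib.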

"""
Stores the buffers needed for communication, one instance per rank. The worker
event loop stops when the `_stop_requested` condition is triggered.
"""
mutable struct MPIWindowIOManager <: ClusterManager
    comm::MPI.Comm
    connection_windows::Vector{WindowIO}
    stdio_windows::Vector{WindowIO}
    workers_wait::Bool

    function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
        nb_procs = MPI.Comm_size(comm)
        connection_windows = Vector{WindowIO}(nb_procs)
        stdio_windows = Vector{WindowIO}(nb_procs)

        for i in 1:nb_procs
            connection_windows[i] = WindowIO(comm)
            stdio_windows[i] = WindowIO(comm)
        end

        # Make sure all windows are created before continuing
        MPI.Barrier(comm)

        return new(comm, connection_windows, stdio_windows, workers_wait)
    end
end

# Closes all local MPI Windows in a manager. Must be called collectively on all ranks
function closeall(manager::MPIWindowIOManager)
    for w in manager.connection_windows
        close(w)
    end
    for w in manager.stdio_windows
        close(w)
    end
end
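
# Called by addprocs on the master (rank 0). The worker processes already exist
# (they were started together by mpirun), so nothing is spawned here; one
# WorkerConfig is registered per MPI rank other than 0.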
function launch(mgr::MPIWindowIOManager, params::Dict,
                instances::Array, cond::Condition)
    try
        nprocs = MPI.Comm_size(mgr.comm)
        for cnt in 1:(nprocs-1)
            push!(instances, WorkerConfig())
        end
        notify(cond)
    catch e
        println("Error in MPI launch $e")
        rethrow(e)
    end
end
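
# Signals the worker to leave its event loop and marks it terminated on the
# master; the OS process itself is left alive so it can take part in the
# collective MPI.Finalize.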
function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
    @spawnat pid notify(_stop_requested)
    Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED)
end
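
# No-op: worker lifecycle is handled by MPI, so no action is needed for any
# management event.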
function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end
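
# Returns the (read, write) IO pair that Distributed uses to talk to worker pid.
# On rank 0, an extra task echoes everything the worker writes to its stdio window.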
function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
    myrank = MPI.Comm_rank(mgr.comm)
    if myrank == 0
        proc_stdio = mgr.stdio_windows[pid]
        @schedule while !eof(proc_stdio)
            try
                println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
            catch e
            end
        end
    end
    return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1))
end
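
# Redirects local stdout into the given WindowWriter, line by line, so worker
# output ends up in the stdio window that rank 0 echoes (see connect above).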
function redirect_to_mpi(s::WindowWriter)
    (rd, wr) = redirect_stdout()
    @schedule while !eof(rd) && isopen(s.winio)
        av = readline(rd)
        if isopen(s.winio)
            println(s, av)
            flush(s)
        end
    end
end
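
# Sanity check: a round-trip @fetchfrom to every worker verifies that its
# message loop is responding.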
function checkworkers()
    for w in workers()
        if w != (@fetchfrom w myid())
            error("worker $w is not waiting")
        end
    end
end
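
# Wakes up every worker that is blocked in wait_for_events.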
function notify_workers()
    for w in workers()
        @spawnat(w, notify(_stop_requested))
    end
end
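
# Blocks the calling task until _stop_requested is notified (by kill,
# notify_workers or the @cluster macro).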
function wait_for_events()
    global _stop_requested
    wait(_stop_requested)
end

"""
Initialize the current process as a Julia parallel worker. Must be called on all ranks.
If `comm` is not supplied, MPI is initialized and `MPI_COMM_WORLD` is used.
"""
function start_window_worker(comm::Comm, workers_wait)
    rank = MPI.Comm_rank(comm)
    N = MPI.Comm_size(comm)

    manager = MPIWindowIOManager(comm, workers_wait)
    # Derive the shared cluster cookie from the communicator, truncated to the
    # maximum length Distributed allows.
    cookie = string(comm)
    if length(cookie) > Base.Distributed.HDR_COOKIE_LEN
        cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN]
    end

    try
        if rank == 0
            # Rank 0 becomes the master and adds all other ranks as workers.
            Base.cluster_cookie(cookie)
            MPI.Barrier(comm)
            addprocs(manager)
            @assert nprocs() == N
            @assert nworkers() == (N == 1 ? 1 : N-1)

            if !workers_wait
                checkworkers()
                notify_workers()
            end
        else
            # The other ranks connect back to the master, redirect their
            # stdout and start processing cluster messages.
            init_worker(cookie, manager)
            MPI.Barrier(comm)
            redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
            for i in vcat([1], (rank+2):N)
                # Receiving end of connections to all higher workers and master
                Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
            end

            global _stop_requested = Condition()
            wait_for_events()
        end
    catch e
        Base.display_error(STDERR, "exception $e on rank $rank", backtrace())
    end

    if workers_wait && rank != 0
        # In waiting mode the workers shut down as soon as their event loop ends.
        closeall(manager)
        MPI.Finalize()
        exit(0)
    end

    return manager
end

"""
Stop the manager. This closes all windows and calls MPI.Finalize on all ranks.
"""
function stop_main_loop(manager::MPIWindowIOManager)
    if myid() != 1
        wait_for_events()
    else
        checkworkers()
        if nprocs() > 1
            rmprocs(workers())
        end
    end
    closeall(manager)
    MPI.Finalize()
end

"""
Runs the given expression using the Julia parallel cluster. Useful when running with `MPI_WINDOW_NOWAIT`,
since this will temporarily activate the worker event loops to listen for messages.
"""
macro cluster(expr)
    quote
        if myid() != 1
            wait_for_events()
        else
            $(esc(expr))
            notify_workers()
        end
    end
end
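
# A minimal usage sketch (hypothetical driver script, not part of the original
# file), assuming this code is loaded on every rank and the program is started
# as e.g. `mpirun -np 4 julia driver.jl`:
#
#   using MPI
#   MPI.Init()
#   # Collective call: rank 0 becomes the master, the other ranks workers.
#   manager = start_window_worker(MPI.COMM_WORLD, false)
#   if myid() == 1
#       @everywhere println("hello from process $(myid())")
#   end
#   # Collective shutdown: removes the workers and calls MPI.Finalize.
#   stop_main_loop(manager)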