@@ -100,9 +100,9 @@ mutable struct Worker
100100 add_msgs:: Array{Any,1}
101101 @atomic gcflag:: Bool
102102 state:: WorkerState
103- c_state:: Condition # wait for state changes
104- ct_time:: Float64 # creation time
105- conn_func:: Any # used to setup connections lazily
103+ c_state:: Threads. Condition # wait for state changes, lock for state
104+ ct_time:: Float64 # creation time
105+ conn_func:: Any # used to setup connections lazily
106106
107107 r_stream:: IO
108108 w_stream:: IO
@@ -134,7 +134,7 @@ mutable struct Worker
134134 if haskey (map_pid_wrkr, id)
135135 return map_pid_wrkr[id]
136136 end
137- w= new (id, Threads. ReentrantLock (), [], [], false , W_CREATED, Condition (), time (), conn_func)
137+ w= new (id, Threads. ReentrantLock (), [], [], false , W_CREATED, Threads . Condition (), time (), conn_func)
138138 w. initialized = Event ()
139139 register_worker (w)
140140 w
@@ -144,12 +144,16 @@ mutable struct Worker
144144end
145145
146146function set_worker_state (w, state)
147- w. state = state
148- notify (w. c_state; all= true )
147+ lock (w. c_state) do
148+ w. state = state
149+ notify (w. c_state; all= true )
150+ end
149151end
150152
151153function check_worker_state (w:: Worker )
154+ lock (w. c_state)
152155 if w. state === W_CREATED
156+ unlock (w. c_state)
153157 if ! isclusterlazy ()
154158 if PGRP. topology === :all_to_all
155159 # Since higher pids connect with lower pids, the remote worker
@@ -169,6 +173,8 @@ function check_worker_state(w::Worker)
169173 errormonitor (t)
170174 wait_for_conn (w)
171175 end
176+ else
177+ unlock (w. c_state)
172178 end
173179end
174180
@@ -187,13 +193,25 @@ function exec_conn_func(w::Worker)
187193end
188194
189195function wait_for_conn (w)
196+ lock (w. c_state)
190197 if w. state === W_CREATED
198+ unlock (w. c_state)
191199 timeout = worker_timeout () - (time () - w. ct_time)
192200 timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
193201
194- @async (sleep (timeout); notify (w. c_state; all= true ))
195- wait (w. c_state)
196- w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
202+ T = Threads. @spawn begin
203+ sleep ($ timeout)
204+ lock (w. c_state) do
205+ notify (w. c_state; all= true )
206+ end
207+ end
208+ errormonitor (T)
209+ lock (w. c_state) do
210+ wait (w. c_state)
211+ w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
212+ end
213+ else
214+ unlock (w. c_state)
197215 end
198216 nothing
199217end
@@ -491,7 +509,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
491509 while true
492510 if isempty (launched)
493511 istaskdone (t_launch) && break
494- @async (sleep (1 ); notify (launch_ntfy))
512+ @async begin
513+ sleep (1 )
514+ notify (launch_ntfy)
515+ end
495516 wait (launch_ntfy)
496517 end
497518
@@ -645,7 +666,12 @@ function create_worker(manager, wconfig)
645666 # require the value of config.connect_at which is set only upon connection completion
646667 for jw in PGRP. workers
647668 if (jw. id != 1 ) && (jw. id < w. id)
648- (jw. state === W_CREATED) && wait (jw. c_state)
669+ # wait for wl to join
670+ lock (jw. c_state) do
671+ if jw. state === W_CREATED
672+ wait (jw. c_state)
673+ end
674+ end
649675 push! (join_list, jw)
650676 end
651677 end
@@ -668,7 +694,12 @@ function create_worker(manager, wconfig)
668694 end
669695
670696 for wl in wlist
671- (wl. state === W_CREATED) && wait (wl. c_state)
697+ lock (wl. c_state) do
698+ if wl. state === W_CREATED
699+ # wait for wl to join
700+ wait (wl. c_state)
701+ end
702+ end
672703 push! (join_list, wl)
673704 end
674705 end
0 commit comments