@@ -100,9 +100,9 @@ mutable struct Worker
100100 add_msgs:: Array{Any,1}
101101 @atomic gcflag:: Bool
102102 state:: WorkerState
103- c_state:: Condition # wait for state changes
104- ct_time:: Float64 # creation time
105- conn_func:: Any # used to setup connections lazily
103+ c_state:: Threads. Condition # wait for state changes, lock for state
104+ ct_time:: Float64 # creation time
105+ conn_func:: Any # used to setup connections lazily
106106
107107 r_stream:: IO
108108 w_stream:: IO
@@ -134,7 +134,7 @@ mutable struct Worker
134134 if haskey (map_pid_wrkr, id)
135135 return map_pid_wrkr[id]
136136 end
137- w= new (id, Threads. ReentrantLock (), [], [], false , W_CREATED, Condition (), time (), conn_func)
137+ w= new (id, Threads. ReentrantLock (), [], [], false , W_CREATED, Threads . Condition (), time (), conn_func)
138138 w. initialized = Event ()
139139 register_worker (w)
140140 w
@@ -144,12 +144,16 @@ mutable struct Worker
144144end
145145
146146function set_worker_state (w, state)
147- w. state = state
148- notify (w. c_state; all= true )
147+ lock (w. c_state) do
148+ w. state = state
149+ notify (w. c_state; all= true )
150+ end
149151end
150152
151153function check_worker_state (w:: Worker )
154+ lock (w. c_state)
152155 if w. state === W_CREATED
156+ unlock (w. c_state)
153157 if ! isclusterlazy ()
154158 if PGRP. topology === :all_to_all
155159 # Since higher pids connect with lower pids, the remote worker
@@ -169,6 +173,8 @@ function check_worker_state(w::Worker)
169173 errormonitor (t)
170174 wait_for_conn (w)
171175 end
176+ else
177+ unlock (w. c_state)
172178 end
173179end
174180
@@ -187,13 +193,25 @@ function exec_conn_func(w::Worker)
187193end
188194
189195function wait_for_conn (w)
196+ lock (w. c_state)
190197 if w. state === W_CREATED
198+ unlock (w. c_state)
191199 timeout = worker_timeout () - (time () - w. ct_time)
192200 timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
193201
194- @async (sleep (timeout); notify (w. c_state; all= true ))
195- wait (w. c_state)
196- w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
202+ T = Threads. @spawn begin
203+ sleep ($ timeout)
204+ lock (w. c_state) do
205+ notify (w. c_state; all= true )
206+ end
207+ end
208+ errormonitor (T)
209+ lock (w. c_state) do
210+ wait (w. c_state)
211+ w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
212+ end
213+ else
214+ unlock (w. c_state)
197215 end
198216 nothing
199217end
@@ -488,7 +506,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
488506 while true
489507 if isempty (launched)
490508 istaskdone (t_launch) && break
491- @async (sleep (1 ); notify (launch_ntfy))
509+ @async begin
510+ sleep (1 )
511+ notify (launch_ntfy)
512+ end
492513 wait (launch_ntfy)
493514 end
494515
@@ -641,7 +662,12 @@ function create_worker(manager, wconfig)
641662 # require the value of config.connect_at which is set only upon connection completion
642663 for jw in PGRP. workers
643664 if (jw. id != 1 ) && (jw. id < w. id)
644- (jw. state === W_CREATED) && wait (jw. c_state)
665+ # wait for wl to join
666+ lock (jw. c_state) do
667+ if jw. state === W_CREATED
668+ wait (jw. c_state)
669+ end
670+ end
645671 push! (join_list, jw)
646672 end
647673 end
@@ -664,7 +690,12 @@ function create_worker(manager, wconfig)
664690 end
665691
666692 for wl in wlist
667- (wl. state === W_CREATED) && wait (wl. c_state)
693+ lock (wl. c_state) do
694+ if wl. state === W_CREATED
695+ # wait for wl to join
696+ wait (wl. c_state)
697+ end
698+ end
668699 push! (join_list, wl)
669700 end
670701 end
@@ -681,7 +712,11 @@ function create_worker(manager, wconfig)
681712 @async manage (w. manager, w. id, w. config, :register )
682713 # wait for rr_ntfy_join with timeout
683714 timedout = false
684- @async (sleep ($ timeout); timedout = true ; put! (rr_ntfy_join, 1 ))
715+ @async begin
716+ sleep ($ timeout)
717+ timedout = true
718+ put! (rr_ntfy_join, 1 )
719+ end
685720 wait (rr_ntfy_join)
686721 if timedout
687722 error (" worker did not connect within $timeout seconds" )
0 commit comments