@@ -99,7 +99,7 @@ mutable struct Worker
9999 del_msgs:: Array{Any,1} # XXX : Could del_msgs and add_msgs be Channels?
100100 add_msgs:: Array{Any,1}
101101 @atomic gcflag:: Bool
102- state:: WorkerState
102+ @atomic state:: WorkerState
103103 c_state:: Threads.Condition # wait for state changes, lock for state
104104 ct_time:: Float64 # creation time
105105 conn_func:: Any # used to setup connections lazily
@@ -145,15 +145,13 @@ end
145145
146146function set_worker_state (w, state)
147147 lock (w. c_state) do
148- w. state = state
148+ @atomic w. state = state
149149 notify (w. c_state; all= true )
150150 end
151151end
152152
153153function check_worker_state (w:: Worker )
154- lock (w. c_state)
155154 if w. state === W_CREATED
156- unlock (w. c_state)
157155 if ! isclusterlazy ()
158156 if PGRP. topology === :all_to_all
159157 # Since higher pids connect with lower pids, the remote worker
@@ -173,9 +171,8 @@ function check_worker_state(w::Worker)
173171 errormonitor (t)
174172 wait_for_conn (w)
175173 end
176- else
177- unlock (w. c_state)
178174 end
175+ return nothing
179176end
180177
181178exec_conn_func (id:: Int ) = exec_conn_func (worker_from_id (id):: Worker )
@@ -193,9 +190,7 @@ function exec_conn_func(w::Worker)
193190end
194191
195192function wait_for_conn (w)
196- lock (w. c_state)
197193 if w. state === W_CREATED
198- unlock (w. c_state)
199194 timeout = worker_timeout () - (time () - w. ct_time)
200195 timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
201196
@@ -210,8 +205,6 @@ function wait_for_conn(w)
210205 wait (w. c_state)
211206 w. state === W_CREATED && error (" peer $(w. id) didn't connect to $(myid ()) within $timeout seconds" )
212207 end
213- else
214- unlock (w. c_state)
215208 end
216209 nothing
217210end
@@ -667,8 +660,8 @@ function create_worker(manager, wconfig)
667660 for jw in PGRP. workers
668661 if (jw. id != 1 ) && (jw. id < w. id)
669662 # wait for wl to join
670- lock ( jw. c_state) do
671- if jw. state === W_CREATED
663+ if jw. state === W_CREATED
664+ lock ( jw. c_state) do
672665 wait (jw. c_state)
673666 end
674667 end
0 commit comments