@@ -151,7 +151,7 @@ function set_worker_state(w, state)
151151end
152152
153153function check_worker_state (w:: Worker )
154- if w. state === W_CREATED
154+ if ( @atomic w. state) === W_CREATED
155155 if ! isclusterlazy ()
156156 if PGRP. topology === :all_to_all
157157 # Since higher pids connect with lower pids, the remote worker
@@ -190,7 +190,7 @@ function exec_conn_func(w::Worker)
190190end
191191
192192function wait_for_conn (w)
193- if w. state === W_CREATED
193+ if ( @atomic w. state) === W_CREATED
194194 timeout = worker_timeout () - (time () - w. ct_time)
195195 timeout <= 0 && error (" peer $(w. id) has not connected to $(myid ()) " )
196196
@@ -654,7 +654,7 @@ function create_worker(manager, wconfig)
654654 for jw in PGRP. workers
655655 if (jw. id != 1 ) && (jw. id < w. id)
656656 # wait for wl to join
657- if jw. state === W_CREATED
657+ if ( @atomic jw. state) === W_CREATED
658658 lock (jw. c_state) do
659659 wait (jw. c_state)
660660 end
@@ -682,7 +682,7 @@ function create_worker(manager, wconfig)
682682
683683 for wl in wlist
684684 lock (wl. c_state) do
685- if wl. state === W_CREATED
685+ if ( @atomic wl. state) === W_CREATED
686686 # wait for wl to join
687687 wait (wl. c_state)
688688 end
@@ -884,7 +884,7 @@ function nprocs()
884884 n = length (PGRP. workers)
885885 # filter out workers in the process of being setup/shutdown.
886886 for jw in PGRP. workers
887- if ! isa (jw, LocalProcess) && (jw. state != = W_CONNECTED)
887+ if ! isa (jw, LocalProcess) && (( @atomic jw. state) != = W_CONNECTED)
888888 n = n - 1
889889 end
890890 end
@@ -935,7 +935,7 @@ julia> procs()
935935function procs ()
936936 if myid () == 1 || (PGRP. topology === :all_to_all && ! isclusterlazy ())
937937 # filter out workers in the process of being setup/shutdown.
938- return Int[x. id for x in PGRP. workers if isa (x, LocalProcess) || (x. state === W_CONNECTED)]
938+ return Int[x. id for x in PGRP. workers if isa (x, LocalProcess) || (( @atomic x. state) === W_CONNECTED)]
939939 else
940940 return Int[x. id for x in PGRP. workers]
941941 end
944944function id_in_procs (id) # faster version of `id in procs()`
945945 if myid () == 1 || (PGRP. topology === :all_to_all && ! isclusterlazy ())
946946 for x in PGRP. workers
947- if (x. id:: Int ) == id && (isa (x, LocalProcess) || (x:: Worker ). state === W_CONNECTED)
947+ if (x. id:: Int ) == id && (isa (x, LocalProcess) || (@atomic ( x:: Worker ). state) === W_CONNECTED)
948948 return true
949949 end
950950 end
@@ -966,7 +966,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
966966"""
967967function procs (pid:: Integer )
968968 if myid () == 1
969- all_workers = [x for x in PGRP. workers if isa (x, LocalProcess) || (x. state === W_CONNECTED)]
969+ all_workers = [x for x in PGRP. workers if isa (x, LocalProcess) || (( @atomic x. state) === W_CONNECTED)]
970970 if (pid == 1 ) || (isa (map_pid_wrkr[pid]. manager, LocalManager))
971971 Int[x. id for x in filter (w -> (w. id== 1 ) || (isa (w. manager, LocalManager)), all_workers)]
972972 else
@@ -1073,11 +1073,11 @@ function _rmprocs(pids, waitfor)
10731073
10741074 start = time_ns ()
10751075 while (time_ns () - start) < waitfor* 1e9
1076- all (w -> w. state === W_TERMINATED, rmprocset) && break
1076+ all (w -> ( @atomic w. state) === W_TERMINATED, rmprocset) && break
10771077 sleep (min (0.1 , waitfor - (time_ns () - start)/ 1e9 ))
10781078 end
10791079
1080- unremoved = [wrkr. id for wrkr in filter (w -> w. state != = W_TERMINATED, rmprocset)]
1080+ unremoved = [wrkr. id for wrkr in filter (w -> ( @atomic w. state) != = W_TERMINATED, rmprocset)]
10811081 if length (unremoved) > 0
10821082 estr = string (" rmprocs: pids " , unremoved, " not terminated after " , waitfor, " seconds." )
10831083 throw (ErrorException (estr))
0 commit comments