Skip to content

Commit 1ed25be

Browse files
committed
Revert "Keep most 'client oriented' tasks in the same threadpool"
This reverts commit 90041ca.
1 parent f7ba3ca commit 1ed25be

File tree

4 files changed

+15
-15
lines changed

4 files changed

+15
-15
lines changed

src/cluster.jl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
163163
else
164164
w.ct_time = time()
165165
if myid() > w.id
166-
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
166+
t = Threads.@spawn exec_conn_func(w)
167167
else
168168
# route request via node 1
169-
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
169+
t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
170170
end
171171
errormonitor(t)
172172
wait_for_conn(w)
@@ -323,7 +323,7 @@ function read_worker_host_port(io::IO)
323323
leader = String[]
324324
try
325325
while ntries > 0
326-
readtask = Threads.@spawn Threads.threadpool() readline(io)
326+
readtask = Threads.@spawn readline(io)
327327
yield()
328328
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
329329
sleep(0.05)
@@ -490,13 +490,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
490490
# call manager's `launch` is a separate task. This allows the master
491491
# process initiate the connection setup process as and when workers come
492492
# online
493-
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
493+
t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy)
494494

495495
@sync begin
496496
while true
497497
if isempty(launched)
498498
istaskdone(t_launch) && break
499-
Threads.@spawn Threads.threadpool() begin
499+
Threads.@spawn begin
500500
sleep(1)
501501
notify(launch_ntfy)
502502
end
@@ -506,7 +506,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
506506
if !isempty(launched)
507507
wconfig = popfirst!(launched)
508508
let wconfig=wconfig
509-
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
509+
Threads.@spawn setup_launched_worker(manager, wconfig, launched_q)
510510
end
511511
end
512512
end
@@ -586,7 +586,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
586586
wconfig.port = port
587587

588588
let wconfig=wconfig
589-
Threads.@spawn Threads.threadpool() begin
589+
Threads.@spawn begin
590590
pid = create_worker(manager, wconfig)
591591
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
592592
push!(launched_q, pid)
@@ -1044,13 +1044,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
10441044

10451045
pids = vcat(pids...)
10461046
if waitfor == 0
1047-
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
1047+
t = Threads.@spawn _rmprocs(pids, typemax(Int))
10481048
yield()
10491049
return t
10501050
else
10511051
_rmprocs(pids, waitfor)
10521052
# return a dummy task object that user code can wait on.
1053-
return Threads.@spawn Threads.threadpool() nothing
1053+
return Threads.@spawn nothing
10541054
end
10551055
end
10561056

@@ -1233,7 +1233,7 @@ function interrupt(pids::AbstractVector=workers())
12331233
@assert myid() == 1
12341234
@sync begin
12351235
for pid in pids
1236-
Threads.@spawn Threads.threadpool() interrupt(pid)
1236+
Threads.@spawn interrupt(pid)
12371237
end
12381238
end
12391239
end

src/macros.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
230230
# execute locally last as we do not want local execution to block serialization
231231
# of the request to remote nodes.
232232
for _ in 1:run_locally
233-
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
233+
Threads.@spawn Core.eval(m, ex)
234234
end
235235
end
236236
nothing
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
275275
end
276276

277277
function pfor(f, R)
278-
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
278+
t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
279279
@spawnat :any f(R, first(c), last(c))
280280
end
281281
errormonitor(t)

src/managers.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
178178
# Wait for all launches to complete.
179179
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
180180
let machine=machine, cnt=cnt
181-
Threads.@spawn Threads.threadpool() try
181+
Threads.@spawn try
182182
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
183183
catch e
184184
print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
744744
# First, try sending `exit()` to the remote over the usual control channels
745745
remote_do(exit, pid)
746746

747-
timer_task = Threads.@spawn Threads.threadpool() begin
747+
timer_task = Threads.@spawn begin
748748
sleep(exit_timeout)
749749

750750
# Check to see if our child exited, and if not, send an actual kill signal

src/remotecall.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ function process_worker(rr)
322322
msg = (remoteref_id(rr), myid())
323323

324324
# Needs to acquire a lock on the del_msg queue
325-
T = Threads.@spawn Threads.threadpool() begin
325+
T = Threads.@spawn begin
326326
publish_del_msg!($w, $msg)
327327
end
328328
Base.errormonitor(T)

0 commit comments

Comments
 (0)