Skip to content

Commit d6a97fa

Browse files
IanButterworthJamesWrigley
authored andcommitted
Add CachingPool method for remotecall
1 parent e4bd504 commit d6a97fa

File tree

2 files changed

+41
-10
lines changed

2 files changed

+41
-10
lines changed

src/workerpool.jl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,3 +400,28 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
400400
put!(pool, worker)
401401
end
402402
end
403+
404+
# Specialization for remotecall. We have to wait for the Future it returns
405+
# before putting the worker back in the pool.
406+
function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...)
407+
worker = take!(pool)
408+
f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker)))
409+
isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker
410+
411+
local x
412+
try
413+
x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
414+
catch
415+
put!(pool, worker)
416+
rethrow()
417+
end
418+
419+
t = Threads.@spawn Threads.threadpool() try
420+
wait(x)
421+
finally
422+
put!(pool, worker)
423+
end
424+
errormonitor(t)
425+
426+
return x
427+
end

test/distributed_exec.jl

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,8 @@ wp = WorkerPool(workers())
718718
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))
719719
wp = WorkerPool(2:3)
720720
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
721+
@test fetch(remotecall(myid, wp)) in wp.workers
722+
@test_throws RemoteException fetch(remotecall(error, wp))
721723

722724
# wait on worker pool
723725
wp = WorkerPool(2:2)
@@ -743,6 +745,8 @@ status = timedwait(() -> isready(f), 10)
743745
# CachingPool tests
744746
wp = CachingPool(workers())
745747
@test [1:100...] == pmap(x->x, wp, 1:100)
748+
@test fetch(remotecall(myid, wp)) in wp.workers
749+
@test_throws RemoteException fetch(remotecall(error, wp))
746750

747751
clear!(wp)
748752
@test length(wp.map_obj2ref) == 0
@@ -1092,16 +1096,18 @@ end
10921096
# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
10931097
# keep the worker out of the pool until the underlying remotecall has
10941098
# finished.
1095-
let
1096-
remotechan = RemoteChannel(wrkr1)
1097-
pool = WorkerPool([wrkr1])
1098-
put_future = remotecall(() -> wait(remotechan), pool)
1099-
@test !isready(pool)
1100-
put!(remotechan, 1)
1101-
wait(put_future)
1102-
# The task that waits on the future to put it back into the pool runs
1103-
# asynchronously so we use timedwait() to check when the worker is back in.
1104-
@test timedwait(() -> isready(pool), 10) === :ok
1099+
for PoolType in (WorkerPool, CachingPool)
1100+
let
1101+
remotechan = RemoteChannel(wrkr1)
1102+
pool = PoolType([wrkr1])
1103+
put_future = remotecall(() -> wait(remotechan), pool)
1104+
@test !isready(pool)
1105+
put!(remotechan, 1)
1106+
wait(put_future)
1107+
# The task that waits on the future to put it back into the pool runs
1108+
# asynchronously so we use timedwait() to check when the worker is back in.
1109+
@test timedwait(() -> isready(pool), 10) === :ok
1110+
end
11051111
end
11061112

11071113
# Test calling @everywhere from a module not defined on the workers

0 commit comments

Comments
 (0)