Skip to content

Commit 6a081d8

Browse files
IanButterworthJamesWrigley
authored andcommitted
Add CachingPool method for remotecall
1 parent 365610f commit 6a081d8

File tree

2 files changed

+42
-9
lines changed

2 files changed

+42
-9
lines changed

src/workerpool.jl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,3 +400,28 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
400400
put!(pool, worker)
401401
end
402402
end
403+
404+
# Specialization for remotecall. We have to wait for the Future it returns
405+
# before putting the worker back in the pool.
406+
function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...)
407+
worker = take!(pool)
408+
f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker)))
409+
isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker
410+
411+
local x
412+
try
413+
x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
414+
catch
415+
put!(pool, worker)
416+
rethrow()
417+
end
418+
419+
t = Threads.@spawn Threads.threadpool() try
420+
wait(x)
421+
finally
422+
put!(pool, worker)
423+
end
424+
errormonitor(t)
425+
426+
return x
427+
end

test/distributed_exec.jl

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,8 @@ end
722722
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))
723723
wp = WorkerPool(2:3)
724724
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
725+
@test fetch(remotecall(myid, wp)) in wp.workers
726+
@test_throws RemoteException fetch(remotecall(error, wp))
725727

726728
# wait on worker pool
727729
wp = WorkerPool(2:2)
@@ -747,6 +749,8 @@ end
747749
# CachingPool tests
748750
wp = CachingPool(workers())
749751
@test [1:100...] == pmap(x->x, wp, 1:100)
752+
@test fetch(remotecall(myid, wp)) in wp.workers
753+
@test_throws RemoteException fetch(remotecall(error, wp))
750754

751755
clear!(wp)
752756
@test length(wp.map_obj2ref) == 0
@@ -1017,15 +1021,19 @@ f16091b = () -> 1
10171021
# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
10181022
# keep the worker out of the pool until the underlying remotecall has
10191023
# finished.
1020-
remotechan = RemoteChannel(wrkr1)
1021-
pool = WorkerPool([wrkr1])
1022-
put_future = remotecall(() -> wait(remotechan), pool)
1023-
@test !isready(pool)
1024-
put!(remotechan, 1)
1025-
wait(put_future)
1026-
# The task that waits on the future to put it back into the pool runs
1027-
# asynchronously so we use timedwait() to check when the worker is back in.
1028-
@test timedwait(() -> isready(pool), 10) === :ok
1024+
for PoolType in (WorkerPool, CachingPool)
1025+
let
1026+
remotechan = RemoteChannel(wrkr1)
1027+
pool = PoolType([wrkr1])
1028+
put_future = remotecall(() -> wait(remotechan), pool)
1029+
@test !isready(pool)
1030+
put!(remotechan, 1)
1031+
wait(put_future)
1032+
# The task that waits on the future to put it back into the pool runs
1033+
# asynchronously so we use timedwait() to check when the worker is back in.
1034+
@test timedwait(() -> isready(pool), 10) === :ok
1035+
end
1036+
end
10291037

10301038
# Test calling @everywhere from a module not defined on the workers
10311039
LocalBar.bar()

0 commit comments

Comments
 (0)