diff --git a/Project.toml b/Project.toml index bb30760..8faf61c 100644 --- a/Project.toml +++ b/Project.toml @@ -7,9 +7,19 @@ Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" +[compat] +Aqua = "0.8.10" +LinearAlgebra = "1" +Random = "1" +Serialization = "1" +Sockets = "1" +Test = "1" +julia = "1" + [extras] +Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["LinearAlgebra", "Test"] +test = ["Aqua", "LinearAlgebra", "Test"] diff --git a/src/workerpool.jl b/src/workerpool.jl index a6bc2b7..6cd355e 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -149,6 +149,7 @@ function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, t = Threads.@spawn Threads.threadpool() try wait(x) + catch # just wait, ignore errors here finally put!(pool, worker) end @@ -400,3 +401,29 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...) put!(pool, worker) end end + +# Specialization for remotecall. We have to wait for the Future it returns +# before putting the worker back in the pool. +function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...) + worker = take!(pool) + f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker))) + isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker + + local x + try + x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...) + catch + put!(pool, worker) + rethrow() + end + + t = Threads.@spawn Threads.threadpool() try + wait(x) + catch # just wait, ignore errors here + finally + put!(pool, worker) + end + errormonitor(t) + + return x +end diff --git a/test/aqua.jl b/test/aqua.jl new file mode 100644 index 0000000..fc0c1be --- /dev/null +++ b/test/aqua.jl @@ -0,0 +1,9 @@ +using Aqua +using Distributed + +Aqua.test_all( + Distributed, + # This should be excluded, but it's not clear how to do that on Aqua's API + # given it's not-defined. (The Julia Base ambiguity test does it something like this) + # ambiguities=(exclude=[GlobalRef(Distributed, :cluster_manager)]) +) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 18d3cc4..b7f7b63 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -718,6 +718,8 @@ wp = WorkerPool(workers()) @test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp))) wp = WorkerPool(2:3) @test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3] +@test fetch(remotecall(myid, wp)) in wp.workers +@test_throws RemoteException fetch(remotecall(error, wp)) # wait on worker pool wp = WorkerPool(2:2) @@ -743,6 +745,8 @@ status = timedwait(() -> isready(f), 10) # CachingPool tests wp = CachingPool(workers()) @test [1:100...] == pmap(x->x, wp, 1:100) +@test fetch(remotecall(myid, wp)) in wp.workers +@test_throws RemoteException fetch(remotecall(error, wp)) clear!(wp) @test length(wp.map_obj2ref) == 0 @@ -1092,16 +1096,18 @@ end # Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should # keep the worker out of the pool until the underlying remotecall has # finished. -let - remotechan = RemoteChannel(wrkr1) - pool = WorkerPool([wrkr1]) - put_future = remotecall(() -> wait(remotechan), pool) - @test !isready(pool) - put!(remotechan, 1) - wait(put_future) - # The task that waits on the future to put it back into the pool runs - # asynchronously so we use timedwait() to check when the worker is back in. - @test timedwait(() -> isready(pool), 10) === :ok +for PoolType in (WorkerPool, CachingPool) + let + remotechan = RemoteChannel(wrkr1) + pool = PoolType([wrkr1]) + put_future = remotecall(() -> wait(remotechan), pool) + @test !isready(pool) + put!(remotechan, 1) + wait(put_future) + # The task that waits on the future to put it back into the pool runs + # asynchronously so we use timedwait() to check when the worker is back in. + @test timedwait(() -> isready(pool), 10) === :ok + end end # Test calling @everywhere from a module not defined on the workers diff --git a/test/runtests.jl b/test/runtests.jl index 085bd18..84e1c81 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,6 +1,13 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license using Test +using Distributed + +if !Base.get_bool_env("BUILDKITE", false) + @testset "Aqua.jl tests" begin + include("aqua.jl") + end +end # Run the distributed test outside of the main driver since it needs its own # set of dedicated workers.