Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ version = "1.0.2"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89"
Malt = "36869731-bdee-424d-aa32-cab38c994e3b"
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Scratch = "6c6a2e73-6563-6170-7368-637461726353"
Expand All @@ -16,8 +16,8 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[compat]
Dates = "1"
Distributed = "1"
IOCapture = "0.2.5"
Malt = "1.2.1"
Printf = "1"
Random = "1"
Scratch = "1.3.0"
Expand Down
60 changes: 30 additions & 30 deletions src/ParallelTestRunner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module ParallelTestRunner

export runtests, addworkers, addworker

using Distributed
using Malt
using Dates
using Printf: @sprintf
using Base.Filesystem: path_separator
Expand Down Expand Up @@ -375,26 +375,28 @@ function test_exe()
return test_exeflags
end

# Registry translating a worker's process ID (PID) into a small logical worker ID.
# Malt doesn't have a global worker ID, and raw PIDs make the printed output ugly.
const WORKER_IDS = Dict{Int32, Int32}()

"""
    worker_id(wrkr)

Return the logical ID stored in `WORKER_IDS` for `wrkr`, looked up by `wrkr.proc_pid`.
Throws a `KeyError` if that PID has no entry in `WORKER_IDS`.
"""
function worker_id(wrkr)
    pid = wrkr.proc_pid
    return getindex(WORKER_IDS, pid)
end

"""
addworkers(X; kwargs...)

Add `X` worker processes, with additional keyword arguments passed to `Distributed.addprocs`.
Add `X` worker processes.
"""
function addworkers(X; kwargs...)
addworkers(X; kwargs...) = [addworker(; kwargs...) for _ in 1:X]
function addworker(; env=Vector{Pair{String, String}}())
exe = test_exe()
exename = exe[1]
exeflags = exe[2:end]

return withenv("JULIA_NUM_THREADS" => 1, "OPENBLAS_NUM_THREADS" => 1) do
addprocs(X; exename, exeflags, kwargs...)
end
end
addworker(; kwargs...) = addworkers(1; kwargs...)[1]
push!(env, "JULIA_NUM_THREADS" => "1")
# Malt already sets OPENBLAS_NUM_THREADS to 1; it is set explicitly here as well
push!(env, "OPENBLAS_NUM_THREADS" => "1")

function recycle_worker(p)
rmprocs(p, waitfor = 30)

return nothing
wrkr = Malt.Worker(;exeflags, env)
WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1
return wrkr
end

"""
Expand Down Expand Up @@ -572,7 +574,8 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
end
jobs = clamp(jobs, 1, length(tests))
println(stdout, "Running $jobs tests in parallel. If this is too many, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
addworkers(min(jobs, length(tests)))
workers = addworkers(min(jobs, length(tests)))
nworkers = length(workers)

t0 = time()
results = []
Expand Down Expand Up @@ -601,11 +604,8 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
workerheader = "(Worker)"
name_align = maximum(
[
textwidth(testgroupheader) + textwidth(" ") +
textwidth(workerheader); map(
x -> textwidth(x) +
3 + ndigits(nworkers()), tests
)
textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader);
map(x -> textwidth(x) + 5, tests)
]
)

Expand Down Expand Up @@ -765,11 +765,11 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
#

worker_tasks = Task[]
for p in workers()
for p in workers
push!(worker_tasks, @async begin
while !done
# if a worker failed, spawn a new one
if p === nothing
if !Malt.isrunning(p)
p = addworker()
end

Expand All @@ -780,16 +780,16 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
wrkr = something(test_worker(test), p)

test_t0 = time()
running_tests[test] = (wrkr, test_t0)
running_tests[test] = (worker_id(wrkr), test_t0)

test, wrkr, test_t0
end

# run the test
put!(printer_channel, (:started, test, wrkr))
put!(printer_channel, (:started, test, worker_id(wrkr)))
result = try
Distributed.remotecall_eval(Main, wrkr, :(import ParallelTestRunner))
remotecall_fetch(runtest, wrkr, RecordType, test_runners[test], test,
Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner))
Malt.remote_call_fetch(invokelatest, wrkr, runtest, RecordType, test_runners[test], test,
init_code, io_ctx.color)
catch ex
if isa(ex, InterruptException)
Expand All @@ -806,27 +806,27 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
# act on the results
if result isa AbstractTestRecord
@assert result isa RecordType
put!(printer_channel, (:finished, test, wrkr, result))
put!(printer_channel, (:finished, test, worker_id(wrkr), result))

if memory_usage(result) > max_worker_rss
# the worker has reached the max-rss limit, recycle it
# so future tests start with a smaller working set
p = recycle_worker(p)
Malt.stop(wrkr)
end
else
@assert result isa Exception
put!(printer_channel, (:crashed, test, wrkr))
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
if do_quickfail
stop_work()
end

# the worker encountered some serious failure, recycle it
p = recycle_worker(p)
Malt.stop(wrkr)
end

# get rid of the custom worker
if wrkr != p
recycle_worker(wrkr)
Malt.stop(wrkr)
end

delete!(running_tests, test)
Expand Down
6 changes: 1 addition & 5 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,17 @@ end
end
)

print("""NOTE: The next test is expected to crash a worker process,
which may print some output to the terminal.
""")
io = IOBuffer()
@test_throws Test.FallbackTestSetException("Test run finished with errors") begin
runtests(ParallelTestRunner, ["--verbose"]; custom_tests, stdout=io, stderr=io)
end
println()

str = String(take!(io))
@test contains(str, r"abort .+ started at")
@test contains(str, r"abort .+ crashed at")
@test contains(str, "FAILURE")
@test contains(str, "Error During Test")
@test contains(str, "ProcessExitedException")
@test contains(str, "Malt.TerminatedWorkerException")
end

@testset "test output" begin
Expand Down