Skip to content

Commit dd1d319

Browse files
committed
Use Malt instead of Distributed.jl
1 parent 26a0984 commit dd1d319

File tree

3 files changed

+34
-20
lines changed

3 files changed

+34
-20
lines changed

Project.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ version = "1.0.2"
55

66
[deps]
77
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
8-
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
98
IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89"
9+
Malt = "36869731-bdee-424d-aa32-cab38c994e3b"
1010
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"
1111
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
1212
Scratch = "6c6a2e73-6563-6170-7368-637461726353"
@@ -16,8 +16,8 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
1616

1717
[compat]
1818
Dates = "1"
19-
Distributed = "1"
2019
IOCapture = "0.2.5"
20+
Malt = "1.2.1"
2121
Printf = "1"
2222
Random = "1"
2323
Scratch = "1.3.0"

src/ParallelTestRunner.jl

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module ParallelTestRunner
22

33
export runtests, addworkers, addworker
44

5-
using Distributed
5+
using Malt
66
using Dates
77
using Printf: @sprintf
88
using Base.Filesystem: path_separator
@@ -375,24 +375,37 @@ function test_exe()
375375
return test_exeflags
376376
end
377377

378+
# Map PIDs to logical worker IDs
379+
# Malt doesn't have a global worker ID, and PID make printing ugly
380+
const WORKER_IDS = Dict{Int32, Int32}()
381+
382+
worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid]
383+
378384
"""
379385
addworkers(X; kwargs...)
380386
381-
Add `X` worker processes, with additional keyword arguments passed to `Distributed.addprocs`.
387+
Add `X` worker processes.
382388
"""
383-
function addworkers(X; kwargs...)
389+
function addworkers(X; env=Vector{Pair{String, String}}())
384390
exe = test_exe()
385-
exename = exe[1]
386391
exeflags = exe[2:end]
387392

388-
return withenv("JULIA_NUM_THREADS" => 1, "OPENBLAS_NUM_THREADS" => 1) do
389-
addprocs(X; exename, exeflags, kwargs...)
393+
push!(env, "JULIA_NUM_THREADS" => "1")
394+
# Malt already sets OPENBLAS_NUM_THREADS to 1
395+
push!(env, "OPENBLAS_NUM_THREADS" => "1")
396+
397+
workers = Malt.Worker[]
398+
for _ in 1:X
399+
wrkr = Malt.Worker(;exeflags, env)
400+
WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1
401+
push!(workers, wrkr)
390402
end
403+
return workers
391404
end
392405
addworker(; kwargs...) = addworkers(1; kwargs...)[1]
393406

394-
function recycle_worker(p)
395-
rmprocs(p, waitfor = 30)
407+
function recycle_worker(w::Malt.Worker)
408+
Malt.stop(w; exit_timeout = 15.0, term_timeout = 15.0)
396409

397410
return nothing
398411
end
@@ -572,7 +585,8 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
572585
end
573586
jobs = clamp(jobs, 1, length(tests))
574587
println(stdout, "Running $jobs tests in parallel. If this is too many, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.")
575-
addworkers(min(jobs, length(tests)))
588+
workers = addworkers(min(jobs, length(tests)))
589+
nworkers = length(workers)
576590

577591
t0 = time()
578592
results = []
@@ -604,7 +618,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
604618
textwidth(testgroupheader) + textwidth(" ") +
605619
textwidth(workerheader); map(
606620
x -> textwidth(x) +
607-
3 + ndigits(nworkers()), tests
621+
3 + ndigits(nworkers), tests
608622
)
609623
]
610624
)
@@ -765,7 +779,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
765779
#
766780

767781
worker_tasks = Task[]
768-
for p in workers()
782+
for p in workers
769783
push!(worker_tasks, @async begin
770784
while !done
771785
# if a worker failed, spawn a new one
@@ -780,16 +794,16 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
780794
wrkr = something(test_worker(test), p)
781795

782796
test_t0 = time()
783-
running_tests[test] = (wrkr, test_t0)
797+
running_tests[test] = (worker_id(wrkr), test_t0)
784798

785799
test, wrkr, test_t0
786800
end
787801

788802
# run the test
789-
put!(printer_channel, (:started, test, wrkr))
803+
put!(printer_channel, (:started, test, worker_id(wrkr)))
790804
result = try
791-
Distributed.remotecall_eval(Main, wrkr, :(import ParallelTestRunner))
792-
remotecall_fetch(runtest, wrkr, RecordType, test_runners[test], test,
805+
Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner))
806+
Malt.remote_call_fetch(invokelatest, wrkr, runtest, RecordType, test_runners[test], test,
793807
init_code, io_ctx.color)
794808
catch ex
795809
if isa(ex, InterruptException)
@@ -806,7 +820,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
806820
# act on the results
807821
if result isa AbstractTestRecord
808822
@assert result isa RecordType
809-
put!(printer_channel, (:finished, test, wrkr, result))
823+
put!(printer_channel, (:finished, test, worker_id(wrkr), result))
810824

811825
if memory_usage(result) > max_worker_rss
812826
# the worker has reached the max-rss limit, recycle it
@@ -815,7 +829,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T
815829
end
816830
else
817831
@assert result isa Exception
818-
put!(printer_channel, (:crashed, test, wrkr))
832+
put!(printer_channel, (:crashed, test, worker_id(wrkr)))
819833
if do_quickfail
820834
stop_work()
821835
end

test/runtests.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ end
141141
@test contains(str, r"abort .+ crashed at")
142142
@test contains(str, "FAILURE")
143143
@test contains(str, "Error During Test")
144-
@test contains(str, "ProcessExitedException")
144+
@test contains(str, "Malt.TerminatedWorkerException")
145145
end
146146

147147
@testset "test output" begin

0 commit comments

Comments
 (0)