From c7eb21da57c8fda807aa1b754fd9028af0872a51 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Wed, 15 Oct 2025 16:35:48 +0200 Subject: [PATCH 1/3] Use Malt instead of Distributed.jl --- Project.toml | 4 ++-- src/ParallelTestRunner.jl | 48 +++++++++++++++++++++++++-------------- test/runtests.jl | 2 +- 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/Project.toml b/Project.toml index 6c83409..7b227ea 100644 --- a/Project.toml +++ b/Project.toml @@ -5,8 +5,8 @@ version = "1.0.2" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" -Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89" +Malt = "36869731-bdee-424d-aa32-cab38c994e3b" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Scratch = "6c6a2e73-6563-6170-7368-637461726353" @@ -16,8 +16,8 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] Dates = "1" -Distributed = "1" IOCapture = "0.2.5" +Malt = "1.2.1" Printf = "1" Random = "1" Scratch = "1.3.0" diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 074e161..7b1b7d9 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -2,7 +2,7 @@ module ParallelTestRunner export runtests, addworkers, addworker -using Distributed +using Malt using Dates using Printf: @sprintf using Base.Filesystem: path_separator @@ -375,24 +375,37 @@ function test_exe() return test_exeflags end +# Map PIDs to logical worker IDs +# Malt doesn't have a global worker ID, and PID make printing ugly +const WORKER_IDS = Dict{Int32, Int32}() + +worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid] + """ addworkers(X; kwargs...) -Add `X` worker processes, with additional keyword arguments passed to `Distributed.addprocs`. +Add `X` worker processes. """ -function addworkers(X; kwargs...) +function addworkers(X; env=Vector{Pair{String, String}}()) exe = test_exe() - exename = exe[1] exeflags = exe[2:end] - return withenv("JULIA_NUM_THREADS" => 1, "OPENBLAS_NUM_THREADS" => 1) do - addprocs(X; exename, exeflags, kwargs...) + push!(env, "JULIA_NUM_THREADS" => "1") + # Malt already sets OPENBLAS_NUM_THREADS to 1 + push!(env, "OPENBLAS_NUM_THREADS" => "1") + + workers = Malt.Worker[] + for _ in 1:X + wrkr = Malt.Worker(;exeflags, env) + WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1 + push!(workers, wrkr) end + return workers end addworker(; kwargs...) = addworkers(1; kwargs...)[1] -function recycle_worker(p) - rmprocs(p, waitfor = 30) +function recycle_worker(w::Malt.Worker) + Malt.stop(w; exit_timeout = 15.0, term_timeout = 15.0) return nothing end @@ -572,7 +585,8 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T end jobs = clamp(jobs, 1, length(tests)) println(stdout, "Running $jobs tests in parallel. If this is too many, specify the `--jobs=N` argument to the tests, or set the `JULIA_CPU_THREADS` environment variable.") - addworkers(min(jobs, length(tests))) + workers = addworkers(min(jobs, length(tests))) + nworkers = length(workers) t0 = time() results = [] @@ -604,7 +618,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader); map( x -> textwidth(x) + - 3 + ndigits(nworkers()), tests + 3 + ndigits(nworkers), tests ) ] ) @@ -765,7 +779,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T # worker_tasks = Task[] - for p in workers() + for p in workers push!(worker_tasks, @async begin while !done # if a worker failed, spawn a new one @@ -780,16 +794,16 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T wrkr = something(test_worker(test), p) test_t0 = time() - running_tests[test] = (wrkr, test_t0) + running_tests[test] = (worker_id(wrkr), test_t0) test, wrkr, test_t0 end # run the test - put!(printer_channel, (:started, test, wrkr)) + put!(printer_channel, (:started, test, worker_id(wrkr))) result = try - Distributed.remotecall_eval(Main, wrkr, :(import ParallelTestRunner)) - remotecall_fetch(runtest, wrkr, RecordType, test_runners[test], test, + Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner)) + Malt.remote_call_fetch(invokelatest, wrkr, runtest, RecordType, test_runners[test], test, init_code, io_ctx.color) catch ex if isa(ex, InterruptException) @@ -806,7 +820,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T # act on the results if result isa AbstractTestRecord @assert result isa RecordType - put!(printer_channel, (:finished, test, wrkr, result)) + put!(printer_channel, (:finished, test, worker_id(wrkr), result)) if memory_usage(result) > max_worker_rss # the worker has reached the max-rss limit, recycle it @@ -815,7 +829,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T end else @assert result isa Exception - put!(printer_channel, (:crashed, test, wrkr)) + put!(printer_channel, (:crashed, test, worker_id(wrkr))) if do_quickfail stop_work() end diff --git a/test/runtests.jl b/test/runtests.jl index 35912e2..94378d4 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -141,7 +141,7 @@ end @test contains(str, r"abort .+ crashed at") @test contains(str, "FAILURE") @test contains(str, "Error During Test") - @test contains(str, "ProcessExitedException") + @test contains(str, "Malt.TerminatedWorkerException") end @testset "test output" begin From 99fe41787f529a70886f7893bf3eecd94e21a8c7 Mon Sep 17 00:00:00 2001 From: Tim Besard Date: Thu, 16 Oct 2025 08:50:02 +0200 Subject: [PATCH 2/3] Simplifications. --- src/ParallelTestRunner.jl | 7 ++----- test/runtests.jl | 4 ---- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 7b1b7d9..4f19b82 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -615,11 +615,8 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T workerheader = "(Worker)" name_align = maximum( [ - textwidth(testgroupheader) + textwidth(" ") + - textwidth(workerheader); map( - x -> textwidth(x) + - 3 + ndigits(nworkers), tests - ) + textwidth(testgroupheader) + textwidth(" ") + textwidth(workerheader); + map(x -> textwidth(x) + 5, tests) ] ) diff --git a/test/runtests.jl b/test/runtests.jl index 94378d4..4dcc0b4 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -127,14 +127,10 @@ end end ) - print("""NOTE: The next test is expected to crash a worker process, - which may print some output to the terminal. - """) io = IOBuffer() @test_throws Test.FallbackTestSetException("Test run finished with errors") begin runtests(ParallelTestRunner, ["--verbose"]; custom_tests, stdout=io, stderr=io) end - println() str = String(take!(io)) @test contains(str, r"abort .+ started at") From 11f4f7853305473edf9b516eea18b498f71eb1ae Mon Sep 17 00:00:00 2001 From: Tim Besard Date: Thu, 16 Oct 2025 08:57:21 +0200 Subject: [PATCH 3/3] More simplifications: rely on Malt.isrunning. --- src/ParallelTestRunner.jl | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index 4f19b82..a5409e0 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -378,7 +378,6 @@ end # Map PIDs to logical worker IDs # Malt doesn't have a global worker ID, and PID make printing ugly const WORKER_IDS = Dict{Int32, Int32}() - worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid] """ @@ -386,7 +385,8 @@ worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid] Add `X` worker processes. """ -function addworkers(X; env=Vector{Pair{String, String}}()) +addworkers(X; kwargs...) = [addworker(; kwargs...) for _ in 1:X] +function addworker(; env=Vector{Pair{String, String}}()) exe = test_exe() exeflags = exe[2:end] @@ -394,20 +394,9 @@ function addworkers(X; env=Vector{Pair{String, String}}()) # Malt already sets OPENBLAS_NUM_THREADS to 1 push!(env, "OPENBLAS_NUM_THREADS" => "1") - workers = Malt.Worker[] - for _ in 1:X - wrkr = Malt.Worker(;exeflags, env) - WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1 - push!(workers, wrkr) - end - return workers -end -addworker(; kwargs...) = addworkers(1; kwargs...)[1] - -function recycle_worker(w::Malt.Worker) - Malt.stop(w; exit_timeout = 15.0, term_timeout = 15.0) - - return nothing + wrkr = Malt.Worker(;exeflags, env) + WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1 + return wrkr end """ @@ -780,7 +769,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T push!(worker_tasks, @async begin while !done # if a worker failed, spawn a new one - if p === nothing + if !Malt.isrunning(p) p = addworker() end @@ -822,7 +811,7 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T if memory_usage(result) > max_worker_rss # the worker has reached the max-rss limit, recycle it # so future tests start with a smaller working set - p = recycle_worker(p) + Malt.stop(wrkr) end else @assert result isa Exception @@ -832,12 +821,12 @@ function runtests(mod::Module, ARGS; test_filter = Returns(true), RecordType = T end # the worker encountered some serious failure, recycle it - p = recycle_worker(p) + Malt.stop(wrkr) end # get rid of the custom worker if wrkr != p - recycle_worker(wrkr) + Malt.stop(wrkr) end delete!(running_tests, test)