Skip to content

Commit 5b4a712

Browse files
committed
feat: add worker_timeout
1 parent 2cc71ae commit 5b4a712

File tree

4 files changed

+23
-1
lines changed

4 files changed

+23
-1
lines changed

src/Configure.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ function configure_workers(;
368368
procs::Union{Vector{Int},Nothing},
369369
numprocs::Int,
370370
addprocs_function::Function,
371+
worker_timeout::Union{Float64,Nothing},
371372
options::AbstractOptions,
372373
@nospecialize(worker_imports::Union{Vector{Symbol},Nothing}),
373374
project_path,
@@ -378,7 +379,9 @@ function configure_workers(;
378379
runtests::Bool,
379380
)
380381
# Start new worker processes only when the caller did not pass existing ones.
# The second tuple element records whether this function created the workers.
(procs, we_created_procs) = if procs === nothing
    launch_workers = () -> (addprocs_function(numprocs; lazy=false, exeflags), true)
    if worker_timeout === nothing
        # The signature allows `worker_timeout === nothing`. In that case leave
        # the environment untouched: `string(nothing)` would otherwise export
        # the literal text "nothing" as `JULIA_WORKER_TIMEOUT`.
        launch_workers()
    else
        # `withenv` sets the variable only for the duration of the call and
        # restores the previous value afterwards.
        withenv(launch_workers, "JULIA_WORKER_TIMEOUT" => string(worker_timeout))
    end
else
    (procs, false)
end

src/MLJInterface.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ function modelexpr(
7777
numprocs::Union{Int,Nothing} = nothing
7878
procs::Union{Vector{Int},Nothing} = nothing
7979
addprocs_function::Union{Function,Nothing} = nothing
80+
worker_timeout::Union{Real,Nothing} = nothing
8081
heap_size_hint_in_bytes::Union{Integer,Nothing} = nothing
8182
worker_imports::Union{Vector{Symbol},Nothing} = nothing
8283
logger::Union{AbstractSRLogger,Nothing} = nothing
@@ -299,6 +300,7 @@ function _update(
299300
numprocs=m.numprocs,
300301
procs=m.procs,
301302
addprocs_function=m.addprocs_function,
303+
worker_timeout=m.worker_timeout,
302304
heap_size_hint_in_bytes=m.heap_size_hint_in_bytes,
303305
worker_imports=m.worker_imports,
304306
runtests=m.runtests,

src/SearchUtils.jl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ struct RuntimeOptions{PARALLELISM,DIM_OUT,RETURN_STATE,LOGGER} <: AbstractRuntim
8282
numprocs::Int64
8383
init_procs::Union{Vector{Int},Nothing}
8484
addprocs_function::Function
85+
worker_timeout::Float64
8586
exeflags::Cmd
8687
worker_imports::Union{Vector{Symbol},Nothing}
8788
runtests::Bool
@@ -117,6 +118,7 @@ end
117118
numprocs::Union{Int,Nothing}=nothing,
118119
procs::Union{Vector{Int},Nothing}=nothing,
119120
addprocs_function::Union{Function,Nothing}=nothing,
121+
worker_timeout::Union{Real,Nothing}=nothing,
120122
heap_size_hint_in_bytes::Union{Integer,Nothing}=nothing,
121123
worker_imports::Union{Vector{Symbol},Nothing}=nothing,
122124
runtests::Bool=true,
@@ -190,6 +192,13 @@ end
190192
_verbosity = something(verbosity, options_verbosity, 1)
191193
_progress = something(progress, options_progress, (_verbosity > 0) && nout == 1)
192194
_addprocs_function = something(addprocs_function, addprocs)
195+
# Resolve the worker timeout in seconds. Precedence:
#   1. an explicitly passed `worker_timeout`,
#   2. a parseable `JULIA_WORKER_TIMEOUT` from the environment,
#   3. a default that grows with the number of processes.
_worker_timeout = Float64(
    @something(
        worker_timeout,
        # `tryparse` yields `nothing` for an unset or malformed value, so
        # `@something` falls through to the default below.
        tryparse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "")),
        # Use 60 s as a floor rather than a cap: `min` would give a 1 s
        # timeout for a single process and never exceed 60 s for large pools.
        max(60, _numprocs^2)
    )
)
193202
_run_id = @something(run_id, generate_run_id())
194203

195204
exeflags = if concurrency == :multiprocessing && isnothing(procs)
@@ -211,6 +220,7 @@ end
211220
_numprocs,
212221
procs,
213222
_addprocs_function,
223+
_worker_timeout,
214224
exeflags,
215225
worker_imports,
216226
runtests,

src/SymbolicRegression.jl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,10 @@ which is useful for debugging and profiling.
406406
which is the number of processes to use, as well as the `lazy` keyword argument.
407407
For example, if set up on a slurm cluster, you could pass
408408
`addprocs_function = addprocs_slurm`, which will set up slurm processes.
409+
- `worker_timeout::Union{Real,Nothing}=nothing`: Timeout in seconds for worker processes
410+
to establish connection with the master process. If `JULIA_WORKER_TIMEOUT` is already set,
411+
that value is used. Otherwise defaults to `min(60, numprocs^2)`. When explicitly provided,
412+
this temporarily overrides `JULIA_WORKER_TIMEOUT` only during worker creation.
409413
- `heap_size_hint_in_bytes::Union{Int,Nothing}=nothing`: On Julia 1.9+, you may set the `--heap-size-hint`
410414
flag on Julia processes, recommending garbage collection once a process
411415
is close to the recommended size. This is important for long-running distributed
@@ -469,6 +473,7 @@ function equation_search(
469473
numprocs::Union{Int,Nothing}=nothing,
470474
procs::Union{Vector{Int},Nothing}=nothing,
471475
addprocs_function::Union{Function,Nothing}=nothing,
476+
worker_timeout::Union{Real,Nothing}=nothing,
472477
heap_size_hint_in_bytes::Union{Integer,Nothing}=nothing,
473478
worker_imports::Union{Vector{Symbol},Nothing}=nothing,
474479
runtests::Bool=true,
@@ -520,6 +525,7 @@ function equation_search(
520525
numprocs=numprocs,
521526
procs=procs,
522527
addprocs_function=addprocs_function,
528+
worker_timeout=worker_timeout,
523529
heap_size_hint_in_bytes=heap_size_hint_in_bytes,
524530
worker_imports=worker_imports,
525531
runtests=runtests,
@@ -644,6 +650,7 @@ end
644650
procs=ropt.init_procs,
645651
ropt.numprocs,
646652
ropt.addprocs_function,
653+
ropt.worker_timeout,
647654
options,
648655
worker_imports=ropt.worker_imports,
649656
project_path=splitdir(Pkg.project().path)[1],

0 commit comments

Comments
 (0)