Skip to content

Commit b87426d

Browse files
authored
Merge pull request #488 from MilesCranmer/timeout_parameter
feat: add `worker_timeout` to allow workers more time to connect
2 parents a5b5c3e + 82137af commit b87426d

File tree

4 files changed

+22
-1
lines changed

4 files changed

+22
-1
lines changed

src/Configure.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ function configure_workers(;
368368
procs::Union{Vector{Int},Nothing},
369369
numprocs::Int,
370370
addprocs_function::Function,
371+
worker_timeout::Float64,
371372
options::AbstractOptions,
372373
@nospecialize(worker_imports::Union{Vector{Symbol},Nothing}),
373374
project_path,
@@ -378,7 +379,9 @@ function configure_workers(;
378379
runtests::Bool,
379380
)
380381
(procs, we_created_procs) = if procs === nothing
381-
(addprocs_function(numprocs; lazy=false, exeflags), true)
382+
withenv("JULIA_WORKER_TIMEOUT" => string(worker_timeout)) do
383+
(addprocs_function(numprocs; lazy=false, exeflags), true)
384+
end
382385
else
383386
(procs, false)
384387
end

src/MLJInterface.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ function modelexpr(
7878
procs::Union{Vector{Int},Nothing} = nothing
7979
addprocs_function::Union{Function,Nothing} = nothing
8080
heap_size_hint_in_bytes::Union{Integer,Nothing} = nothing
81+
worker_timeout::Union{Real,Nothing} = nothing
8182
worker_imports::Union{Vector{Symbol},Nothing} = nothing
8283
logger::Union{AbstractSRLogger,Nothing} = nothing
8384
runtests::Bool = true
@@ -300,6 +301,7 @@ function _update(
300301
procs=m.procs,
301302
addprocs_function=m.addprocs_function,
302303
heap_size_hint_in_bytes=m.heap_size_hint_in_bytes,
304+
worker_timeout=m.worker_timeout,
303305
worker_imports=m.worker_imports,
304306
runtests=m.runtests,
305307
saved_state=(old_fitresult === nothing ? nothing : old_fitresult.state),

src/SearchUtils.jl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ struct RuntimeOptions{PARALLELISM,DIM_OUT,RETURN_STATE,LOGGER} <: AbstractRuntim
8282
numprocs::Int64
8383
init_procs::Union{Vector{Int},Nothing}
8484
addprocs_function::Function
85+
worker_timeout::Float64
8586
exeflags::Cmd
8687
worker_imports::Union{Vector{Symbol},Nothing}
8788
runtests::Bool
@@ -118,6 +119,7 @@ end
118119
procs::Union{Vector{Int},Nothing}=nothing,
119120
addprocs_function::Union{Function,Nothing}=nothing,
120121
heap_size_hint_in_bytes::Union{Integer,Nothing}=nothing,
122+
worker_timeout::Union{Real,Nothing}=nothing,
121123
worker_imports::Union{Vector{Symbol},Nothing}=nothing,
122124
runtests::Bool=true,
123125
return_state::VRS=nothing,
@@ -190,6 +192,13 @@ end
190192
_verbosity = something(verbosity, options_verbosity, 1)
191193
_progress = something(progress, options_progress, (_verbosity > 0) && nout == 1)
192194
_addprocs_function = something(addprocs_function, addprocs)
195+
_worker_timeout = Float64(
196+
something(
197+
worker_timeout,
198+
tryparse(Float64, get(ENV, "JULIA_WORKER_TIMEOUT", "")),
199+
max(60, _numprocs^2),
200+
),
201+
)
193202
_run_id = @something(run_id, generate_run_id())
194203

195204
exeflags = if concurrency == :multiprocessing && isnothing(procs)
@@ -211,6 +220,7 @@ end
211220
_numprocs,
212221
procs,
213222
_addprocs_function,
223+
_worker_timeout,
214224
exeflags,
215225
worker_imports,
216226
runtests,

src/SymbolicRegression.jl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,9 @@ which is useful for debugging and profiling.
411411
is close to the recommended size. This is important for long-running distributed
412412
jobs where each process has an independent memory, and can help avoid
413413
out-of-memory errors. By default, this is set to `Sys.free_memory() / numprocs`.
414+
- `worker_timeout::Union{Real,Nothing}=nothing`: Timeout in seconds for worker processes
415+
to establish a connection with the master process. If left as `nothing` and the `JULIA_WORKER_TIMEOUT` environment variable is already set,
416+
that value is used. Otherwise, the timeout defaults to `max(60, numprocs^2)` seconds.
414417
- `worker_imports::Union{Vector{Symbol},Nothing}=nothing`: If you want to import
415418
additional modules on each worker, pass them here as a vector of symbols.
416419
By default some of the extensions will automatically be loaded when needed.
@@ -470,6 +473,7 @@ function equation_search(
470473
procs::Union{Vector{Int},Nothing}=nothing,
471474
addprocs_function::Union{Function,Nothing}=nothing,
472475
heap_size_hint_in_bytes::Union{Integer,Nothing}=nothing,
476+
worker_timeout::Union{Real,Nothing}=nothing,
473477
worker_imports::Union{Vector{Symbol},Nothing}=nothing,
474478
runtests::Bool=true,
475479
saved_state=nothing,
@@ -521,6 +525,7 @@ function equation_search(
521525
procs=procs,
522526
addprocs_function=addprocs_function,
523527
heap_size_hint_in_bytes=heap_size_hint_in_bytes,
528+
worker_timeout=worker_timeout,
524529
worker_imports=worker_imports,
525530
runtests=runtests,
526531
saved_state=saved_state,
@@ -644,6 +649,7 @@ end
644649
procs=ropt.init_procs,
645650
ropt.numprocs,
646651
ropt.addprocs_function,
652+
ropt.worker_timeout,
647653
options,
648654
worker_imports=ropt.worker_imports,
649655
project_path=splitdir(Pkg.project().path)[1],

0 commit comments

Comments
 (0)