diff --git a/Project.toml b/Project.toml
index a50950ee3..610ed6c89 100644
--- a/Project.toml
+++ b/Project.toml
@@ -4,6 +4,7 @@ version = "0.22.0"
 [deps]
 Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
+Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 FillArrays = "1a297f60-69ca-5386-bcde-b61e274b549b"
 LineSearches = "d3d80556-e9d4-5f37-9878-2ab0fcc64255"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
diff --git a/src/Optim.jl b/src/Optim.jl
index 2a21165bc..0cecca801 100644
--- a/src/Optim.jl
+++ b/src/Optim.jl
@@ -37,7 +37,7 @@ using FillArrays # For handling scalar bounds in Fminbox
 # for extensions of functions defined in Base.
 import Base: length, push!, show, getindex, setindex!, maximum, minimum
-
+import Distributed: RemoteChannel, remote_do # fetch, put!, take!, @async and @sync come from Base
 # objective and constraints types and functions relevant to them.
 import NLSolversBase: NonDifferentiable, OnceDifferentiable, TwiceDifferentiable,
     nconstraints, nconstraints_x, NotInplaceObjective, InplaceObjective
diff --git a/src/multivariate/solvers/constrained/samin.jl b/src/multivariate/solvers/constrained/samin.jl
index c791a1472..0815af439 100644
--- a/src/multivariate/solvers/constrained/samin.jl
+++ b/src/multivariate/solvers/constrained/samin.jl
@@ -47,6 +47,7 @@ algorithm
     x_tol::T = 1e-6 # the required tolerance level for x
     coverage_ok::Bool = false # if false, increase temperature until initial parameter space is covered
     verbosity::Int = 1 # scalar: 0, 1, 2 or 3 (default = 1: see final results).
+    workers::Array{Int,1} = Int[] # ids of the worker processes used for parallel objective evaluation
 end
 # * verbosity: scalar: 0, 1, 2 or 3 (default = 1).
 # * 0 = no screen output
@@ -56,17 +57,42 @@ end
 # covered by the trial values. 1: start decreasing temperature immediately
 
 Base.summary(::SAMIN) = "SAMIN"
+
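+# remote_eval is the worker-side evaluation loop. Each worker repeatedly takes a
+# job (xp, f_old, i) from xp_fold_i_chan, perturbs element i of xp within the
+# current bounds fetched from lb_ub_bounds_chan, evaluates the objective, and
+# puts (xp, f_old, f_proposal, lnobds, i) on ret_chan. The loop exits once
+# stop_sig holds true.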
+function remote_eval(d, Tx, lb_ub_bounds_chan, xp_fold_i_chan, ret_chan, stop_sig)
+    while !fetch(stop_sig)
+        lnobds = 0
+        if timedwait(() -> isready(xp_fold_i_chan), 0.1) == :ok
+            xp, f_old, i = take!(xp_fold_i_chan)
+            lb, ub, bounds = fetch(lb_ub_bounds_chan)
+            xp[i] += (Tx(2.0) * rand(Tx) - Tx(1.0)) * bounds[i]
+            if (xp[i] < lb[i]) || (xp[i] > ub[i])
+                xp[i] = lb[i] + (ub[i] - lb[i]) * rand(Tx)
+                lnobds = 1
+            end
+            f_proposal = value(d, xp)
+            put!(ret_chan, (xp, f_old, f_proposal, lnobds, i))
+        elseif fetch(stop_sig) # exit on stop
+            return nothing
+        end
+    end
+end
+
+
 function optimize(obj_fn, lb::AbstractArray, ub::AbstractArray, x::AbstractArray{Tx},
                   method::SAMIN, options::Options = Options()) where Tx
+
     t0 = time() # Initial time stamp used to control early stopping by options.time_limit
-
     hline = "="^80
     d = NonDifferentiable(obj_fn, x)
     tr = OptimizationTrace{typeof(value(d)), typeof(method)}()
     tracing = options.store_trace || options.show_trace || options.extended_trace || options.callback != nothing
-    @unpack nt, ns, rt, neps, f_tol, x_tol, coverage_ok, verbosity = method
+    @unpack nt, ns, rt, neps, f_tol, x_tol, coverage_ok, verbosity, workers = method
     verbose = verbosity > 0
     x0 = copy(x)
@@ -96,7 +122,22 @@ function optimize(obj_fn, lb::AbstractArray, ub::AbstractArray, x::AbstractArray
         trace!(tr, d, (x=xopt, iteration=iteration), iteration, method, options, _time-t0)
     stopped_by_callback = false
-
+
+    if !isempty(workers) # prepare parallel evaluation
+        xp_fold_i_chan = RemoteChannel(() -> Channel{Tuple}(1024)) # put and take jobs here
+        ret_chan = RemoteChannel(() -> Channel{Tuple}(1024)) # put and take results here
+        lb_ub_bounds_chan = RemoteChannel(() -> Channel{Tuple}(1)) # put and fetch current bounds here
+        stop_sig = RemoteChannel(() -> Channel{Bool}(1)) # signal the workers to end the eval loop
+        put!(stop_sig, false)
+        for p in workers # start tasks on the workers to process evals in parallel
+            remote_do(remote_eval, p, d, Tx, lb_ub_bounds_chan, xp_fold_i_chan, ret_chan, stop_sig)
+        end
+        put!(lb_ub_bounds_chan, (lb, ub, bounds)) # fill the channel with the initial bounds
+        # seed the initial evaluations, all starting from the same x
+        for i in 1:min(length(workers), n*ns)
+            put!(xp_fold_i_chan, (x, f_old, (i % n) + 1))
+        end
+    end
     # main loop, first increase temperature until parameter space covered, then reduce until convergence
     while converge==0
         # statistics to report at each temp change, set back to zero
@@ -105,109 +146,117 @@ function optimize(obj_fn, lb::AbstractArray, ub::AbstractArray, x::AbstractArray
         nnew = 0
         ndown = 0
         lnobds = 0
+
         # repeat nt times then adjust temperature
-        for m = 1:nt
+        @sync for m = 1:nt
+            h = 0
             # repeat ns times, then adjust bounds
             nacp = zeros(n)
-            for j = 1:ns
-                # generate new point by taking last and adding a random value
-                # to each of elements, in turn
-                for h = 1:n
-                    iteration += 1
-                    # new Sept 2011, if bounds are same, skip the search for that vbl.
-                    # Allows restrictions without complicated programming
-                    if (lb[h] != ub[h])
-                        # change xp for the appropriate element
-                        xp = copy(x)
-                        xp[h] += (Tx(2.0) * rand(Tx) - Tx(1.0)) * bounds[h]
-                        if (xp[h] < lb[h]) || (xp[h] > ub[h])
-                            xp[h] = lb[h] + (ub[h] - lb[h]) * rand(Tx)
-                            lnobds += 1
-                        end
-                        # Evaluate function at new point
-                        f_proposal = value(d, xp)
-                        # Accept the new point if the function value decreases
-                        if (f_proposal <= f_old)
-                            x = copy(xp)
-                            f_old = f_proposal
-                            nacc += 1 # total number of acceptances
-                            nacp[h] += 1 # acceptances for this parameter
-                            nup += 1
-                            # If lower than any other point, record as new optimum
-                            if f_proposal < fopt
-                                xopt = copy(xp)
-                                fopt = f_proposal
-                                d.F = f_proposal
-                                nnew +=1
-                                details = [details; [f_calls(d) t f_proposal xp']]
-                            end
-                        # If the point is higher, use the Metropolis criteria to decide on
-                        # acceptance or rejection.
-                        else
-                            p = exp(-(f_proposal - f_old) / t)
-                            if rand(Tx) < p
-                                x = copy(xp)
-                                f_old = copy(f_proposal)
-                                d.F = f_proposal
-                                nacc += 1
-                                nacp[h] += 1
-                                ndown += 1
-                            else
-                                nrej += 1
-                            end
-                        end
-                    end
+            while h < ns*n # ns sweeps over the n elements per bound adjustment
+                h += 1
+                iteration += 1
+                i_vbl = (h % n) + 1 # the element perturbed in this iteration
+                if isempty(workers) # serial evaluation
+                    i = i_vbl
+                    # generate a new point by taking the last and adding a random
+                    # value to the chosen element
+                    xp = copy(x)
+                    xp[i] += (Tx(2.0) * rand(Tx) - Tx(1.0)) * bounds[i]
+                    if (xp[i] < lb[i]) || (xp[i] > ub[i])
+                        xp[i] = lb[i] + (ub[i] - lb[i]) * rand(Tx)
+                        lnobds += 1
+                    end
+                    # evaluate the function at the new point
+                    f_proposal = value(d, xp)
+                else # parallel: collect whichever worker evaluation finished first
+                    xp, f_old_r, f_proposal, lnobds_r, i = take!(ret_chan)
+                    lnobds += lnobds_r
+                end
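+                # Acceptance below is judged against the *current* incumbent f_old;
+                # in the parallel branch the proposal was generated from the
+                # incumbent the worker last saw (returned as f_old_r), which may be
+                # stale by the time the result is collected.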
+                # Accept the new point if the function value decreases
+                if (f_proposal <= f_old)
+                    x = copy(xp)
+                    f_old = f_proposal
+                    nacc += 1 # total number of acceptances
+                    nacp[i] += 1 # acceptances for this parameter
+                    nup += 1
+                    # If lower than any other point, record as new optimum
+                    if f_proposal < fopt
+                        xopt = copy(xp)
+                        fopt = f_proposal
+                        d.F = f_proposal
+                        nnew += 1
+                        details = [details; [iteration t f_proposal xp']]
+                    end
+                # If the point is higher, use the Metropolis criterion to decide on
+                # acceptance or rejection.
+                else
+                    p = exp(-(f_proposal - f_old) / t)
+                    if rand(Tx) < p
+                        x = copy(xp)
+                        f_old = copy(f_proposal)
+                        d.F = f_proposal
+                        nacc += 1
+                        nacp[i] += 1
+                        ndown += 1
+                    else
+                        nrej += 1
+                    end
+                end
-                    if tracing
-                        # update trace; callbacks can stop routine early by returning true
-                        stopped_by_callback = trace!(tr, d, (x=xopt,iteration=iteration), iteration, method, options, time()-t0)
-                    end
-
-                    # If options.iterations exceeded, terminate the algorithm
-                    _time = time()
-                    if f_calls(d) >= options.iterations || _time-t0 > options.time_limit || stopped_by_callback
-
-                        if verbose
-                            println(hline)
-                            println("SAMIN results")
-                            println("NO CONVERGENCE: MAXEVALS exceeded")
-                            @printf("\n Obj. value: %16.5f\n\n", fopt)
-                            println(" parameter search width")
-                            for i=1:n
-                                @printf("%16.5f %16.5f \n", xopt[i], bounds[i])
-                            end
-                            println(hline)
-                        end
-                        converge = 0
-
-                        return MultivariateOptimizationResults(method,
-                            x0,# initial_x,
-                            xopt, #pick_best_x(f_incr_pick, state),
-                            fopt, # pick_best_f(f_incr_pick, state, d),
-                            f_calls(d), #iteration,
-                            f_calls(d) >= options.iterations, #iteration == options.iterations,
-                            false, # x_converged,
-                            0.0,#T(options.x_tol),
-                            0.0,#T(options.x_tol),
-                            NaN,# x_abschange(state),
-                            NaN,# x_abschange(state),
-                            false,# f_converged,
-                            0.0,#T(options.f_tol),
-                            0.0,#T(options.f_tol),
-                            NaN,#f_abschange(d, state),
-                            NaN,#f_abschange(d, state),
-                            false,#g_converged,
-                            0.0,#T(options.g_tol),
-                            NaN,#g_residual(d),
-                            false, #f_increased,
-                            tr,
-                            f_calls(d),
-                            g_calls(d),
-                            h_calls(d),
-                            true,
-                            options.time_limit,
-                            _time-t0,)
-                    end
-                end
-            end
+                # feed the next job; @async results in better scaling with the number of workers
+                !isempty(workers) && @async put!(xp_fold_i_chan, (x, f_old, i_vbl))
+            end
+        end
 
+        if tracing
+            # update trace; callbacks can stop routine early by returning true
+            stopped_by_callback = trace!(tr, d, (x=xopt,iteration=iteration), iteration, method, options, time()-t0)
+        end
+        # If options.iterations is exceeded, terminate the algorithm
+        _time = time()
+        if iteration >= options.iterations || _time-t0 > options.time_limit || stopped_by_callback
+
+            # tell the workers to stop
+            !isempty(workers) && take!(stop_sig)
+            !isempty(workers) && put!(stop_sig, true)
+
+            if verbose
+                println(hline)
+                println("SAMIN results")
+                println("NO CONVERGENCE: MAXEVALS exceeded")
+                @printf("\n Obj. value: %16.5f\n\n", fopt)
+                println(" parameter search width")
+                for i = 1:n
+                    @printf("%16.5f %16.5f \n", xopt[i], bounds[i])
+                end
+                println(hline)
+            end
+            converge = 0
+
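+            # NOTE: with remote workers, f_calls(d) only counts evaluations made on
+            # the master process, so `iteration` stands in for the objective call
+            # count below and the gradient/Hessian call counts are reported as 0.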
+            return MultivariateOptimizationResults(method,
+                x0,# initial_x,
+                xopt, #pick_best_x(f_incr_pick, state),
+                fopt, # pick_best_f(f_incr_pick, state, d),
+                iteration, #iteration,
+                iteration >= options.iterations, #iteration == options.iterations,
+                false, # x_converged,
+                0.0,#T(options.x_tol),
+                0.0,#T(options.x_tol),
+                NaN,# x_abschange(state),
+                NaN,# x_abschange(state),
+                false,# f_converged,
+                0.0,#T(options.f_tol),
+                0.0,#T(options.f_tol),
+                NaN,#f_abschange(d, state),
+                NaN,#f_abschange(d, state),
+                false,#g_converged,
+                0.0,#T(options.g_tol),
+                NaN,#g_residual(d),
+                false, #f_increased,
+                tr,
+                iteration,
+                0,
+                0,
+                true,
+                options.time_limit,
+                _time-t0,)
+        end
         # Adjust bounds so that approximately half of all evaluations are accepted
         test = 0
@@ -225,6 +274,9 @@ function optimize(obj_fn, lb::AbstractArray, ub::AbstractArray, x::AbstractArray
                 test += 1 # make sure coverage check passes for the fixed parameters
             end
         end
+        !isempty(workers) && take!(lb_ub_bounds_chan) # clear the stale bounds
+        !isempty(workers) && put!(lb_ub_bounds_chan, (lb, ub, bounds)) # publish the new bounds
+
         nacp = 0 # set back to zero
         # check if we cover parameter space, if we have yet to do so
         if !coverage_ok
@@ -318,12 +370,16 @@ function optimize(obj_fn, lb::AbstractArray, ub::AbstractArray, x::AbstractArray
                 x = xopt
             end
         end
+        # tell the workers to stop
+        !isempty(workers) && take!(stop_sig)
+        !isempty(workers) && put!(stop_sig, true)
+
         return MultivariateOptimizationResults(method,
             x0,# initial_x,
             xopt, #pick_best_x(f_incr_pick, state),
             fopt, # pick_best_f(f_incr_pick, state, d),
-            f_calls(d), #iteration,
-            f_calls(d) >= options.iterations, #iteration == options.iterations,
+            iteration, #iteration,
+            iteration >= options.iterations, #iteration == options.iterations,
             false, # x_converged,
             0.0,#T(options.x_tol),
             0.0,#T(options.x_tol),
@@ -339,9 +395,9 @@ function optimize(obj_fn, lb::AbstractArray, ub::AbstractArray, x::AbstractArray
             NaN,#g_residual(d),
             false, #f_increased,
             tr,
-            f_calls(d),
-            g_calls(d),
-            h_calls(d),
+            iteration,
+            0,
+            0,
             true,
             options.time_limit,
             _time-t0,)
diff --git a/test/multivariate/solvers/constrained/samin.jl b/test/multivariate/solvers/constrained/samin.jl
index 657166fb6..ae5fa287d 100644
--- a/test/multivariate/solvers/constrained/samin.jl
+++ b/test/multivariate/solvers/constrained/samin.jl
@@ -1,4 +1,5 @@
 @testset "SAMIN" begin
+    using Optim, Distributed
     prob = MVP.UnconstrainedProblems.examples["Himmelblau"]
     xtrue = prob.solutions
@@ -6,4 +7,80 @@
     x0 = prob.initial_x
     res = optimize(f, x0.-100., x0.+100.0, x0, Optim.SAMIN(), Optim.Options(iterations=1000000))
     @test Optim.minimum(res) < 1e-6
+
+    # test distributed evaluation
+    w = addprocs(4)
+    @everywhere begin
+        using Optim, OptimTestProblems, OptimTestProblems.MultivariateProblems
+        const MVP = MultivariateProblems
+        prob = MVP.UnconstrainedProblems.examples["Himmelblau"]
+        xtrue = prob.solutions
+        f = OptimTestProblems.MultivariateProblems.objective(prob)
+        x0 = prob.initial_x
+    end
+    res = optimize(f, x0.-100., x0.+100.0, x0, Optim.SAMIN(workers=w), Optim.Options(iterations=1000000))
+    @test Optim.minimum(res) < 1e-6
+    rmprocs(w)
 end
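+
+# The commented-out block below is a benchmarking script rather than part of the
+# test suite: it measures how SAMIN runtime scales with the number of workers on
+# an artificially slowed objective, and plots relative runtime alongside the
+# attained minima.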
+#=
+using BenchmarkTools, Plots, Distributed, Statistics
+
+samples = 50
+wrkrs = 8
+iters = 1000
+
+addprocs(wrkrs)
+w = workers()
+@everywhere using Pkg
+@everywhere Pkg.activate(".")
+@everywhere include("src/Optim.jl")
+@everywhere using OptimTestProblems, OptimTestProblems.MultivariateProblems
+@everywhere begin
+    const MVP = MultivariateProblems
+    prob = MVP.UnconstrainedProblems.examples["Himmelblau"]
+    xtrue = prob.solutions
+    f = OptimTestProblems.MultivariateProblems.objective(prob)
+    x0 = prob.initial_x
+end
+# fu slows the objective down artificially so that parallel evaluation can pay off
+@everywhere function fu(u, wait=1e-5)
+    sleep(wait)
+    f(u)
+end
+
+r = []
+opts = Array{Array{Float64,1},1}()
+[push!(opts, Float64[]) for i in 1:(wrkrs+1)]
+push!(r, @benchmarkable push!(opts[1], Optim.minimum(Optim.optimize(fu, x0.-100., x0.+100.0, x0, Optim.SAMIN(ns=20), Optim.Options(iterations=iters)))))
+for i in 1:wrkrs
+    a = @benchmarkable push!(opts[$i+1], Optim.minimum(Optim.optimize(fu, x0.-100., x0.+100.0, x0, Optim.SAMIN(ns=20, workers=workers()[1:$i]), Optim.Options(iterations=iters))))
+    push!(r, a)
+end
+aa = BenchmarkTools.run.(r, samples=samples)
+
+plot(
+    0:wrkrs,
+    [minimum(i.times) for i in aa] ./ mean(aa[1].times),
+    title = "Distributed-SAMIN",
+    label = "rel. runtime",
+    lw = 1,
+    ylabel = "rel. runtime",
+    xlabel = "Workers",
+    marker = (:dot, 2, 0.6),
+    ribbon = [std(i.times) for i in aa] ./ mean(aa[1].times),
+)
+plot!(0:wrkrs, NaN .* (0:wrkrs), label = "f_min", marker = (:dot, :green, 3, 0.6), linecolor = :green, grid = false, legend = :right)
+p = plot!(twinx(), 0:wrkrs, [mean(i) for i in opts], yerror = [std(i) for i in opts], ylabel = "f_min", legend = false, marker = (:dot, :green, 3, 0.6), linecolor = :green)
+
+savefig(p, expanduser("~/p.svg"))
+=#
diff --git a/test/runtests.jl b/test/runtests.jl
index c2a469fa7..3bce2f139 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -3,7 +3,7 @@ using Optim
 using OptimTestProblems
 using OptimTestProblems.MultivariateProblems
 const MVP = MultivariateProblems
-
+using Distributed: @everywhere, addprocs, rmprocs
 import PositiveFactorizations: Positive, cholesky # for the IPNewton tests
 using Random