Skip to content

Commit 25fd1bb

Browse files
committed
Add eager versions of addprocs!()/rmprocs!()
1 parent 068a636 commit 25fd1bb

File tree

2 files changed

+78
-7
lines changed

2 files changed

+78
-7
lines changed

src/context.jl

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
export Context, addprocs!, rmprocs!
2+
13
"""
24
Context(xs::Vector{OSProc}) -> Context
35
Context(xs::Vector{Int}) -> Context
@@ -62,35 +64,104 @@ procs(ctx::Context) = lock(ctx) do
6264
end
6365

6466
"""
65-
addprocs!(ctx::Context, xs)
67+
addprocs!(xs)
6668
67-
Add new workers `xs` to `ctx`.
69+
Add new workers `xs` to the eager scheduler.
6870
6971
Workers will typically be assigned new tasks in the next scheduling iteration
7072
if scheduling is ongoing.
7173
7274
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
7375
"""
74-
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
75-
function addprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
76+
addprocs!(xs::AbstractVector{<:Integer}) = addprocs!(map(OSProc, xs))
77+
function addprocs!(xs::AbstractVector{<:OSProc})
78+
ctx = Sch.eager_context()
79+
state = Sch.EAGER_STATE[]
80+
81+
timespan_start(ctx, :addprocs!, nothing, nothing)
82+
83+
# Initialize new procs
84+
for p in xs
85+
Sch.init_proc(state, p, ctx.log_sink)
86+
87+
# Empty the processor cache list and force reschedule
88+
lock(state.lock) do
89+
state.procs_cache_list[] = nothing
90+
end
91+
put!(state.chan, Sch.RescheduleSignal())
92+
end
93+
7694
lock(ctx) do
7795
append!(ctx.procs, xs)
7896
end
7997
lock(ctx.proc_notify) do
8098
notify(ctx.proc_notify)
8199
end
100+
101+
timespan_finish(ctx, :addprocs!, nothing, nothing)
82102
end
83103

84104
"""
85-
rmprocs!(ctx::Context, xs)
105+
rmprocs!(xs)
86106
87-
Remove the specified workers `xs` from `ctx`.
107+
Remove the specified workers `xs` from the eager scheduler.
88108
89109
Workers will typically finish all their assigned tasks if scheduling is ongoing
90110
but will not be assigned new tasks after removal.
91111
92112
Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
93113
"""
114+
rmprocs!(xs::AbstractVector{<:Integer}) = rmprocs!(map(OSProc, xs))
115+
function rmprocs!(xs::AbstractVector{<:OSProc})
116+
ctx = Sch.eager_context()
117+
state = Sch.EAGER_STATE[]
118+
119+
timespan_start(ctx, :rmprocs!, nothing, nothing)
120+
121+
for p in xs
122+
Sch.cleanup_proc(state, p, ctx.log_sink)
123+
124+
# Empty the processor cache list
125+
lock(state.lock) do
126+
state.procs_cache_list[] = nothing
127+
end
128+
end
129+
130+
lock(ctx) do
131+
filter!(p -> (p xs), ctx.procs)
132+
end
133+
lock(ctx.proc_notify) do
134+
notify(ctx.proc_notify)
135+
end
136+
timespan_finish(ctx, :rmprocs!, nothing, nothing)
137+
end
138+
139+
"""
140+
addprocs!(ctx::Context, xs)
141+
142+
The lazy API version of [`addprocs!(xs)`](@ref).
143+
144+
!!! warning
145+
The lazy API is buggy and unsupported, prefer using the eager API.
146+
"""
147+
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
148+
function addprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
149+
lock(ctx) do
150+
append!(ctx.procs, xs)
151+
end
152+
lock(ctx.proc_notify) do
153+
notify(ctx.proc_notify)
154+
end
155+
end
156+
157+
"""
158+
rmprocs!(ctx::Context, xs)
159+
160+
The lazy API version of [`rmprocs!(xs)`](@ref).
161+
162+
!!! warning
163+
The lazy API is buggy and unsupported, prefer using the eager API.
164+
"""
94165
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
95166
function rmprocs!(ctx::Context, xs::AbstractVector{<:OSProc})
96167
lock(ctx) do

src/processor.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export OSProc, Context, addprocs!, rmprocs!
1+
export OSProc
22

33
import Base: @invokelatest
44

0 commit comments

Comments
 (0)