@@ -146,184 +146,3 @@ iscompatible_arg(proc::OSProc, opts, args...) =
146146 any (child->
147147 all (arg-> iscompatible_arg (child, opts, arg), args),
148148 children (proc))
149-
150- """
151- ThreadProc <: Processor
152-
153- Julia CPU (OS) thread, identified by Julia thread ID.
154- """
155- struct ThreadProc <: Processor
156- owner:: Int
157- tid:: Int
158- end
159- iscompatible (proc:: ThreadProc , opts, f, args... ) = true
160- iscompatible_func (proc:: ThreadProc , opts, f) = true
161- iscompatible_arg (proc:: ThreadProc , opts, x) = true
162- function execute! (proc:: ThreadProc , @nospecialize (f), @nospecialize (args... ); @nospecialize (kwargs... ))
163- tls = get_tls ()
164- task = Task () do
165- set_tls! (tls)
166- TimespanLogging. prof_task_put! (tls. sch_handle. thunk_id. id)
167- @invokelatest f (args... ; kwargs... )
168- end
169- set_task_tid! (task, proc. tid)
170- schedule (task)
171- try
172- fetch (task)
173- catch err
174- @static if VERSION < v " 1.7-rc1"
175- stk = Base. catch_stack (task)
176- else
177- stk = Base. current_exceptions (task)
178- end
179- err, frames = stk[1 ]
180- rethrow (CapturedException (err, frames))
181- end
182- end
183- get_parent (proc:: ThreadProc ) = OSProc (proc. owner)
184- default_enabled (proc:: ThreadProc ) = true
185-
186- # TODO : ThreadGroupProc?
187-
188- """
189- Context(xs::Vector{OSProc}) -> Context
190- Context(xs::Vector{Int}) -> Context
191-
192- Create a Context, by default adding each available worker.
193-
194- It is also possible to create a Context from a vector of [`OSProc`](@ref),
195- or equivalently the underlying process ids can also be passed directly
196- as a `Vector{Int}`.
197-
198- Special fields include:
199- - 'log_sink': A log sink object to use, if any.
200- - `log_file::Union{String,Nothing}`: Path to logfile. If specified, at
201- scheduler termination, logs will be collected, combined with input thunks, and
202- written out in DOT format to this location.
203- - `profile::Bool`: Whether or not to perform profiling with Profile stdlib.
204- """
205- mutable struct Context
206- procs:: Vector{Processor}
207- proc_lock:: ReentrantLock
208- proc_notify:: Threads.Condition
209- log_sink:: Any
210- log_file:: Union{String,Nothing}
211- profile:: Bool
212- options
213- end
214-
215- Context (procs:: Vector{P} = Processor[OSProc (w) for w in procs ()];
216- proc_lock= ReentrantLock (), proc_notify= Threads. Condition (),
217- log_sink= TimespanLogging. NoOpLog (), log_file= nothing , profile= false ,
218- options= nothing ) where {P<: Processor } =
219- Context (procs, proc_lock, proc_notify, log_sink, log_file,
220- profile, options)
221- Context (xs:: Vector{Int} ; kwargs... ) = Context (map (OSProc, xs); kwargs... )
222- Context (ctx:: Context , xs:: Vector = copy (procs (ctx))) = # make a copy
223- Context (xs; log_sink= ctx. log_sink, log_file= ctx. log_file,
224- profile= ctx. profile, options= ctx. options)
225-
226- const GLOBAL_CONTEXT = Ref {Context} ()
227- function global_context ()
228- if ! isassigned (GLOBAL_CONTEXT)
229- GLOBAL_CONTEXT[] = Context ()
230- end
231- return GLOBAL_CONTEXT[]
232- end
233-
234- """
235- lock(f, ctx::Context)
236-
237- Acquire `ctx.proc_lock`, execute `f` with the lock held, and release the lock
238- when `f` returns.
239- """
240- Base. lock (f, ctx:: Context ) = lock (f, ctx. proc_lock)
241-
242- """
243- procs(ctx::Context)
244-
245- Fetch the list of procs currently known to `ctx`.
246- """
247- procs (ctx:: Context ) = lock (ctx) do
248- copy (ctx. procs)
249- end
250-
251- """
252- addprocs!(ctx::Context, xs)
253-
254- Add new workers `xs` to `ctx`.
255-
256- Workers will typically be assigned new tasks in the next scheduling iteration
257- if scheduling is ongoing.
258-
259- Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
260- """
261- addprocs! (ctx:: Context , xs:: AbstractVector{<:Integer} ) = addprocs! (ctx, map (OSProc, xs))
262- function addprocs! (ctx:: Context , xs:: AbstractVector{<:OSProc} )
263- lock (ctx) do
264- append! (ctx. procs, xs)
265- end
266- lock (ctx. proc_notify) do
267- notify (ctx. proc_notify)
268- end
269- end
270-
271- """
272- rmprocs!(ctx::Context, xs)
273-
274- Remove the specified workers `xs` from `ctx`.
275-
276- Workers will typically finish all their assigned tasks if scheduling is ongoing
277- but will not be assigned new tasks after removal.
278-
279- Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
280- """
281- rmprocs! (ctx:: Context , xs:: AbstractVector{<:Integer} ) = rmprocs! (ctx, map (OSProc, xs))
282- function rmprocs! (ctx:: Context , xs:: AbstractVector{<:OSProc} )
283- lock (ctx) do
284- filter! (p -> (p ∉ xs), ctx. procs)
285- end
286- lock (ctx. proc_notify) do
287- notify (ctx. proc_notify)
288- end
289- end
290-
291- # In-Thunk Helpers
292-
293- """
294- thunk_processor()
295-
296- Get the current processor executing the current thunk.
297- """
298- thunk_processor () = task_local_storage (:_dagger_processor ):: Processor
299-
300- """
301- in_thunk()
302-
303- Returns `true` if currently in a [`Thunk`](@ref) process, else `false`.
304- """
305- in_thunk () = haskey (task_local_storage (), :_dagger_sch_uid )
306-
307- """
308- get_tls()
309-
310- Gets all Dagger TLS variable as a `NamedTuple`.
311- """
312- get_tls () = (
313- sch_uid= task_local_storage (:_dagger_sch_uid ),
314- sch_handle= task_local_storage (:_dagger_sch_handle ),
315- processor= thunk_processor (),
316- task_spec= task_local_storage (:_dagger_task_spec ),
317- )
318-
319- """
320- set_tls!(tls)
321-
322- Sets all Dagger TLS variables from the `NamedTuple` `tls`.
323- """
324- function set_tls! (tls)
325- task_local_storage (:_dagger_sch_uid , tls. sch_uid)
326- task_local_storage (:_dagger_sch_handle , tls. sch_handle)
327- task_local_storage (:_dagger_processor , tls. processor)
328- task_local_storage (:_dagger_task_spec , tls. task_spec)
329- end
0 commit comments