Skip to content

Commit b7201d6

Browse files
vtjnashJeffBezanson
authored andcommitted
threading: support more than nthreads at runtime
Hook a couple functions (notably cfunction) to handle adopting foreign threads automatically when used. n.b. If returning an object pointer, we do not gc_unsafe_leave afterwards as that would render the pointer invalid. However, this means that it can be a long time before the next safepoint (if ever). We should look into ways of improving this bad situation, such as pinning only that specific object temporarily. n.b. There are some remaining issues to clean up. For example, we may trap pages in the ptls after GC to keep them "warm", and trap other pages in the unwind buffer, etc.
1 parent 1755994 commit b7201d6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+864
-515
lines changed

NEWS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ Multi-threading changes
5858
An interactive task desires low latency and implicitly agrees to be short duration or to
5959
yield frequently. Interactive tasks will run on interactive threads, if any are specified
6060
when Julia is started ([#42302]).
61+
* Threads started outside the Julia runtime (e.g. from C or Java) are now able to
62+
call into Julia code by calling `jl_adopt_thread`. This is done automatically when
63+
entering Julia code via `cfunction` or a `@ccallable` entry point. As a consequence, the
64+
number of threads can now change during execution ([#46609]).
6165

6266
Build system changes
6367
--------------------

base/deprecated.jl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,4 +336,34 @@ function setproperty!(ci::CodeInfo, s::Symbol, v)
336336
return setfield!(ci, s, convert(fieldtype(CodeInfo, s), v))
337337
end
338338

339+
@eval Threads nthreads() = threadpoolsize()
340+
341+
@eval Threads begin
342+
"""
343+
resize_nthreads!(A, copyvalue=A[1])
344+
345+
Resize the array `A` to length [`nthreads()`](@ref). Any new
346+
elements that are allocated are initialized to `deepcopy(copyvalue)`,
347+
where `copyvalue` defaults to `A[1]`.
348+
349+
This is typically used to allocate per-thread variables, and
350+
should be called in `__init__` if `A` is a global constant.
351+
352+
!!! warning
353+
354+
This function is deprecated, since as of Julia v1.9 the number of
355+
threads can change at run time. Instead, per-thread state should be
356+
created as needed based on the thread id of the caller.
357+
"""
358+
function resize_nthreads!(A::AbstractVector, copyvalue=A[1])
359+
nthr = nthreads()
360+
nold = length(A)
361+
resize!(A, nthr)
362+
for i = nold+1:nthr
363+
A[i] = deepcopy(copyvalue)
364+
end
365+
return A
366+
end
367+
end
368+
339369
# END 1.9 deprecations

base/partr.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module Partr
44

5-
using ..Threads: SpinLock, nthreads, threadid
5+
using ..Threads: SpinLock, maxthreadid, threadid
66

77
# a task minheap
88
mutable struct taskheap

base/pcre.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ THREAD_MATCH_CONTEXTS::Vector{Ptr{Cvoid}} = [C_NULL]
2929
PCRE_COMPILE_LOCK = nothing
3030

3131
_tid() = Int(ccall(:jl_threadid, Int16, ())) + 1
32-
_nth() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
32+
_mth() = Int(Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire))
3333

3434
function get_local_match_context()
3535
tid = _tid()
@@ -41,7 +41,7 @@ function get_local_match_context()
4141
try
4242
ctxs = THREAD_MATCH_CONTEXTS
4343
if length(ctxs) < tid
44-
global THREAD_MATCH_CONTEXTS = ctxs = copyto!(fill(C_NULL, _nth()), ctxs)
44+
global THREAD_MATCH_CONTEXTS = ctxs = copyto!(fill(C_NULL, length(ctxs) + _mth()), ctxs)
4545
end
4646
finally
4747
unlock(l)

base/task.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ function workqueue_for(tid::Int)
754754
@lock l begin
755755
qs = Workqueues
756756
if length(qs) < tid
757-
nt = Threads.nthreads()
757+
nt = Threads.maxthreadid()
758758
@assert tid <= nt
759759
global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs)
760760
end
@@ -767,7 +767,7 @@ end
767767

768768
function enq_work(t::Task)
769769
(t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable")
770-
if t.sticky || Threads.nthreads() == 1
770+
if t.sticky || Threads.threadpoolsize() == 1
771771
tid = Threads.threadid(t)
772772
if tid == 0
773773
# Issue #41324

base/threadingconstructs.jl

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,27 @@ ID `1`.
1111
"""
1212
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)
1313

14+
# lower bound on the largest threadid()
1415
"""
15-
Threads.nthreads([:default|:interactive]) -> Int
16+
Threads.maxthreadid() -> Int
1617
17-
Get the number of threads (across all thread pools or within the specified
18-
thread pool) available to Julia. The number of threads across all thread
19-
pools is the inclusive upper bound on [`threadid()`](@ref).
18+
Get a lower bound on the number of threads (across all thread pools) available
19+
to the Julia process, with atomic-acquire semantics. The result will always be
20+
greater than or equal to [`threadid()`](@ref) as well as `threadid(task)` for
21+
any task you were able to observe before calling `maxthreadid`.
22+
"""
23+
maxthreadid() = Int(Core.Intrinsics.atomic_pointerref(cglobal(:jl_n_threads, Cint), :acquire))
2024

21-
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
22-
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
23-
[`Distributed`](@ref man-distributed) standard library.
2425
"""
25-
function nthreads end
26+
Threads.nthreads(:default | :interactive) -> Int
2627
27-
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
28+
Get the current number of threads within the specified thread pool. The threads in the `:default` pool
29+
have id numbers `1:nthreads(:default)`.
30+
31+
See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref
32+
man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed)
33+
standard library, as well as [`Threads.maxthreadid()`](@ref).
34+
"""
2835
function nthreads(pool::Symbol)
2936
if pool === :default
3037
tpid = Int8(0)
@@ -35,6 +42,7 @@ function nthreads(pool::Symbol)
3542
end
3643
return _nthreads_in_pool(tpid)
3744
end
45+
3846
function _nthreads_in_pool(tpid::Int8)
3947
p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
4048
return Int(unsafe_load(p, tpid + 1))
@@ -57,10 +65,20 @@ Returns the number of threadpools currently configured.
5765
"""
5866
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))
5967

68+
"""
69+
Threads.threadpoolsize()
70+
71+
Get the number of threads available to the Julia default worker-thread pool.
72+
73+
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
74+
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
75+
[`Distributed`](@ref man-distributed) standard library.
76+
"""
77+
threadpoolsize() = Threads._nthreads_in_pool(Int8(0))
6078

6179
function threading_run(fun, static)
6280
ccall(:jl_enter_threaded_region, Cvoid, ())
63-
n = nthreads()
81+
n = threadpoolsize()
6482
tasks = Vector{Task}(undef, n)
6583
for i = 1:n
6684
t = Task(() -> fun(i)) # pass in tid
@@ -93,7 +111,7 @@ function _threadsfor(iter, lbody, schedule)
93111
tid = 1
94112
len, rem = lenr, 0
95113
else
96-
len, rem = divrem(lenr, nthreads())
114+
len, rem = divrem(lenr, threadpoolsize())
97115
end
98116
# not enough iterations for all the threads?
99117
if len == 0
@@ -185,7 +203,7 @@ assumption may be removed in the future.
185203
This scheduling option is merely a hint to the underlying execution mechanism. However, a
186204
few properties can be expected. The number of `Task`s used by `:dynamic` scheduler is
187205
bounded by a small constant multiple of the number of available worker threads
188-
([`nthreads()`](@ref Threads.nthreads)). Each task processes contiguous regions of the
206+
([`Threads.threadpoolsize()`](@ref)). Each task processes contiguous regions of the
189207
iteration space. Thus, `@threads :dynamic for x in xs; f(x); end` is typically more
190208
efficient than `@sync for x in xs; @spawn f(x); end` if `length(xs)` is significantly
191209
larger than the number of the worker threads and the run-time of `f(x)` is relatively
@@ -222,15 +240,15 @@ julia> function busywait(seconds)
222240
223241
julia> @time begin
224242
Threads.@spawn busywait(5)
225-
Threads.@threads :static for i in 1:Threads.nthreads()
243+
Threads.@threads :static for i in 1:Threads.threadpoolsize()
226244
busywait(1)
227245
end
228246
end
229247
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
230248
231249
julia> @time begin
232250
Threads.@spawn busywait(5)
233-
Threads.@threads :dynamic for i in 1:Threads.nthreads()
251+
Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
234252
busywait(1)
235253
end
236254
end

base/threads.jl

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,4 @@ include("threadingconstructs.jl")
1111
include("atomics.jl")
1212
include("locks-mt.jl")
1313

14-
15-
"""
16-
resize_nthreads!(A, copyvalue=A[1])
17-
18-
Resize the array `A` to length [`nthreads()`](@ref). Any new
19-
elements that are allocated are initialized to `deepcopy(copyvalue)`,
20-
where `copyvalue` defaults to `A[1]`.
21-
22-
This is typically used to allocate per-thread variables, and
23-
should be called in `__init__` if `A` is a global constant.
24-
"""
25-
function resize_nthreads!(A::AbstractVector, copyvalue=A[1])
26-
nthr = nthreads()
27-
nold = length(A)
28-
resize!(A, nthr)
29-
for i = nold+1:nthr
30-
A[i] = deepcopy(copyvalue)
31-
end
32-
return A
33-
end
34-
3514
end

base/threads_overloads.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44
Threads.foreach(f, channel::Channel;
55
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
6-
ntasks=Threads.nthreads())
6+
ntasks=Threads.threadpoolsize())
77
88
Similar to `foreach(f, channel)`, but iteration over `channel` and calls to
99
`f` are split across `ntasks` tasks spawned by `Threads.@spawn`. This function
@@ -40,7 +40,7 @@ collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256
4040
"""
4141
function Threads.foreach(f, channel::Channel;
4242
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
43-
ntasks=Threads.nthreads())
43+
ntasks=Threads.threadpoolsize())
4444
apply = _apply_for_schedule(schedule)
4545
stop = Threads.Atomic{Bool}(false)
4646
@sync for _ in 1:ntasks

cli/loader_exe.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ extern "C" {
1515
JULIA_DEFINE_FAST_TLS
1616

1717
#ifdef _COMPILER_ASAN_ENABLED_
18-
JL_DLLEXPORT const char* __asan_default_options()
18+
JL_DLLEXPORT const char* __asan_default_options(void)
1919
{
2020
return "allow_user_segv_handler=1:detect_leaks=0";
2121
// FIXME: enable LSAN after fixing leaks & defining __lsan_default_suppressions(),

contrib/generate_precompile.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# This file is a part of Julia. License is MIT: https://julialang.org/license
22

3-
if Threads.nthreads() != 1
4-
@warn "Running this file with multiple Julia threads may lead to a build error" Threads.nthreads()
3+
if Threads.maxthreadid() != 1
4+
# Log the observed thread count; use the same `Threads.maxthreadid` the guard checks.
@warn "Running this file with multiple Julia threads may lead to a build error" Threads.maxthreadid()
55
end
66

77
if Base.isempty(Base.ARGS) || Base.ARGS[1] !== "0"
@@ -340,7 +340,7 @@ function generate_precompile_statements()
340340
# wait for the next prompt-like to appear
341341
readuntil(output_copy, "\n")
342342
strbuf = ""
343-
while true
343+
while !eof(output_copy)
344344
strbuf *= String(readavailable(output_copy))
345345
occursin(JULIA_PROMPT, strbuf) && break
346346
occursin(PKG_PROMPT, strbuf) && break

0 commit comments

Comments
 (0)