Commit fd3ac58

Detect and throw an error when multithreading with Boxed variables (#141)
* Detect and throw an error when multithreading with `Box`ed variables
* elaborate on error msg
* don't try to downgrade `Test`
* don't escape `lock`
* update Changelog
* bump major version, add ScopedValues dep for v1.10
* add docs on boxing
* add `@allow_boxed_captures` and `@disallow_boxed_captures`
* include `@allow_boxed_captures` and `@disallow_boxed_captures` docstrings
1 parent cfdf596 commit fd3ac58

13 files changed, +473 -85 lines changed

.github/workflows/downgrade_CI.yml

Lines changed: 1 addition & 1 deletion
@@ -23,6 +23,6 @@ jobs:
           version: ${{ matrix.version }}
       - uses: cjdoris/julia-downgrade-compat-action@v1
         with:
-          skip: Pkg,TOML
+          skip: Pkg,TOML,Test
       - uses: julia-actions/julia-buildpkg@v1
       - uses: julia-actions/julia-runtest@v1

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -1,6 +1,10 @@
 OhMyThreads.jl Changelog
 =========================
 
+Version 0.8.0
+-------------
+- ![BREAKING][badge-breaking] We now detect and throw errors if an `OhMyThreads` parallel function is passed a closure containing a `Box`ed variable. This behaviour can be disabled with the new `@allow_boxed_captures` macro, and re-enabled with `@disallow_boxed_captures`. ([#141][gh-pr-141])
+
 Version 0.7.0
 -------------
 - ![BREAKING][badge-breaking] We now use ChunkSplitters version 3.0. The function `OhMyThreads.chunks` has been renamed to `OhMyThreads.index_chunks`. The new functions `index_chunks` and `chunks` (different from the old one with the same name!) are now exported. See ChunkSplitters.jl for more information.
@@ -139,3 +143,4 @@ Version 0.2.0
 
 [gh-pr-5]: https://github.com/JuliaFolds2/OhMyThreads.jl/pull/5
 [gh-pr-121]: https://github.com/JuliaFolds2/OhMyThreads.jl/pull/121
+[gh-pr-141]: https://github.com/JuliaFolds2/OhMyThreads.jl/pull/141

Project.toml

Lines changed: 3 additions & 1 deletion
@@ -1,11 +1,12 @@
 name = "OhMyThreads"
 uuid = "67456a42-1dca-4109-a031-0a68de7e3ad5"
 authors = ["Carsten Bauer <[email protected]>", "Mason Protter <[email protected]>"]
-version = "0.7.0"
+version = "0.8.0"
 
 [deps]
 BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
 ChunkSplitters = "ae650224-84b6-46f8-82ea-d812ca08434e"
+ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
 StableTasks = "91464d47-22a1-43fe-8b7f-2d57ee82463f"
 TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34"
 
@@ -15,6 +16,7 @@ BangBang = "0.3.40, 0.4"
 ChunkSplitters = "3"
 StableTasks = "0.1.5"
 TaskLocalValues = "0.1"
+ScopedValues = "1.3"
 Test = "1"
 julia = "1.10"
 
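The new ScopedValues dependency suggests that the allow/disallow switch is a dynamically scoped flag. The following is only a rough sketch of how such a flag can behave, not the package's actual macro code: `allow_boxed` and `would_throw` are hypothetical names, and the sketch assumes Julia 1.11 or newer, where scoped values ship in Base (the package itself pulls in ScopedValues.jl for Julia 1.10).

```julia
# Assumption: Julia 1.11 or newer, where ScopedValues is part of Base.
using Base.ScopedValues: ScopedValue, with

# Hypothetical stand-in for the package's `allowing_boxed_captures` flag.
const allow_boxed = ScopedValue(false)

# Hypothetical check: reports whether a boxed capture would raise an error.
would_throw() = !allow_boxed[]

outside = would_throw()              # default scope: the check is active
inside = with(allow_boxed => true) do
    would_throw()                    # inside the scope: the check is disabled
end
after = would_throw()                # the default is restored afterwards

println((outside, inside, after))
```

A scoped value is restored automatically when the block exits, and tasks spawned inside the block inherit the value, which is why it suits a switch that has to reach code running on other threads.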

docs/make.jl

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ makedocs(;
             "Trapezoidal Integration" => "literate/integration/integration.md"
         ],
         "Translation Guide" => "translation.md",
+        "Boxed Variables" => "literate/boxing/boxing.md",
         "Thread-Safe Storage" => "literate/tls/tls.md",
         "False Sharing" => "literate/falsesharing/falsesharing.md",
         # "Explanations" => [

docs/src/literate/boxing/Project.toml

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+[deps]
+OhMyThreads = "67456a42-1dca-4109-a031-0a68de7e3ad5"

docs/src/literate/boxing/boxing.jl

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
+#====================================
+# Boxed Variables
+
+All multithreading in Julia is built around the idea of passing around
+and executing functions, but often these functions "enclose" data from
+an outer local scope, making them what's called a "closure".
+
+Julia allows functions which capture variables to re-bind those variables
+to different values, but doing so can cause subtle race conditions in
+multithreaded code.
+
+Consider the following example:
+====================================#
+
+let out = zeros(Int, 10)
+    Threads.@threads for i in 1:10
+        A = i
+        sleep(1/100)
+        out[i] = A
+    end
+    A = 1
+    out
+end
+
+#====================================
+You may have expected that to return `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`,
+but because `A` is also assigned outside the loop, it is a single shared
+mutable container (a `Box`) that all the parallel tasks read and write
+concurrently, giving unpredictable results.
+
+OhMyThreads.jl tries to protect users from this surprising behaviour:
+====================================#
+using OhMyThreads
+
+try
+    let
+        ## this throws an error!
+        out = tmap(1:10) do i
+            A = i
+            sleep(1/100)
+            A
+        end
+        A = 1
+        out
+    end
+catch e;
+    println(e.msg) # show that error
+end
+
+#====================================
+If you really desire to bypass this behaviour, you can use the
+`@allow_boxed_captures` macro:
+====================================#
+
+@allow_boxed_captures let
+    out = tmap(1:10) do i
+        A = i
+        sleep(1/100)
+        A
+    end
+    A = 1
+    out
+end
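The error message added in this commit recommends marking such variables as local inside the closure. A minimal sketch of that fix, applied to the `Threads.@threads` example and using only Base threading (no OhMyThreads), might look like this; the name `result` is introduced here purely for illustration.

```julia
# Sketch of the `local` fix; uses only Base threading, not OhMyThreads.
result = let out = zeros(Int, 10)
    Threads.@threads for i in 1:10
        local A = i      # `A` now belongs to this iteration only
        sleep(1/100)
        out[i] = A
    end
    A = 1                # a separate variable in the enclosing `let` scope
    out
end

println(result)
```

Because each iteration owns its `A`, the closure no longer shares a boxed variable, and every slot of the output holds its own index regardless of how the iterations are scheduled.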

docs/src/literate/boxing/boxing.md

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+```@meta
+EditURL = "boxing.jl"
+```
+
+# Boxed Variables
+
+All multithreading in Julia is built around the idea of passing around
+and executing functions, but often these functions "enclose" data from
+an outer local scope, making them what's called a "closure".
+
+Julia allows functions which capture variables to re-bind those variables
+to different values, but doing so can cause subtle race conditions in
+multithreaded code.
+
+Consider the following example:
+
+````julia
+let out = zeros(Int, 10)
+    Threads.@threads for i in 1:10
+        A = i
+        sleep(1/100)
+        out[i] = A
+    end
+    A = 1
+    out
+end
+````
+
+````
+10-element Vector{Int64}:
+ 6
+ 2
+ 3
+ 2
+ 3
+ 2
+ 3
+ 2
+ 3
+ 4
+````
+
+You may have expected that to return `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`,
+but because `A` is also assigned outside the loop, it is a single shared
+mutable container (a `Box`) that all the parallel tasks read and write
+concurrently, giving unpredictable results.
+
+OhMyThreads.jl tries to protect users from this surprising behaviour:
+
+````julia
+using OhMyThreads
+
+try
+    let
+        # this throws an error!
+        out = tmap(1:10) do i
+            A = i
+            sleep(1/100)
+            A
+        end
+        A = 1
+        out
+    end
+catch e;
+    println(e.msg) # show that error
+end
+````
+
+````
+Attempted to capture and modify outer local variable(s) A, which would be not only slow, but could also cause a race condition. Consider marking these variables as local inside their respective closure, or redesigning your code to avoid the race condition.
+
+If these variables are inside a @one_by_one or @only_one block, consider using a mutable Ref instead of re-binding the variable.
+
+This error can be bypassed with the @allow_boxed_captures macro.
+
+````
+
+If you really desire to bypass this behaviour, you can use the
+`@allow_boxed_captures` macro:
+
+````julia
+@allow_boxed_captures let
+    out = tmap(1:10) do i
+        A = i
+        sleep(1/100)
+        A
+    end
+    A = 1
+    out
+end
+````
+
+````
+10-element Vector{Int64}:
+ 7
+ 6
+ 7
+ 6
+ 7
+ 6
+ 7
+ 6
+ 7
+ 7
+````
+
+---
+
+*This page was generated using [Literate.jl](https://github.com/fredrikekre/Literate.jl).*
+
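The error message in the page above names the offending variable. A rough standalone sketch of how a boxed capture can be spotted, by looking at the field types of a closure, is shown below. `boxed_capture_names`, `boxed_closure`, and `clean_closure` are hypothetical names for illustration and are not part of OhMyThreads; the sketch relies on `Core.Box`, which is an internal detail of how Julia currently lowers closures.

```julia
# Hypothetical helper: names of captured variables that Julia stored in a `Core.Box`.
function boxed_capture_names(f)
    T = typeof(f)
    return [fieldname(T, i) for i in 1:fieldcount(T) if fieldtype(T, i) <: Core.Box]
end

# `A` is assigned both inside the closure and in the enclosing `let`,
# so the closure captures it through a `Core.Box`.
boxed_closure = let
    g = function (i)
        A = i
        A
    end
    A = 1
    g
end

# Declaring `A` local inside the closure removes the capture entirely.
clean_closure = let
    g = function (i)
        local A = i
        A
    end
    A = 1
    g
end

println(boxed_capture_names(boxed_closure))
println(boxed_capture_names(clean_closure))
```

The same kind of field inspection is cheap enough to run once per parallel call, since it only looks at the closure's type rather than executing it.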

docs/src/refs/api.md

Lines changed: 2 additions & 0 deletions
@@ -13,6 +13,8 @@ CollapsedDocStrings = true
 @local
 @only_one
 @one_by_one
+@allow_boxed_captures
+@disallow_boxed_captures
 ```
 
 ### Functions

src/OhMyThreads.jl

Lines changed: 4 additions & 1 deletion
@@ -15,6 +15,9 @@ export chunks, index_chunks
 
 using TaskLocalValues: TaskLocalValues
 const TaskLocalValue = TaskLocalValues.TaskLocalValue
+
+using ScopedValues: ScopedValues, ScopedValue, @with
+
 include("types.jl")
 include("functions.jl")
 include("macros.jl")
@@ -26,7 +29,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler
 include("implementation.jl")
 include("experimental.jl")
 
-export @tasks, @set, @local, @one_by_one, @only_one
+export @tasks, @set, @local, @one_by_one, @only_one, @allow_boxed_captures, @disallow_boxed_captures
 export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect
 export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler
 

src/implementation.jl

Lines changed: 40 additions & 1 deletion
@@ -1,7 +1,7 @@
 module Implementation
 
 import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcollect
-using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local, ChannelLike
+using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local, ChannelLike, allowing_boxed_captures
 using OhMyThreads.Tools: nthtid
 using OhMyThreads: Scheduler,
                    DynamicScheduler, StaticScheduler, GreedyScheduler,
@@ -93,6 +93,7 @@ end
 
 treducemap(op, f, A...; kwargs...) = tmapreduce(f, op, A...; kwargs...)
 
+
 # DynamicScheduler: AbstractArray/Generic
 function _tmapreduce(f,
         op,
@@ -102,6 +103,7 @@ function _tmapreduce(f,
         mapreduce_kwargs)::OutputType where {OutputType}
     (; threadpool) = scheduler
     check_all_have_same_indices(Arrs)
+    throw_if_boxed_captures(f, op)
     if chunking_enabled(scheduler)
         tasks = map(_index_chunks(scheduler, first(Arrs))) do inds
             args = map(A -> view(A, inds), Arrs)
@@ -128,6 +130,7 @@ function _tmapreduce(f,
         scheduler::DynamicScheduler,
         mapreduce_kwargs)::OutputType where {OutputType, T}
     (; threadpool) = scheduler
+    throw_if_boxed_captures(f, op)
     tasks = map(only(Arrs)) do idcs
         @spawn threadpool promise_task_local(f)(idcs)
     end
@@ -143,6 +146,7 @@ function _tmapreduce(f,
         mapreduce_kwargs)::OutputType where {OutputType}
     nt = nthreads()
     check_all_have_same_indices(Arrs)
+    throw_if_boxed_captures(f, op)
     if chunking_enabled(scheduler)
         tasks = map(enumerate(_index_chunks(scheduler, first(Arrs)))) do (c, inds)
             tid = @inbounds nthtid(mod1(c, nt))
@@ -175,6 +179,7 @@ function _tmapreduce(f,
         scheduler::StaticScheduler,
         mapreduce_kwargs)::OutputType where {OutputType, T}
     check_all_have_same_indices(Arrs)
+    throw_if_boxed_captures(f, op)
     chnks = only(Arrs)
     nt = nthreads()
     tasks = map(enumerate(chnks)) do (c, idcs)
@@ -227,6 +232,7 @@ function _tmapreduce(f,
         ntasks = min(length(first(Arrs)), ntasks_desired)
         ch_len = length(first(Arrs))
     end
+    throw_if_boxed_captures(f, op)
     # TODO: Use ChannelLike for iterators that support it. Dispatch on IndexLinear?
     ch = Channel{Tuple{eltype.(Arrs)...}}(ch_len; spawn = true) do ch
         for args in zip(Arrs...)
@@ -272,6 +278,7 @@ function _tmapreduce(f,
         throw(ArgumentError("SizeUnkown iterators in combination with a greedy scheduler and chunking are currently not supported."))
     end
     check_all_have_same_indices(Arrs)
+    throw_if_boxed_captures(f, op)
     chnks = _index_chunks(scheduler, first(Arrs))
     ntasks_desired = scheduler.ntasks
     ntasks = min(length(chnks), ntasks_desired)
@@ -316,6 +323,33 @@ function check_all_have_same_indices(Arrs)
     end
 end
 
+
+function throw_if_boxed_captures(f)
+    if allowing_boxed_captures[]
+        return nothing
+    end
+    T = typeof(f)
+    if any(FT -> FT <: Core.Box, fieldtypes(T))
+        boxed_fields = join((fieldname(T, i) for i in 1:fieldcount(T) if fieldtype(T,i) <: Core.Box), ", ")
+        error("Attempted to capture and modify outer local variable(s) $boxed_fields, which would be not only slow, but could also cause a race condition. Consider marking these variables as local inside their respective closure, or redesigning your code to avoid the race condition.\n\nIf these variables are inside a @one_by_one or @only_one block, consider using a mutable Ref instead of re-binding the variable.\n\nThis error can be bypassed with the @allow_boxed_captures macro.")
+    end
+    for i in 1:fieldcount(T)
+        # recurse into nested captured functions.
+        if fieldtype(T, i) <: Function
+            f_inner = getfield(f, i)
+            if f !== f_inner
+                # don't recurse into self!
+                throw_if_boxed_captures(getfield(f, i))
+            end
+        end
+    end
+end
+
+function throw_if_boxed_captures(f, fs...)
+    throw_if_boxed_captures(f)
+    throw_if_boxed_captures(fs...)
+end
+
 #-------------------------------------------------------------
 
 function treduce(op, A...; kwargs...)
@@ -401,6 +435,7 @@ function _tmap(scheduler::DynamicScheduler{NoChunking},
         _Arrs::AbstractArray...;)
     (; threadpool) = scheduler
     Arrs = (A, _Arrs...)
+    throw_if_boxed_captures(f)
     tasks = map(eachindex(A)) do i
         @spawn threadpool begin
             args = map(A -> A[i], Arrs)
@@ -417,6 +452,7 @@ function _tmap(scheduler::DynamicScheduler{NoChunking},
         A::Union{AbstractChunks, ChunkSplitters.Internals.Enumerate},
         _Arrs::AbstractArray...)
     (; threadpool) = scheduler
+    throw_if_boxed_captures(f)
     tasks = map(A) do idcs
         @spawn threadpool promise_task_local(f)(idcs)
     end
@@ -429,6 +465,7 @@ function _tmap(scheduler::StaticScheduler{NoChunking},
         A::AbstractChunks,
         _Arrs::AbstractArray...)
     nt = nthreads()
+    throw_if_boxed_captures(f)
     tasks = map(enumerate(A)) do (c, idcs)
         tid = @inbounds nthtid(mod1(c, nt))
         @spawnat tid promise_task_local(f)(idcs)
@@ -443,6 +480,7 @@ function _tmap(scheduler::StaticScheduler{NoChunking},
         _Arrs::AbstractArray...;)
     Arrs = (A, _Arrs...)
     nt = nthreads()
+    throw_if_boxed_captures(f)
     tasks = map(enumerate(A)) do (c, i)
         tid = @inbounds nthtid(mod1(c, nt))
         @spawnat tid begin
@@ -485,6 +523,7 @@ end
         map!(f, out, Arrs...)
     else
         @boundscheck check_all_have_same_indices((out, Arrs...))
+        throw_if_boxed_captures(f)
         mapping_f = maybe_rewrap(f) do f
             function mapping_function(i)
                 args = map(A -> @inbounds(A[i]), Arrs)
