Skip to content

Commit af51162

Browse files
authored
SimpleBarrier and @barrier (#97)
* add SimpleBarrier to tools * barrier but hardcoded ntasks * barrier(n) where n is given or deduced from set ntasks=... + some tests * few more tests * changelog entry (experimental) * drop atbarrier(n) * drop unneccessary comments * update warning in docstring * add OhMyThreads.Experimental and move barrier macro there * require explicit loading of atbarrier * docstring update
1 parent 9c772fe commit af51162

File tree

10 files changed

+212
-9
lines changed

10 files changed

+212
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Version 0.5.1
55
-------------
66
- ![Feature][badge-feature] Within a parallel `@tasks` block one can now mark a region with `@one_by_one`. This region will be run by one task at a time ("critical region").
77
- ![Feature][badge-feature] Within a `@tasks` block one can now mark a region as with `@only_one`. This region will be run by a single parallel task only (other tasks will skip over it).
8+
- ![Experimental][badge-experimental] Added tentative support for `@barrier` in `@tasks` blocks. See `?OhMyThreads.Tools.@barrier` for more information. Note that this feature is experimental and **not** part of the public API (i.e. doesn't fall under SemVer).
89

910
Version 0.5.0
1011
-------------
@@ -97,6 +98,7 @@ Version 0.2.0
9798
[badge-breaking]: https://img.shields.io/badge/BREAKING-red.svg
9899
[badge-deprecation]: https://img.shields.io/badge/Deprecation-orange.svg
99100
[badge-feature]: https://img.shields.io/badge/Feature-green.svg
101+
[badge-experimental]: https://img.shields.io/badge/Experimental-yellow.svg
100102
[badge-enhancement]: https://img.shields.io/badge/Enhancement-blue.svg
101103
[badge-bugfix]: https://img.shields.io/badge/Bugfix-purple.svg
102104
[badge-fix]: https://img.shields.io/badge/Fix-purple.svg

docs/make.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ makedocs(;
2424
# ],
2525
"API" => [
2626
"Public API" => "refs/api.md",
27+
"Experimental" => "refs/experimental.md",
2728
"Internal" => "refs/internal.md"
2829
]
2930
],

docs/src/refs/experimental.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
```@meta
2+
CollapsedDocStrings = true
3+
```
4+
5+
# Experimental
6+
7+
!!! warning
8+
**Everything on this page is experimental and might changed or dropped at any point!**
9+
10+
## References
11+
12+
```@autodocs
13+
Modules = [OhMyThreads, OhMyThreads.Experimental]
14+
Public = false
15+
Pages = ["OhMyThreads.jl", "experimental.jl"]
16+
```

docs/src/refs/internal.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ CollapsedDocStrings = true
55
# Internal
66

77
!!! warning
8-
**Everything on this page is internal and might change at any point!**
8+
**Everything on this page is internal and and might changed or dropped at any point!**
99

1010
## References
1111

src/OhMyThreads.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ include("schedulers.jl")
1919
using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler,
2020
SerialScheduler
2121
include("implementation.jl")
22+
include("experimental.jl")
2223

2324
export @tasks, @set, @local, @one_by_one, @only_one
2425
export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect

src/experimental.jl

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
module Experimental
2+
3+
"""
4+
@barrier
5+
6+
This can be used inside a `@tasks for ... end` to synchronize `n` parallel tasks.
7+
Specifically, a task can only pass the `@barrier` if `n-1` other tasks have reached it
8+
as well. The value of `n` is determined from `@set ntasks=...`, which
9+
is required if one wants to use `@barrier`.
10+
11+
Because this feature is experimental, it is required to load `@barrier` explicitly, e.g. via
12+
`using OhMyThreads.Experimental: @barrier`.
13+
14+
**WARNING:** It is the responsibility of the user to ensure that the right number of tasks
15+
actually reach the barrier. Otherwise, a **deadlock** can occur. In partictular, if the
16+
number of iterations is not a multiple of `n`, the last few iterations (remainder) will be
17+
run by less than `n` tasks which will never be able to pass a `@barrier`.
18+
19+
## Example
20+
21+
```julia
22+
using OhMyThreads: @tasks
23+
24+
# works
25+
@tasks for i in 1:20
26+
@set ntasks = 20
27+
28+
sleep(i * 0.2)
29+
println(i, ": before")
30+
@barrier
31+
println(i, ": after")
32+
end
33+
34+
# wrong - deadlock!
35+
@tasks for i in 1:22 # ntasks % niterations != 0
36+
@set ntasks = 20
37+
38+
println(i, ": before")
39+
@barrier
40+
println(i, ": after")
41+
end
42+
```
43+
"""
44+
macro barrier(args...)
45+
error("The @barrier macro may only be used inside of a @tasks block.")
46+
end
47+
48+
end # Experimental

src/macro_impl.jl

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using OhMyThreads.Tools: OnlyOneRegion, try_enter!
2+
using OhMyThreads.Tools: SimpleBarrier
3+
using OhMyThreads: OhMyThreads
24

3-
function tasks_macro(forex)
5+
function tasks_macro(forex; __module__)
46
if forex.head != :for
57
throw(ErrorException("Expected a for loop after `@tasks`."))
68
else
@@ -28,7 +30,8 @@ function tasks_macro(forex)
2830
!(arg isa Expr && arg.head == :macrocall &&
2931
arg.args[1] == Symbol("@only_one")) &&
3032
!(arg isa Expr && arg.head == :macrocall &&
31-
arg.args[1] == Symbol("@one_by_one"))
33+
arg.args[1] == Symbol("@one_by_one")) &&
34+
!(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@barrier"))
3235
end
3336
forbody.args[i] = esc(forbody.args[i])
3437
end
@@ -38,6 +41,14 @@ function tasks_macro(forex)
3841
_maybe_handle_atset_block!(settings, forbody.args)
3942
setup_onlyone_blocks = _maybe_handle_atonlyone_blocks!(forbody.args)
4043
setup_onebyone_blocks = _maybe_handle_atonebyone_blocks!(forbody.args)
44+
if isdefined(__module__, Symbol("@barrier"))
45+
if __module__.var"@barrier" != OhMyThreads.Experimental.var"@barrier"
46+
error("There seems to be a macro `@barrier` around which isn't `OhMyThreads.Experimental.@barrier`. This isn't supported.")
47+
end
48+
setup_barriers = _maybe_handle_atbarriers!(forbody.args, settings)
49+
else
50+
setup_barriers = nothing
51+
end
4152

4253
itrng = esc(itrng)
4354
itvar = esc(itvar)
@@ -58,6 +69,7 @@ function tasks_macro(forex)
5869
quote
5970
$setup_onlyone_blocks
6071
$setup_onebyone_blocks
72+
$setup_barriers
6173
$make_mapping_function
6274
tmapreduce(mapping_function, $(settings.reducer),
6375
$(itrng))
@@ -67,6 +79,7 @@ function tasks_macro(forex)
6779
quote
6880
$setup_onlyone_blocks
6981
$setup_onebyone_blocks
82+
$setup_barriers
7083
$make_mapping_function
7184
tmap(mapping_function, $(itrng))
7285
end
@@ -75,6 +88,7 @@ function tasks_macro(forex)
7588
quote
7689
$setup_onlyone_blocks
7790
$setup_onebyone_blocks
91+
$setup_barriers
7892
$make_mapping_function
7993
tforeach(mapping_function, $(itrng))
8094
end
@@ -91,7 +105,7 @@ function tasks_macro(forex)
91105
for (k, v) in settings.kwargs
92106
push!(kwexpr.args, Expr(:kw, k, v))
93107
end
94-
insert!(q.args[8].args, 2, kwexpr)
108+
insert!(q.args[10].args, 2, kwexpr)
95109

96110
# wrap everything in a let ... end block
97111
# and, potentially, define the `TaskLocalValue`s.
@@ -113,12 +127,11 @@ function maybe_warn_useless_init(settings)
113127
end
114128

115129
Base.@kwdef mutable struct Settings
116-
# scheduler::Expr = :(DynamicScheduler())
117130
scheduler::Union{Expr, QuoteNode, NotGiven} = NotGiven()
118131
reducer::Union{Expr, Symbol, NotGiven} = NotGiven()
119132
collect::Union{Bool, NotGiven} = NotGiven()
120133
init::Union{Expr, Symbol, NotGiven} = NotGiven()
121-
kwargs::Vector{Pair{Symbol, Any}} = Pair{Symbol, Any}[]
134+
kwargs::Dict{Symbol, Any} = Dict{Symbol, Any}()
122135
end
123136

124137
function _maybe_handle_atlocal_block!(args)
@@ -220,7 +233,8 @@ function _handle_atset_single_assign!(settings, ex)
220233
def = def isa Bool ? def : esc(def)
221234
setfield!(settings, sym, def)
222235
else
223-
push!(settings.kwargs, sym => esc(def))
236+
# push!(settings.kwargs, sym => esc(def))
237+
settings.kwargs[sym] = esc(def)
224238
end
225239
end
226240

@@ -263,3 +277,21 @@ function _maybe_handle_atonebyone_blocks!(args)
263277
end
264278
return setup_onebyone_blocks
265279
end
280+
281+
function _maybe_handle_atbarriers!(args, settings)
282+
idcs = findall(args) do arg
283+
arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@barrier")
284+
end
285+
isnothing(idcs) && return # no @barrier found
286+
setup_barriers = quote end
287+
for i in idcs
288+
!haskey(settings.kwargs, :ntasks) &&
289+
throw(ErrorException("When using `@barrier`, the number of tasks must be " *
290+
"specified explicitly, e.g. via `@set ntasks=...`. "))
291+
ntasks = settings.kwargs[:ntasks]
292+
@gensym barrier
293+
push!(setup_barriers.args, :($(barrier) = $(SimpleBarrier)($ntasks)))
294+
args[i] = :($(esc(:wait))($(barrier)))
295+
end
296+
return setup_barriers
297+
end

src/macros.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ end
6464
```
6565
"""
6666
macro tasks(args...)
67-
Implementation.tasks_macro(args...)
67+
Implementation.tasks_macro(args...; __module__)
6868
end
6969

7070
"""

src/tools.jl

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,52 @@ function reset!(s::OnlyOneRegion)
8282
nothing
8383
end
8484

85+
"""
86+
SimpleBarrier(n::Integer)
87+
88+
Simple reusable barrier for `n` parallel tasks.
89+
90+
Given `b = SimpleBarrier(n)` and `n` parallel tasks, each task that calls
91+
`wait(b)` will block until the other `n-1` tasks have called `wait(b)` as well.
92+
93+
## Example
94+
```
95+
n = nthreads()
96+
barrier = SimpleBarrier(n)
97+
@sync for i in 1:n
98+
@spawn begin
99+
println("A")
100+
wait(barrier) # synchronize all tasks
101+
println("B")
102+
wait(barrier) # synchronize all tasks (reusable)
103+
println("C")
104+
end
105+
end
106+
```
107+
"""
108+
mutable struct SimpleBarrier
109+
const n::Int64
110+
const c::Threads.Condition
111+
cnt::Int64
112+
113+
function SimpleBarrier(n::Integer)
114+
new(n, Threads.Condition(), 0)
115+
end
116+
end
117+
118+
function Base.wait(b::SimpleBarrier)
119+
lock(b.c)
120+
try
121+
b.cnt += 1
122+
if b.cnt == b.n
123+
b.cnt = 0
124+
notify(b.c)
125+
else
126+
wait(b.c)
127+
end
128+
finally
129+
unlock(b.c)
130+
end
131+
end
132+
85133
end # Tools

test/runtests.jl

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Test, OhMyThreads
22
using OhMyThreads: TaskLocalValue, WithTaskLocals, @fetch, promise_task_local
3+
using OhMyThreads.Experimental: @barrier
34

45
include("Aqua.jl")
56

@@ -470,11 +471,65 @@ end
470471
y += 1 # parallel-safe
471472
end
472473
end
473-
@test x == 1 && y == 10
474+
@test x == 1 && y == 10
474475
catch ErrorException
475476
@test false
476477
end
477478
end
478479
end;
479480

481+
@testset "@barrier" begin
482+
@test (@tasks for i in 1:20
483+
@set ntasks = 20
484+
@barrier
485+
end) |> isnothing
486+
487+
@test try
488+
@macroexpand @tasks for i in 1:20
489+
@barrier
490+
end
491+
false
492+
catch
493+
true
494+
end
495+
496+
@test try
497+
x = Threads.Atomic{Int64}(0)
498+
y = Threads.Atomic{Int64}(0)
499+
@tasks for i in 1:20
500+
@set ntasks = 20
501+
502+
Threads.atomic_add!(x, 1)
503+
@barrier
504+
if x[] < 20 && y[] > 0 # x hasn't reached 20 yet and y is already > 0
505+
error("shouldn't happen")
506+
end
507+
Threads.atomic_add!(y, 1)
508+
end
509+
true
510+
catch ErrorException
511+
false
512+
end
513+
514+
@test try
515+
x = Threads.Atomic{Int64}(0)
516+
y = Threads.Atomic{Int64}(0)
517+
@tasks for i in 1:20
518+
@set ntasks = 20
519+
520+
Threads.atomic_add!(x, 1)
521+
@barrier
522+
Threads.atomic_add!(x, 1)
523+
@barrier
524+
if x[] < 40 && y[] > 0 # x hasn't reached 20 yet and y is already > 0
525+
error("shouldn't happen")
526+
end
527+
Threads.atomic_add!(y, 1)
528+
end
529+
true
530+
catch ErrorException
531+
false
532+
end
533+
end
534+
480535
# Todo way more testing, and easier tests to deal with

0 commit comments

Comments
 (0)