Skip to content

Commit 761063f

Browse files
authored
Add special region support to @tasks (#93)
* critical section * minor docstring update * section single; overhaul of forbody escaping * update docstring * drop comments * one_by_one and one_only * one_only -> only_one
1 parent 97da820 commit 761063f

File tree

7 files changed

+389
-87
lines changed

7 files changed

+389
-87
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
OhMyThreads.jl Changelog
22
=========================
33

4+
Version 0.5.1
5+
-------------
6+
- ![Feature][badge-feature] Within a parallel `@tasks` block one can now mark a region with `@one_by_one`. This region will be run by one task at a time ("critical region").
7+
- ![Feature][badge-feature] Within a `@tasks` block one can now mark a region as with `@only_one`. This region will be run by a single parallel task only (other tasks will skip over it).
8+
49
Version 0.5.0
510
-------------
611

docs/src/refs/api.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ CollapsedDocStrings = true
1111
@tasks
1212
@set
1313
@local
14+
@only_one
15+
@one_by_one
1416
```
1517

1618
### Functions

src/OhMyThreads.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler
2020
SerialScheduler
2121
include("implementation.jl")
2222

23-
export @tasks, @set, @local
23+
export @tasks, @set, @local, @one_by_one, @only_one
2424
export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect
2525
export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler
2626

src/macro_impl.jl

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using OhMyThreads.Tools: OnlyOneRegion, try_enter!
2+
13
function tasks_macro(forex)
24
if forex.head != :for
35
throw(ErrorException("Expected a for loop after `@tasks`."))
@@ -17,11 +19,26 @@ function tasks_macro(forex)
1719

1820
settings = Settings()
1921

22+
# Escape everything in the loop body that is not used in conjuction with one of our
23+
# "macros", e.g. @set or @local. Code inside of these macro blocks will be escaped by
24+
# the respective "macro" handling functions below.
25+
for i in findall(forbody.args) do arg
26+
!(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")) &&
27+
!(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@local")) &&
28+
!(arg isa Expr && arg.head == :macrocall &&
29+
arg.args[1] == Symbol("@only_one")) &&
30+
!(arg isa Expr && arg.head == :macrocall &&
31+
arg.args[1] == Symbol("@one_by_one"))
32+
end
33+
forbody.args[i] = esc(forbody.args[i])
34+
end
35+
2036
locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args)
2137
tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before)
2238
_maybe_handle_atset_block!(settings, forbody.args)
39+
setup_onlyone_blocks = _maybe_handle_atonlyone_blocks!(forbody.args)
40+
setup_onebyone_blocks = _maybe_handle_atonebyone_blocks!(forbody.args)
2341

24-
forbody = esc(forbody)
2542
itrng = esc(itrng)
2643
itvar = esc(itvar)
2744

@@ -39,19 +56,25 @@ function tasks_macro(forex)
3956
end
4057
q = if isgiven(settings.reducer)
4158
quote
59+
$setup_onlyone_blocks
60+
$setup_onebyone_blocks
4261
$make_mapping_function
4362
tmapreduce(mapping_function, $(settings.reducer),
4463
$(itrng))
4564
end
4665
elseif isgiven(settings.collect)
4766
maybe_warn_useless_init(settings)
4867
quote
68+
$setup_onlyone_blocks
69+
$setup_onebyone_blocks
4970
$make_mapping_function
5071
tmap(mapping_function, $(itrng))
5172
end
5273
else
5374
maybe_warn_useless_init(settings)
5475
quote
76+
$setup_onlyone_blocks
77+
$setup_onebyone_blocks
5578
$make_mapping_function
5679
tforeach(mapping_function, $(itrng))
5780
end
@@ -68,7 +91,7 @@ function tasks_macro(forex)
6891
for (k, v) in settings.kwargs
6992
push!(kwexpr.args, Expr(:kw, k, v))
7093
end
71-
insert!(q.args[4].args, 2, kwexpr)
94+
insert!(q.args[8].args, 2, kwexpr)
7295

7396
# wrap everything in a let ... end block
7497
# and, potentially, define the `TaskLocalValue`s.
@@ -151,16 +174,15 @@ function _atlocal_assign_to_exprs(ex)
151174
tls_type = esc(left_ex.args[2])
152175
local_before = :($(tl_storage) = TaskLocalValue{$tls_type}(() -> $(tls_def)))
153176
else
154-
tls_sym = esc(left_ex)
177+
tls_sym = esc(left_ex)
155178
local_before = :($(tl_storage) = let f = () -> $(tls_def)
156-
TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f)
157-
end)
179+
TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f)
180+
end)
158181
end
159182
local_name = :($(tls_sym))
160183
return local_before, local_name
161184
end
162185

163-
164186
function _maybe_handle_atset_block!(settings, args)
165187
idcs = findall(args) do arg
166188
arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")
@@ -201,3 +223,43 @@ function _handle_atset_single_assign!(settings, ex)
201223
push!(settings.kwargs, sym => esc(def))
202224
end
203225
end
226+
227+
function _maybe_handle_atonlyone_blocks!(args)
228+
idcs = findall(args) do arg
229+
arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@only_one")
230+
end
231+
isnothing(idcs) && return # no @only_one blocks
232+
setup_onlyone_blocks = quote end
233+
for i in idcs
234+
body = args[i].args[3]
235+
@gensym onlyone
236+
init_onlyone_ex = :($(onlyone) = $(OnlyOneRegion()))
237+
push!(setup_onlyone_blocks.args, init_onlyone_ex)
238+
args[i] = quote
239+
Tools.try_enter!($(onlyone)) do
240+
$(esc(body))
241+
end
242+
end
243+
end
244+
return setup_onlyone_blocks
245+
end
246+
247+
function _maybe_handle_atonebyone_blocks!(args)
248+
idcs = findall(args) do arg
249+
arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@one_by_one")
250+
end
251+
isnothing(idcs) && return # no @one_by_one blocks
252+
setup_onebyone_blocks = quote end
253+
for i in idcs
254+
body = args[i].args[3]
255+
@gensym onebyone
256+
init_lock_ex = :($(onebyone) = $(Base.ReentrantLock()))
257+
push!(setup_onebyone_blocks.args, init_lock_ex)
258+
args[i] = quote
259+
$(esc(:lock))($(onebyone)) do
260+
$(esc(body))
261+
end
262+
end
263+
end
264+
return setup_onebyone_blocks
265+
end

src/macros.jl

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,58 @@ end
151151
error("The @local macro may only be used inside of a @tasks block.")
152152
end
153153
end
154+
155+
"""
156+
@only_one begin ... end
157+
158+
This can be used inside a `@tasks for ... end` block to mark a region of code to be
159+
executed by only one of the parallel tasks (all other tasks skip over this region).
160+
161+
## Example
162+
163+
```julia
164+
using OhMyThreads: @tasks
165+
166+
@tasks for i in 1:10
167+
@set ntasks = 10
168+
169+
println(i, ": before")
170+
@only_one begin
171+
println(i, ": only printed by a single task")
172+
sleep(1)
173+
end
174+
println(i, ": after")
175+
end
176+
```
177+
"""
178+
macro only_one(args...)
179+
error("The @only_one macro may only be used inside of a @tasks block.")
180+
end
181+
182+
"""
183+
@one_by_one begin ... end
184+
185+
This can be used inside a `@tasks for ... end` block to mark a region of code to be
186+
executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary
187+
and non-deterministic.
188+
189+
## Example
190+
191+
```julia
192+
using OhMyThreads: @tasks
193+
194+
@tasks for i in 1:10
195+
@set ntasks = 10
196+
197+
println(i, ": before")
198+
@one_by_one begin
199+
println(i, ": one task at a time")
200+
sleep(0.5)
201+
end
202+
println(i, ": after")
203+
end
204+
```
205+
"""
206+
macro one_by_one(args...)
207+
error("The @one_by_one macro may only be used inside of a @tasks block.")
208+
end

src/tools.jl

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,62 @@ Return a `UInt` identifier for the current running [Task](https://docs.julialang
2424
"""
2525
taskid() = objectid(current_task())
2626

27+
"""
28+
May be used to mark a region in parallel code to be executed by a single task only
29+
(all other tasks shall skip over it).
30+
31+
See [`try_enter!`](@ref) and [`reset!`](@ref).
32+
"""
33+
mutable struct OnlyOneRegion
34+
@atomic latch::Bool
35+
OnlyOneRegion() = new(false)
36+
end
37+
38+
"""
39+
try_enter!(f, s::OnlyOneRegion)
40+
41+
When called from multiple parallel tasks (on a shared `s::OnlyOneRegion`) only a single
42+
task will execute `f`.
43+
44+
## Example
45+
46+
```julia
47+
using OhMyThreads: @tasks
48+
using OhMyThreads.Tools: OnlyOneRegion, try_enter!
49+
50+
only_one = OnlyOneRegion()
51+
52+
@tasks for i in 1:10
53+
@set ntasks = 10
54+
55+
println(i, ": before")
56+
try_enter!(only_one) do
57+
println(i, ": only printed by a single task")
58+
sleep(1)
59+
end
60+
println(i, ": after")
61+
end
62+
```
63+
"""
64+
function try_enter!(f, s::OnlyOneRegion)
65+
latch = @atomic :monotonic s.latch
66+
if latch
67+
return
68+
end
69+
(_, success) = @atomicreplace s.latch false=>true
70+
if !success
71+
return
72+
end
73+
f()
74+
return
75+
end
76+
77+
"""
78+
Reset the `OnlyOneRegion` (so that it can be used again).
79+
"""
80+
function reset!(s::OnlyOneRegion)
81+
@atomicreplace s.latch true=>false
82+
nothing
83+
end
84+
2785
end # Tools

0 commit comments

Comments
 (0)