Commit b6fa541 (parent: de0227b)

Add concurrency safe iterator wrapper ChannelLike (#121)

`ChannelLike` wraps an indexable object such that it can be iterated by concurrent tasks in a safe manner, similar to a `Channel`. It replaces `Channel` in the chunked `GreedyScheduler`.

File tree

7 files changed: +91 −10 lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -7,6 +7,8 @@ Version 0.7.0
 - ![BREAKING][badge-breaking] If you provide a `chunks` or `index_chunks` as input we now disable the internal chunking without a warning. Previously, we did show a warning unless you had set `chunking=false`. In contrast, we now throw an error when you set any incompatible chunking related keyword arguments.
 - ![Deprecation][badge-deprecation] The `split` options `:batch` and `:scatter` are now deprecated (they still work but will be dropped at some point). Use `:consecutive` and `:roundrobin`, respectively, instead.
 - ![Enhancement][badge-enhancement] The `split` keyword argument can now also be a `<: OhMyThreads.Split`. Compared to providing a `Symbol`, the former can potentially give better performance. For example, you can replace `:consecutive` by `OhMyThreads.Consecutive()` and `:roundrobin` by `OhMyThreads.RoundRobin()`.
+- ![Feature][badge-feature] `ChannelLike` is a new public (but not exported) type. `ChannelLike(itr)` provides a way to iterate over `itr` in a concurrency safe manner, similar to a `Channel`. See the docstring for more details. ([#121][gh-pr-121])
+- ![Enhancement][badge-enhancement] `ChannelLike` is used internally for the `GreedyScheduler` when `chunking=true`. This improves performance overall, but it is especially noticeable when the number of chunks is large. ([#121][gh-pr-121])

 Version 0.6.2
 -------------
@@ -136,3 +138,4 @@ Version 0.2.0
 [gh-issue-25]: https://github.com/JuliaFolds2/OhMyThreads.jl/issues/25

 [gh-pr-5]: https://github.com/JuliaFolds2/OhMyThreads.jl/pull/5
+[gh-pr-121]: https://github.com/JuliaFolds2/OhMyThreads.jl/pull/121

docs/src/literate/tls/tls.jl

Lines changed: 7 additions & 0 deletions

@@ -382,6 +382,13 @@ sort(res_nu) ≈ sort(res_channel_flipped)
 @btime matmulsums_perthread_channel_flipped($As_nu, $Bs_nu; ntasks = 2 * nthreads());
 @btime matmulsums_perthread_channel_flipped($As_nu, $Bs_nu; ntasks = 10 * nthreads());

+# In addition, OhMyThreads provides an iterator-wrapper type
+# [`OhMyThreads.ChannelLike`](@ref) which can be used in place of a `Channel`. If
+# the number of elements is large this can be more efficient since there is no
+# need to copy the elements into the `Channel`. Concretely, in the example above,
+# we could replace `Channel() do .. end` with
+# `OhMyThreads.ChannelLike(1:length(As))`.
+
 # ## Bumper.jl (only for the brave)
 #
 # If you are bold and want to cut down temporary allocations even more you can
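To make the swap concrete, here is a minimal, self-contained sketch of the pattern the new doc lines describe. The data and the per-index work are placeholders, not the tutorial's actual `matmulsums_*` code:

```julia
using OhMyThreads: ChannelLike
using Base.Threads: @spawn

As = [rand(8, 8) for _ in 1:100]  # placeholder data

# Instead of `Channel() do ch; foreach(i -> put!(ch, i), 1:length(As)); end`,
# wrap the index range directly; no elements are copied into a channel:
ch = ChannelLike(1:length(As))

results = zeros(length(As))
@sync for _ in 1:4
    @spawn for i in ch
        results[i] = sum(As[i])  # placeholder for the real per-index work
    end
end
```

Each task atomically claims the next unprocessed index, so every index is handled exactly once; writing to distinct slots of `results` from different tasks is safe.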

docs/src/literate/tls/tls.md

Lines changed: 7 additions & 0 deletions

@@ -490,6 +490,13 @@ Quick benchmark:
 ````

+In addition, OhMyThreads provides an iterator-wrapper type
+[`OhMyThreads.ChannelLike`](@ref) which can be used in place of a `Channel`. If
+the number of elements is large this can be more efficient since there is no
+need to copy the elements into the `Channel`. Concretely, in the example above,
+we could replace `Channel() do .. end` with
+`OhMyThreads.ChannelLike(1:length(As))`.
+
 ## Bumper.jl (only for the brave)

 If you are bold and want to cut down temporary allocations even more you can

docs/src/refs/api.md

Lines changed: 1 addition & 0 deletions

@@ -61,4 +61,5 @@ SerialScheduler
 ```@docs
 OhMyThreads.WithTaskLocals
 OhMyThreads.promise_task_local
+OhMyThreads.ChannelLike
 ```

src/implementation.jl

Lines changed: 5 additions & 6 deletions

@@ -1,7 +1,7 @@
 module Implementation

 import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcollect
-using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local
+using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local, ChannelLike
 using OhMyThreads.Tools: nthtid
 using OhMyThreads: Scheduler,
        DynamicScheduler, StaticScheduler, GreedyScheduler,
@@ -207,6 +207,7 @@ function _tmapreduce(f,
         ntasks = min(length(first(Arrs)), ntasks_desired)
         ch_len = length(first(Arrs))
     end
+    # TODO: Use ChannelLike for iterators that support it. Dispatch on IndexLinear?
     ch = Channel{Tuple{eltype.(Arrs)...}}(ch_len; spawn = true) do ch
         for args in zip(Arrs...)
             put!(ch, args)
@@ -255,11 +256,9 @@ function _tmapreduce(f,
     ntasks_desired = scheduler.ntasks
     ntasks = min(length(chnks), ntasks_desired)

-    ch = Channel{typeof(first(chnks))}(length(chnks); spawn = true) do ch
-        for args in chnks
-            put!(ch, args)
-        end
-    end
+    # ChunkSplitters.IndexChunks supports everything needed for ChannelLike
+    ch = ChannelLike(chnks)

     tasks = map(1:ntasks) do _
         # Note, calling `promise_task_local` here is only safe because we're assuming that
         # Base.mapreduce isn't going to magically try to do multithreading on us...

src/schedulers.jl

Lines changed: 5 additions & 4 deletions

@@ -233,8 +233,9 @@ end
 """
     GreedyScheduler (aka :greedy)

-A greedy dynamic scheduler. The elements of the collection are first put into a `Channel`
-and then dynamic, non-sticky tasks are spawned to process the channel content in parallel.
+A greedy dynamic scheduler. The elements are put into a shared workqueue and dynamic,
+non-sticky tasks are spawned to process the elements of the queue, with each task taking a
+new element from the queue as soon as the previous one is done.

 Note that elements are processed in a non-deterministic order, and thus a potential reducing
 function **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in
@@ -249,10 +250,10 @@ some additional overhead.
     * Determines the number of parallel tasks to be spawned.
     * Setting `ntasks < nthreads()` is an effective way to use only a subset of the available threads.
 - `chunking::Bool` (default `false`):
-    * Controls whether input elements are grouped into chunks (`true`) or not (`false`) before put into the channel. This can improve the performance especially if there are many iterations each of which are computationally cheap.
+    * Controls whether input elements are grouped into chunks (`true`) or not (`false`) before being put into the shared workqueue. This can improve performance, especially if there are many iterations, each of which is computationally cheap.
     * If `nchunks` or `chunksize` are explicitly specified, `chunking` will be automatically set to `true`.
 - `nchunks::Integer` (default `10 * nthreads()`):
-    * Determines the number of chunks (that will eventually be put into the channel).
+    * Determines the number of chunks (that will eventually be put into the shared workqueue).
     * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)). For `nchunks <= nthreads()` there are not enough chunks for any load balancing.
 - `chunksize::Integer` (default not set)
     * Specifies the desired chunk size (instead of the number of chunks).
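As a usage sketch of the docstring above (assuming the standard OhMyThreads entry points; the specific numbers are arbitrary), a chunked greedy schedule might look like:

```julia
using OhMyThreads

# `+` is commutative, as the greedy scheduler requires of reducing functions.
# Setting `nchunks` explicitly implies `chunking = true`, so the 64 chunks
# land in the shared workqueue (a `ChannelLike` after this commit).
result = tmapreduce(sin, +, 1:100_000;
                    scheduler = GreedyScheduler(; nchunks = 64))
```

With many cheap iterations like this, chunking amortizes the per-element queue overhead while `nchunks > nthreads()` still leaves room for load balancing.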

src/types.jl

Lines changed: 63 additions & 0 deletions

@@ -71,3 +71,66 @@ promise_task_local(f::Any) = f
 function (f::WithTaskLocals{F})(args...; kwargs...) where {F}
     promise_task_local(f)(args...; kwargs...)
 end
+
+"""
+    ChannelLike(itr)
+
+This struct wraps an indexable object such that it can be iterated by concurrent tasks in a
+safe manner similar to a `Channel`.
+
+`ChannelLike(itr)` is conceptually similar to:
+```julia
+Channel{eltype(itr)}(length(itr)) do ch
+    foreach(i -> put!(ch, i), itr)
+end
+```
+i.e. creating a channel, `put!`ing all elements of `itr` into it and closing it. The
+advantage is that `ChannelLike` doesn't copy the data.
+
+# Examples
+```julia
+ch = OhMyThreads.ChannelLike(1:5)
+
+@sync for taskid in 1:2
+    Threads.@spawn begin
+        for i in ch
+            println("Task #\$taskid processing item \$i")
+            sleep(1 / i)
+        end
+    end
+end
+
+# output
+
+Task #1 processing item 1
+Task #2 processing item 2
+Task #2 processing item 3
+Task #2 processing item 4
+Task #1 processing item 5
+```
+
+Note that `ChannelLike` is stateful (just like a `Channel`), so you can't iterate over it
+twice.
+
+The wrapped iterator must support `firstindex(itr)::Int`, `lastindex(itr)::Int` and
+`getindex(itr, ::Int)`.
+"""
+mutable struct ChannelLike{T}
+    const itr::T
+    @atomic idx::Int
+    function ChannelLike(itr::T) where {T}
+        return new{T}(itr, firstindex(itr) - 1)
+    end
+end
+
+Base.length(ch::ChannelLike) = length(ch.itr)
+Base.eltype(ch::ChannelLike) = eltype(ch.itr)
+
+function Base.iterate(ch::ChannelLike, ::Nothing = nothing)
+    this = @atomic ch.idx += 1
+    if this <= lastindex(ch.itr)
+        return (@inbounds(ch.itr[this]), nothing)
+    else
+        return nothing
+    end
+end
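A small concurrency sanity check (a sketch, not part of the commit) illustrates the exactly-once hand-out that the atomic `idx` increment in `Base.iterate` provides:

```julia
using OhMyThreads: ChannelLike
using Base.Threads: @spawn

ch = ChannelLike(1:1000)
seen = [Int[] for _ in 1:4]  # one bucket per task; no sharing between tasks
@sync for t in 1:4
    @spawn for i in ch
        push!(seen[t], i)  # each index is claimed by exactly one task
    end
end

# Every element appears exactly once across all tasks combined.
@assert sort(reduce(vcat, seen)) == 1:1000
```

Because each `iterate` call does a single atomic fetch-increment, two tasks can never receive the same index, and no index is skipped; which task gets which index is non-deterministic, matching the `Channel` semantics it emulates.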
