Skip to content

Commit 9c368e2

Browse files
committed
Add streaming API
1 parent d47a1e6 commit 9c368e2

File tree

8 files changed

+762
-1
lines changed

8 files changed

+762
-1
lines changed

Project.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
99
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
1010
MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
1111
MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
12-
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
1312
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
1413
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
1514
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"

docs/make.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ makedocs(;
2020
"Task Spawning" => "task-spawning.md",
2121
"Data Management" => "data-management.md",
2222
"Distributed Arrays" => "darray.md",
23+
"Streaming Tasks" => "streaming.md",
2324
"Scopes" => "scopes.md",
2425
"Processors" => "processors.md",
2526
"Task Queues" => "task-queues.md",

docs/src/streaming.md

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Streaming Tasks
2+
3+
Dagger tasks have a limited lifetime - they are created, execute, finish, and
4+
are eventually destroyed when they're no longer needed. Thus, if one wants
5+
to run the same kind of computations over and over, one might re-create a
6+
similar set of tasks for each unit of data that needs processing.
7+
8+
This might be fine for computations which take a long time to run (thus
9+
dwarfing the cost of task creation, which is quite small), or when working with
10+
a limited set of data, but this approach is not great for doing lots of small
11+
computations on a large (or endless) amount of data. For example, processing
12+
image frames from a webcam, reacting to messages from a message bus, reading
13+
samples from a software radio, etc. All of these tasks are better suited to a
14+
"streaming" model of data processing, where data is simply piped into a
15+
continuously-running task (or DAG of tasks) forever, or until the data runs
16+
out.
17+
18+
Thankfully, if you have a problem which is best modeled as a streaming system
19+
of tasks, Dagger has you covered! Building on its support for
20+
["Task Queues"](@ref), Dagger provides a means to convert an entire DAG of
21+
tasks into a streaming DAG, where data flows into and out of each task
22+
asynchronously, using the `spawn_streaming` function:
23+
24+
```julia
25+
Dagger.spawn_streaming() do # enters a streaming region
26+
vals = Dagger.@spawn rand()
27+
print_vals = Dagger.@spawn println(vals)
28+
end # exits the streaming region, and starts the DAG running
29+
```
30+
31+
In the above example, `vals` is a Dagger task which has been transformed to run
32+
in a streaming manner - instead of just calling `rand()` once and returning its
33+
result, it will re-run `rand()` endlessly, continuously producing new random
34+
values. In typical Dagger style, `print_vals` is a Dagger task which depends on
35+
`vals`, but in streaming form - it will continuously `println` the random
36+
values produced from `vals`. Both tasks will run forever, and will run
37+
efficiently, only doing the work necessary to generate, transfer, and consume
38+
values.
39+
40+
As the comments point out, `spawn_streaming` creates a streaming region, during
which `vals` and `print_vals` are created and configured. Neither task starts
running until `spawn_streaming` returns, which allows large DAGs to be built
all at once, without any task missing a single value. If desired, streaming
regions can be connected, although some values might be lost while the tasks
are being connected:
45+
46+
```julia
47+
vals = Dagger.spawn_streaming() do
48+
Dagger.@spawn rand()
49+
end
50+
51+
# Some values might be generated by `vals` but thrown away
52+
# before `print_vals` is fully setup and connected to it
53+
54+
print_vals = Dagger.spawn_streaming() do
55+
Dagger.@spawn println(vals)
56+
end
57+
```
58+
59+
More complicated streaming DAGs can be easily constructed, without doing
60+
anything different. For example, we can generate multiple streams of random
61+
numbers, write them all to their own files, and print the combined results:
62+
63+
```julia
64+
Dagger.spawn_streaming() do
65+
all_vals = [Dagger.spawn(rand) for i in 1:4]
66+
all_vals_written = map(1:4) do i
67+
Dagger.spawn(all_vals[i]) do val
68+
open("results_$i.txt"; write=true, create=true, append=true) do io
69+
println(io, repr(val))
70+
end
71+
return val
72+
end
73+
end
74+
Dagger.spawn(all_vals_written...) do all_vals_written...
75+
vals_sum = sum(all_vals_written)
76+
println(vals_sum)
77+
end
78+
end
79+
```
80+
81+
If you want to stop the streaming DAG and tear it all down, you can call
`Dagger.kill!(all_vals[1])` (or `Dagger.kill!(all_vals_written[2])`, etc.);
the kill will propagate throughout the entire DAG.
84+
85+
Alternatively, tasks can stop themselves from the inside with
86+
`finish_streaming`, optionally returning a value that can be `fetch`'d. Let's
87+
do this when our randomly-drawn number falls within some arbitrary range:
88+
89+
```julia
90+
vals = Dagger.spawn_streaming() do
91+
Dagger.spawn() do
92+
x = rand()
93+
if x < 0.001
94+
# That's good enough, let's be done
95+
return Dagger.finish_streaming("Finished!")
96+
end
97+
return x
98+
end
99+
end
100+
fetch(vals)
101+
```
102+
103+
In this example, the call to `fetch` will hang (while random numbers continue
104+
to be drawn), until a drawn number is less than 0.001; at that point, `fetch`
105+
will return with "Finished!", and the task `vals` will have terminated.

src/Dagger.jl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ include("sch/Sch.jl"); using .Sch
5656
# Data dependency task queue
5757
include("datadeps.jl")
5858

59+
# Streaming
60+
include("stream-buffers.jl")
61+
include("stream-fetchers.jl")
62+
include("stream.jl")
63+
5964
# Array computations
6065
include("array/darray.jl")
6166
include("array/alloc.jl")

src/sch/eager.jl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ function eager_cleanup(state, uid)
116116
# N.B. cache and errored expire automatically
117117
delete!(state.thunk_dict, tid)
118118
end
119+
remotecall_wait(1, uid) do uid
120+
lock(EAGER_THUNK_STREAMS) do global_streams
121+
if haskey(global_streams, uid)
122+
delete!(global_streams, uid)
123+
end
124+
end
125+
end
119126
end
120127

121128
function _find_thunk(e::Dagger.EagerThunk)

src/stream-buffers.jl

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
"""
A sink buffer that silently discards every element written to it. Intended
solely as a task's *output* buffer — it must not be attached as an input,
and `take!` always throws.
"""
struct DropBuffer{T} end
# Accept (and ignore) a capacity argument, mirroring the other buffer constructors.
DropBuffer{T}(_) where T = DropBuffer{T}()

# A DropBuffer holds nothing: it is always empty and never full.
Base.isempty(::DropBuffer) = true
isfull(::DropBuffer) = false

# Writing is a no-op; reading is an error by design.
Base.put!(::DropBuffer, _) = nothing
Base.take!(::DropBuffer) = error("Cannot `take!` from a DropBuffer")
11+
12+
"""
A process-local buffer backed by a `Channel{T}`.

`len` is the buffer capacity; `count` mirrors the number of buffered elements
so that `isfull` can be answered without blocking on the channel.
"""
struct ChannelBuffer{T}
    channel::Channel{T}
    len::Int
    count::Threads.Atomic{Int}
    ChannelBuffer{T}(len::Int=1024) where T =
        new{T}(Channel{T}(len), len, Threads.Atomic{Int}(0))
end
Base.isempty(cb::ChannelBuffer) = isempty(cb.channel)
isfull(cb::ChannelBuffer) = cb.count[] == cb.len
function Base.put!(cb::ChannelBuffer{T}, x) where T
    put!(cb.channel, convert(T, x))
    Threads.atomic_add!(cb.count, 1)
end
function Base.take!(cb::ChannelBuffer)
    # Capture the element before adjusting the counter: the previous
    # implementation returned the result of `atomic_sub!` (the counter's old
    # value) instead of the element taken from the channel.
    value = take!(cb.channel)
    Threads.atomic_sub!(cb.count, 1)
    return value
end
30+
31+
"""
A cross-worker buffer backed by a `RemoteChannel{T}`.

`len` is the buffer capacity; `count` mirrors the number of buffered elements
so that `isfull` can be answered without a remote round-trip.
"""
struct RemoteChannelBuffer{T}
    channel::RemoteChannel{Channel{T}}
    len::Int
    count::Threads.Atomic{Int}
    RemoteChannelBuffer{T}(len::Int=1024) where T =
        new{T}(RemoteChannel(()->Channel{T}(len)), len, Threads.Atomic{Int}(0))
end
Base.isempty(cb::RemoteChannelBuffer) = isempty(cb.channel)
isfull(cb::RemoteChannelBuffer) = cb.count[] == cb.len
function Base.put!(cb::RemoteChannelBuffer{T}, x) where T
    put!(cb.channel, convert(T, x))
    Threads.atomic_add!(cb.count, 1)
end
function Base.take!(cb::RemoteChannelBuffer)
    # Capture the element before adjusting the counter: the previous
    # implementation returned the result of `atomic_sub!` (the counter's old
    # value) instead of the element taken from the channel.
    value = take!(cb.channel)
    Threads.atomic_sub!(cb.count, 1)
    return value
end
49+
50+
"""
A process-local, fixed-capacity ring buffer for a single producer and a single
consumer. `put!` spins (yielding to the scheduler) while the buffer is full;
`take!` spins while it is empty.
"""
mutable struct ProcessRingBuffer{T}
    read_idx::Int
    write_idx::Int
    @atomic count::Int
    buffer::Vector{T}
    function ProcessRingBuffer{T}(len::Int=1024) where T
        buffer = Vector{T}(undef, len)
        return new{T}(1, 1, 0, buffer)
    end
end
Base.isempty(rb::ProcessRingBuffer) = (@atomic rb.count) == 0
isfull(rb::ProcessRingBuffer) = (@atomic rb.count) == length(rb.buffer)
function Base.put!(rb::ProcessRingBuffer{T}, x) where T
    len = length(rb.buffer)
    # Spin until the consumer frees a slot (decrements `count`).
    while (@atomic rb.count) == len
        yield()
    end
    rb.buffer[rb.write_idx] = convert(T, x)
    # Wrap the index in place: the previous monotonically-increasing index
    # would eventually overflow, at which point `mod1` of a negative value
    # would produce invalid slots.
    rb.write_idx = mod1(rb.write_idx + 1, len)
    # Publish only after the slot is written.
    @atomic rb.count += 1
end
function Base.take!(rb::ProcessRingBuffer)
    # Spin until the producer publishes an element.
    while (@atomic rb.count) == 0
        yield()
    end
    # Read the slot *before* decrementing `count`: the previous implementation
    # decremented first, allowing a concurrent `put!` to overwrite the slot
    # while it was still being read.
    value = rb.buffer[rb.read_idx]
    rb.read_idx = mod1(rb.read_idx + 1, length(rb.buffer))
    @atomic rb.count -= 1
    return value
end
83+
84+
#= TODO
85+
"A server-local ring buffer backed by shared-memory."
86+
mutable struct ServerRingBuffer{T}
87+
read_idx::Int
88+
write_idx::Int
89+
@atomic count::Int
90+
buffer::Vector{T}
91+
function ServerRingBuffer{T}(len::Int=1024) where T
92+
buffer = Vector{T}(undef, len)
93+
return new{T}(1, 1, 0, buffer)
94+
end
95+
end
96+
Base.isempty(rb::ServerRingBuffer) = (@atomic rb.count) == 0
97+
function Base.put!(rb::ServerRingBuffer{T}, x) where T
98+
len = length(rb.buffer)
99+
while (@atomic rb.count) == len
100+
yield()
101+
end
102+
to_write_idx = mod1(rb.write_idx, len)
103+
rb.buffer[to_write_idx] = convert(T, x)
104+
rb.write_idx += 1
105+
@atomic rb.count += 1
106+
end
107+
function Base.take!(rb::ServerRingBuffer)
108+
while (@atomic rb.count) == 0
109+
yield()
110+
end
111+
to_read_idx = rb.read_idx
112+
rb.read_idx += 1
113+
@atomic rb.count -= 1
114+
to_read_idx = mod1(to_read_idx, length(rb.buffer))
115+
return rb.buffer[to_read_idx]
116+
end
117+
=#
118+
119+
#=
120+
"A TCP-based ring buffer."
121+
mutable struct TCPRingBuffer{T}
122+
read_idx::Int
123+
write_idx::Int
124+
@atomic count::Int
125+
buffer::Vector{T}
126+
function TCPRingBuffer{T}(len::Int=1024) where T
127+
buffer = Vector{T}(undef, len)
128+
return new{T}(1, 1, 0, buffer)
129+
end
130+
end
131+
Base.isempty(rb::TCPRingBuffer) = (@atomic rb.count) == 0
132+
function Base.put!(rb::TCPRingBuffer{T}, x) where T
133+
len = length(rb.buffer)
134+
while (@atomic rb.count) == len
135+
yield()
136+
end
137+
to_write_idx = mod1(rb.write_idx, len)
138+
rb.buffer[to_write_idx] = convert(T, x)
139+
rb.write_idx += 1
140+
@atomic rb.count += 1
141+
end
142+
function Base.take!(rb::TCPRingBuffer)
143+
while (@atomic rb.count) == 0
144+
yield()
145+
end
146+
to_read_idx = rb.read_idx
147+
rb.read_idx += 1
148+
@atomic rb.count -= 1
149+
to_read_idx = mod1(to_read_idx, length(rb.buffer))
150+
return rb.buffer[to_read_idx]
151+
end
152+
=#
153+
154+
#=
155+
"""
156+
A flexible puller which switches to the most efficient buffer type based
157+
on the sender and receiver locations.
158+
"""
159+
mutable struct UniBuffer{T}
160+
buffer::Union{ProcessRingBuffer{T}, Nothing}
161+
end
162+
function initialize_stream_buffer!(::Type{UniBuffer{T}}, T, send_proc, recv_proc, buffer_amount) where T
163+
if buffer_amount == 0
164+
error("Return NullBuffer")
165+
end
166+
send_osproc = get_parent(send_proc)
167+
recv_osproc = get_parent(recv_proc)
168+
if send_osproc.pid == recv_osproc.pid
169+
inner = RingBuffer{T}(buffer_amount)
170+
elseif system_uuid(send_osproc.pid) == system_uuid(recv_osproc.pid)
171+
inner = ProcessBuffer{T}(buffer_amount)
172+
else
173+
inner = RemoteBuffer{T}(buffer_amount)
174+
end
175+
return UniBuffer{T}(buffer_amount)
176+
end
177+
178+
struct LocalPuller{T,B}
179+
buffer::B{T}
180+
id::UInt
181+
function LocalPuller{T,B}(id::UInt, buffer_amount::Integer) where {T,B}
182+
buffer = initialize_stream_buffer!(B, T, buffer_amount)
183+
return new{T,B}(buffer, id)
184+
end
185+
end
186+
function Base.take!(pull::LocalPuller{T,B}) where {T,B}
187+
if pull.buffer === nothing
188+
pull.buffer =
189+
error("Return NullBuffer")
190+
end
191+
value = take!(pull.buffer)
192+
end
193+
function initialize_input_stream!(stream::Stream{T,B}, id::UInt, send_proc::Processor, recv_proc::Processor, buffer_amount::Integer) where {T,B}
194+
local_buffer = remotecall_fetch(stream.ref.handle.owner, stream.ref.handle, id) do ref, id
195+
local_buffer, remote_buffer = initialize_stream_buffer!(B, T, send_proc, recv_proc, buffer_amount)
196+
ref.buffers[id] = remote_buffer
197+
return local_buffer
198+
end
199+
stream.buffer = local_buffer
200+
return stream
201+
end
202+
=#

src/stream-fetchers.jl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"Fetcher strategy that pulls stream values from a (possibly remote) value store."
struct RemoteFetcher end

"""
    stream_fetch_values!(::Type{RemoteFetcher}, T, store_ref, buffer, id)

Fill `buffer` with values of type `T` for stream `id` from the store referenced
by `store_ref`.

When the store lives on this worker, values are taken one at a time until the
buffer is full. When the store is remote, all currently-available values are
drained in a single `remotecall_fetch` round-trip and then pushed into the
local buffer, amortizing the network cost.
"""
function stream_fetch_values!(::Type{RemoteFetcher}, T, store_ref::Chunk{Store_remote}, buffer::Blocal, id::UInt) where {Store_remote, Blocal}
    if store_ref.handle.owner == myid()
        # Local store: pull directly until the buffer is full.
        store = fetch(store_ref)::Store_remote
        while !isfull(buffer)
            value = take!(store, id)::T
            put!(buffer, value)
        end
    else
        # NOTE(review): the original bound `tls = Dagger.get_tls()` here but
        # never used it; the dead assignment has been removed.
        values = remotecall_fetch(store_ref.handle.owner, store_ref.handle, id, T, Store_remote) do store_ref, id, T, Store_remote
            store = MemPool.poolget(store_ref)::Store_remote
            values = T[]
            # Drain everything currently available for this stream id.
            while !isempty(store, id)
                value = take!(store, id)::T
                push!(values, value)
            end
            return values
        end::Vector{T}
        for value in values
            put!(buffer, value)
        end
    end
end

0 commit comments

Comments
 (0)