
Commit b400189

Add streaming API
1 parent 7204709 commit b400189

File tree

6 files changed: +440 -1 lines changed


docs/make.jl

Lines changed: 1 addition & 0 deletions

@@ -19,6 +19,7 @@ makedocs(;
     "Task Spawning" => "task-spawning.md",
     "Data Management" => "data-management.md",
     "Distributed Arrays" => "darray.md",
+    "Streaming Tasks" => "streaming.md",
     "Scopes" => "scopes.md",
     "Processors" => "processors.md",
     "Task Queues" => "task-queues.md",

docs/src/streaming.md

Lines changed: 105 additions & 0 deletions

@@ -0,0 +1,105 @@
# Streaming Tasks

Dagger tasks have a limited lifetime - they are created, execute, finish, and
are eventually destroyed when they're no longer needed. Thus, if one wants to
run the same kind of computation over and over, one might re-create a similar
set of tasks for each unit of data that needs processing.

This might be fine for computations which take a long time to run (thus
dwarfing the cost of task creation, which is quite small), or when working with
a limited set of data, but this approach is not great for doing lots of small
computations on a large (or endless) amount of data - for example, processing
image frames from a webcam, reacting to messages from a message bus, or reading
samples from a software radio. All of these workloads are better suited to a
"streaming" model of data processing, where data is simply piped into a
continuously-running task (or DAG of tasks) forever, or until the data runs
out.

Thankfully, if you have a problem which is best modeled as a streaming system
of tasks, Dagger has you covered! Building on its support for
["Task Queues"](@ref), Dagger provides a means to convert an entire DAG of
tasks into a streaming DAG, where data flows into and out of each task
asynchronously, using the `spawn_streaming` function:

```julia
Dagger.spawn_streaming() do # enters a streaming region
    vals = Dagger.@spawn rand()
    print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```

In the above example, `vals` is a Dagger task which has been transformed to run
in a streaming manner - instead of just calling `rand()` once and returning its
result, it will re-run `rand()` endlessly, continuously producing new random
values. In typical Dagger style, `print_vals` is a Dagger task which depends on
`vals`, but in streaming form - it will continuously `println` the random
values produced from `vals`. Both tasks will run forever, and will run
efficiently, only doing the work necessary to generate, transfer, and consume
values.

As the comments point out, `spawn_streaming` creates a streaming region, during
which `vals` and `print_vals` are created and configured. Both tasks are halted
until `spawn_streaming` returns, allowing large DAGs to be built all at once,
without any task losing a single value. If desired, streaming regions can be
connected, although some values might be lost while the tasks are being
connected:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully set up and connected to it

print_vals = Dagger.spawn_streaming() do
    Dagger.@spawn println(vals)
end
```

More complicated streaming DAGs can be easily constructed, without doing
anything different. For example, we can generate multiple streams of random
numbers, write them all to their own files, and print the combined results:

```julia
Dagger.spawn_streaming() do
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    all_vals_written = map(1:4) do i
        Dagger.spawn(all_vals[i]) do val
            open("results_$i.txt"; write=true, create=true, append=true) do io
                println(io, repr(val))
            end
            return val
        end
    end
    Dagger.spawn(all_vals_written...) do all_vals_written...
        vals_sum = sum(all_vals_written)
        println(vals_sum)
    end
end
```

If you want to stop the streaming DAG and tear it all down, you can call
`Dagger.kill!(all_vals[1])` (or `Dagger.kill!(all_vals_written[2])`, etc.); the
kill propagates throughout the DAG.
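
For instance (a minimal sketch, assuming `all_vals` from the example above has
been kept accessible outside the streaming region, e.g. by returning it from
the `spawn_streaming` block):

```julia
# Tear down the whole streaming DAG; killing any one task propagates
# to every task connected to it.
Dagger.kill!(all_vals[1])
```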

Alternatively, tasks can stop themselves from the inside with
`finish_streaming`, optionally returning a value that can be `fetch`'d. Let's
do this when our randomly-drawn number falls within some arbitrary range:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.spawn() do
        x = rand()
        if x < 0.001
            # That's good enough, let's be done
            return Dagger.finish_streaming("Finished!")
        end
        return x
    end
end
fetch(vals)
```

In this example, the call to `fetch` will hang (while random numbers continue
to be drawn) until a drawn number is less than 0.001; at that point, `fetch`
will return with "Finished!", and the task `vals` will have terminated.

src/Dagger.jl

Lines changed: 3 additions & 0 deletions

@@ -42,6 +42,9 @@ include("utils/system_uuid.jl")
 include("utils/caching.jl")
 include("sch/Sch.jl"); using .Sch

+# Streaming
+include("stream.jl")
+
 # Array computations
 include("array/darray.jl")
 include("array/alloc.jl")

src/eager_thunk.jl

Lines changed: 11 additions & 1 deletion

@@ -67,7 +67,17 @@ function Base.fetch(t::EagerThunk; raw=false)
     if !isdefined(t, :thunk_ref)
         throw(ConcurrencyViolationError("Cannot `fetch` an unlaunched `EagerThunk`"))
     end
-    return fetch(t.future; raw)
+    stream = task_to_stream(t.uid)
+    if stream !== nothing
+        add_waiters!(stream, [0])
+    end
+    try
+        return fetch(t.future; raw)
+    finally
+        if stream !== nothing
+            remove_waiters!(stream, [0])
+        end
+    end
 end
 function Base.show(io::IO, t::EagerThunk)
     status = if isdefined(t, :thunk_ref)

src/sch/eager.jl

Lines changed: 7 additions & 0 deletions

@@ -116,6 +116,13 @@ function eager_cleanup(state, uid)
         # N.B. cache and errored expire automatically
         delete!(state.thunk_dict, tid)
     end
+    remotecall_wait(1, uid) do uid
+        lock(EAGER_THUNK_STREAMS) do global_streams
+            if haskey(global_streams, uid)
+                delete!(global_streams, uid)
+            end
+        end
+    end
 end

 function _find_thunk(e::Dagger.EagerThunk)
