Skip to content

Commit 1aed585

Browse files
jpsamarooJamesWrigley
authored andcommitted
streaming: Add tests
1 parent 0b815c4 commit 1aed585

File tree

2 files changed

+95
-1
lines changed

2 files changed

+95
-1
lines changed

test/runtests.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ tests = [
1010
("Mutation", "mutation.jl"),
1111
("Task Queues", "task-queues.jl"),
1212
("Datadeps", "datadeps.jl"),
13+
("Streaming", "streaming.jl"),
1314
("Domain Utilities", "domain.jl"),
1415
("Array - Allocation", "array/allocation.jl"),
1516
("Array - Indexing", "array/indexing.jl"),
@@ -81,7 +82,6 @@ else
8182
@info "Running all tests"
8283
end
8384

84-
8585
using Distributed
8686
if additional_workers > 0
8787
# We put this inside a branch because addprocs() takes a minimum of 1s to

test/streaming.jl

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
@everywhere ENV["JULIA_DEBUG"] = "Dagger"
2+
3+
@everywhere function rand_finite()
4+
x = rand()
5+
if x < 0.1
6+
return Dagger.finish_stream(x)
7+
end
8+
return x
9+
end
10+
function catch_interrupt(f)
11+
try
12+
f()
13+
catch err
14+
if err isa Dagger.ThunkFailedException && err.ex isa InterruptException
15+
return
16+
elseif err isa Dagger.Sch.SchedulingException
17+
return
18+
end
19+
rethrow(err)
20+
end
21+
end
22+
function test_finishes(f, message::String; ignore_timeout=false)
23+
t = @eval Threads.@spawn @testset $message catch_interrupt($f)
24+
if timedwait(()->istaskdone(t), 10) == :timed_out
25+
if !ignore_timeout
26+
@warn "Testing task timed out: $message"
27+
end
28+
Dagger.cancel!(;halt_sch=true, force=true)
29+
fetch(Dagger.@spawn 1+1)
30+
return false
31+
end
32+
return true
33+
end
34+
@testset "Basics" begin
35+
@test test_finishes("Single task") do
36+
local x
37+
Dagger.spawn_streaming() do
38+
x = Dagger.@spawn rand_finite()
39+
end
40+
@test fetch(x) === nothing
41+
end
42+
43+
@test !test_finishes("Single task running forever"; ignore_timeout=true) do
44+
local x
45+
Dagger.spawn_streaming() do
46+
x = Dagger.spawn() do
47+
y = rand()
48+
sleep(1)
49+
return y
50+
end
51+
end
52+
fetch(x)
53+
end
54+
55+
@test test_finishes("Two tasks (sequential)") do
56+
local x, y
57+
@warn "\n\n\nStart streaming\n\n\n"
58+
Dagger.spawn_streaming() do
59+
x = Dagger.@spawn rand_finite()
60+
y = Dagger.@spawn x+1
61+
end
62+
@test fetch(x) === nothing
63+
@test_throws Dagger.ThunkFailedException fetch(y)
64+
end
65+
66+
# TODO: Two tasks (parallel)
67+
68+
# TODO: Three tasks (2 -> 1) and (1 -> 2)
69+
# TODO: Four tasks (diamond)
70+
71+
# TODO: With pass-through/Without result
72+
# TODO: With pass-through/With result
73+
# TODO: Without pass-through/Without result
74+
75+
@test test_finishes("Without pass-through/With result") do
76+
local x
77+
Dagger.spawn_streaming() do
78+
x = Dagger.spawn() do
79+
x = rand()
80+
if x < 0.1
81+
return Dagger.finish_stream(x; result=123)
82+
end
83+
return x
84+
end
85+
end
86+
@test fetch(x) == 123
87+
end
88+
end
89+
# TODO: Custom stream buffers/buffer amounts
90+
# TODO: Cross-worker streaming
91+
# TODO: Different stream element types (immutable and mutable)
92+
93+
# TODO: Zero-allocation examples
94+
# FIXME: Streaming across threads

0 commit comments

Comments
 (0)