Skip to content

Commit 149adb5

Browse files
committed
Streaming tests cleanup and fixes
- Added some whitespace. - Deleted the unused `rand_finite()` methods. - Allow passing the `timeout` to `test_finishes()` - Fix bug in one of the tests where we weren't waiting for all the tasks to finish, which would occasionally cause test failures because of the race condition.
1 parent e6a504d commit 149adb5

File tree

1 file changed

+9
-18
lines changed

1 file changed

+9
-18
lines changed

test/streaming.jl

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,3 @@
1-
@everywhere function rand_finite(T=Float64)
2-
x = rand(T)
3-
if rand() < 0.1
4-
return Dagger.finish_stream(x)
5-
end
6-
return x
7-
end
8-
@everywhere function rand_finite_returns(T=Float64)
9-
x = rand(T)
10-
if rand() < 0.1
11-
return Dagger.finish_stream(x; result=x)
12-
end
13-
return x
14-
end
15-
161
const ACCUMULATOR = Dict{Int,Vector{Real}}()
172
@everywhere function accumulator(x=0)
183
tid = Dagger.task_id()
@@ -37,12 +22,14 @@ function catch_interrupt(f)
3722
rethrow(err)
3823
end
3924
end
25+
4026
function merge_testset!(inner::Test.DefaultTestSet)
4127
outer = Test.get_testset()
4228
append!(outer.results, inner.results)
4329
outer.n_passed += inner.n_passed
4430
end
45-
function test_finishes(f, message::String; ignore_timeout=false, max_evals=10)
31+
32+
function test_finishes(f, message::String; timeout=10, ignore_timeout=false, max_evals=10)
4633
t = @eval Threads.@spawn begin
4734
tset = nothing
4835
try
@@ -61,7 +48,8 @@ function test_finishes(f, message::String; ignore_timeout=false, max_evals=10)
6148
end
6249
return tset
6350
end
64-
timed_out = timedwait(()->istaskdone(t), 10) == :timed_out
51+
52+
timed_out = timedwait(()->istaskdone(t), timeout) == :timed_out
6553
if timed_out
6654
if !ignore_timeout
6755
@warn "Testing task timed out: $message"
@@ -70,6 +58,7 @@ function test_finishes(f, message::String; ignore_timeout=false, max_evals=10)
7058
@everywhere GC.gc()
7159
fetch(Dagger.@spawn 1+1)
7260
end
61+
7362
tset = fetch(t)::Test.DefaultTestSet
7463
merge_testset!(tset)
7564
return !timed_out
@@ -176,6 +165,7 @@ for idx in 1:5
176165
@test length(values[A_tid]) == 1
177166
@test all(v -> 0 <= v <= 10, values[A_tid])
178167
end
168+
179169
@test test_finishes("x -> (A, B)") do
180170
local x, A, B
181171
Dagger.spawn_streaming() do
@@ -324,7 +314,8 @@ for idx in 1:5
324314
@test fetch(y) === nothing
325315
@test fetch(z) === nothing
326316
@test fetch(A) === nothing
327-
@test fetch(A) === nothing
317+
@test fetch(B) === nothing
318+
328319
values = copy(ACCUMULATOR); empty!(ACCUMULATOR)
329320
A_tid = Dagger.task_id(A)
330321
@test length(values[A_tid]) == 10

0 commit comments

Comments
 (0)