Skip to content

Commit c361e0a

Browse files
author
José Valim
committed
Ensure suspensions are closed on failures
1 parent a736b55 commit c361e0a

File tree

2 files changed

+96
-15
lines changed

2 files changed

+96
-15
lines changed

lib/elixir/lib/stream.ex

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,22 @@ defmodule Stream do
109109
last && { last, fun }, :lists.reverse(accs), acc)
110110
end
111111

112+
defp do_each(reduce, [], last, accs, { command, acc }) do
113+
handle_reduce reduce.({ command, [acc|accs] }), [], last
114+
end
115+
112116
defp do_each(reduce, after_funs, last, accs, { command, acc }) do
113-
case reduce.({ command, [acc|accs] }) do
117+
try do
118+
handle_reduce reduce.({ command, [acc|accs] }), after_funs, last
119+
catch
120+
kind, reason ->
121+
lc fun inlist after_funs, do: fun.()
122+
:erlang.raise(kind, reason, :erlang.get_stacktrace)
123+
end
124+
end
125+
126+
defp handle_reduce(res, after_funs, last) do
127+
case res do
114128
{ :suspended, [acc|accs], continuation } ->
115129
{ :suspended, acc, &do_each(continuation, after_funs, last, accs, &1) }
116130
{ :halted, [acc|_] } ->
@@ -456,8 +470,7 @@ defmodule Stream do
456470
defp do_flat_map(next_acc, next, mapper, acc, fun) do
457471
case next.({ :cont, next_acc }) do
458472
{ :suspended, [val|next_acc], next } ->
459-
enum = mapper.(val)
460-
do_flat_map(next_acc, next, mapper, acc, fun, &Enumerable.reduce(enum, &1, fun))
473+
do_flat_map(next_acc, next, mapper, acc, fun, &Enumerable.reduce(mapper.(val), &1, fun))
461474
{ reason, _ } ->
462475
{ reason, elem(acc, 1) }
463476
end
@@ -468,9 +481,11 @@ defmodule Stream do
468481
reduce.(acc)
469482
catch
470483
{ :stream_flat_map, h } ->
471-
reduce.({ :halt, elem(acc, 1) })
472484
next.({ :halt, next_acc })
473485
{ :halted, h }
486+
kind, reason ->
487+
next.({ :halt, next_acc })
488+
:erlang.raise(kind, reason, :erlang.get_stacktrace)
474489
else
475490
{ _, acc } -> do_flat_map(next_acc, next, mapper, { :cont, acc }, fun)
476491
{ :suspended, acc, c } -> { :suspended, acc, &do_flat_map(next_acc, next, mapper, &1, fun, c) }
@@ -734,7 +749,18 @@ defmodule Stream do
734749
end
735750

736751
defp do_zip(zips, { :cont, acc }, callback) do
737-
do_zip(zips, acc, callback, [], [])
752+
try do
753+
do_zip(zips, acc, callback, [], [])
754+
catch
755+
kind, reason ->
756+
do_zip_close(zips)
757+
:erlang.raise(kind, reason, :erlang.get_stacktrace)
758+
else
759+
{ :next, buffer, acc } ->
760+
do_zip(buffer, acc, callback)
761+
{ :done, _ } = o ->
762+
o
763+
end
738764
end
739765

740766
defp do_zip([{ fun, fun_acc }|t], acc, callback, list, buffer) do
@@ -749,7 +775,7 @@ defmodule Stream do
749775

750776
defp do_zip([], acc, callback, list, buffer) do
751777
zipped = list_to_tuple(:lists.reverse(list))
752-
do_zip(:lists.reverse(buffer), callback.(zipped, acc), callback)
778+
{ :next, :lists.reverse(buffer), callback.(zipped, acc) }
753779
end
754780

755781
defp do_zip_close([]), do: :ok
@@ -892,7 +918,10 @@ defmodule Stream do
892918

893919
defp do_resource(next_acc, next_fun, { :cont, acc }, fun, after_fun) do
894920
try do
895-
next_fun.(next_acc)
921+
case next_fun.(next_acc) do
922+
nil -> nil
923+
{ v, next_acc } -> { fun.(v, acc), next_acc }
924+
end
896925
catch
897926
kind, reason ->
898927
after_fun.(next_acc)
@@ -901,8 +930,8 @@ defmodule Stream do
901930
nil ->
902931
after_fun.(next_acc)
903932
{ :done, acc }
904-
{ v, next_acc } ->
905-
do_resource(next_acc, next_fun, fun.(v, acc), fun, after_fun)
933+
{ acc, next_acc } ->
934+
do_resource(next_acc, next_fun, acc, fun, after_fun)
906935
end
907936
end
908937

lib/elixir/test/elixir/stream_test.exs

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,26 @@ defmodule StreamTest do
2626
end
2727

2828
test "after" do
29-
stream = Stream.after([1,2,3], fn -> Process.put(:after, true) end)
29+
stream = Stream.after([1,2,3], fn -> Process.put(:stream_after, true) end)
3030

3131
# Done
32-
Process.put(:after, false)
32+
Process.put(:stream_after, false)
3333
assert Enum.to_list(stream) == [1,2,3]
34-
assert Process.get(:after)
34+
assert Process.get(:stream_after)
3535

3636
# Halted
37-
Process.put(:after, false)
37+
Process.put(:stream_after, false)
3838
assert Enum.take(stream, 1) == [1]
39-
assert Process.get(:after)
39+
assert Process.get(:stream_after)
40+
end
41+
42+
test "after closes on errors" do
43+
stream = Stream.after([1,2,3], fn -> Process.put(:stream_after, true) end)
44+
45+
Process.put(:stream_after, false)
46+
stream = Stream.map(stream, fn x -> if x > 2, do: throw(:error), else: x end)
47+
assert catch_throw(Enum.to_list(stream)) == :error
48+
assert Process.get(:stream_after)
4049
end
4150

4251
test "chunk" do
@@ -256,7 +265,7 @@ defmodule StreamTest do
256265
assert Enum.zip(list, list) == Enum.zip(stream, stream)
257266
end
258267

259-
test "flat_map does not leave stream suspended" do
268+
test "flat_map does not leave inner stream suspended" do
260269
stream = Stream.flat_map [1,2,3],
261270
fn i ->
262271
Stream.resource(fn -> i end,
@@ -267,7 +276,9 @@ defmodule StreamTest do
267276
Process.put(:stream_flat_map, false)
268277
assert stream |> Enum.take(3) == [1,2,3]
269278
assert Process.get(:stream_flat_map)
279+
end
270280

281+
test "flat_map does not leave outer stream suspended" do
271282
stream = Stream.resource(fn -> 1 end,
272283
fn acc -> { acc, acc + 1 } end,
273284
fn _ -> Process.put(:stream_flat_map, true) end)
@@ -278,6 +289,17 @@ defmodule StreamTest do
278289
assert Process.get(:stream_flat_map)
279290
end
280291

292+
test "flat_map closes on error" do
293+
stream = Stream.resource(fn -> 1 end,
294+
fn acc -> { acc, acc + 1 } end,
295+
fn _ -> Process.put(:stream_flat_map, true) end)
296+
stream = Stream.flat_map(stream, fn _ -> throw(:error) end)
297+
298+
Process.put(:stream_flat_map, false)
299+
assert catch_throw(Enum.to_list(stream)) == :error
300+
assert Process.get(:stream_flat_map)
301+
end
302+
281303
test "iterate" do
282304
stream = Stream.iterate(0, &(&1+2))
283305
assert Enum.take(stream, 5) == [0,2,4,6,8]
@@ -402,6 +424,17 @@ defmodule StreamTest do
402424
assert Enum.to_list(stream) == [5, 4, 3, 2, 1]
403425
end
404426

427+
test "resource closes on errors" do
428+
stream = Stream.resource(fn -> 1 end,
429+
fn acc -> { acc, acc + 1 } end,
430+
fn _ -> Process.put(:stream_resource, true) end)
431+
432+
Process.put(:stream_resource, false)
433+
stream = Stream.map(stream, fn x -> if x > 2, do: throw(:error), else: x end)
434+
assert catch_throw(Enum.to_list(stream)) == :error
435+
assert Process.get(:stream_resource)
436+
end
437+
405438
test "resource is zippable" do
406439
# File.stream! uses Stream.resource underneath
407440
stream = File.stream!(__FILE__)
@@ -479,6 +512,25 @@ defmodule StreamTest do
479512
assert Process.get(:stream_zip) == :done
480513
end
481514

515+
test "zip/2 closes on inner error" do
516+
stream = Stream.after([1, 2, 3], fn -> Process.put(:stream_zip, true) end)
517+
stream = Stream.zip(stream, Stream.map([:a, :b, :c], fn _ -> throw(:error) end))
518+
519+
Process.put(:stream_zip, false)
520+
assert catch_throw(Enum.to_list(stream)) == :error
521+
assert Process.get(:stream_zip)
522+
end
523+
524+
test "zip/2 closes on outer error" do
525+
stream = Stream.after([1, 2, 3], fn -> Process.put(:stream_zip, true) end)
526+
|> Stream.zip([:a, :b, :c])
527+
|> Stream.map(fn _ -> throw(:error) end)
528+
529+
Process.put(:stream_zip, false)
530+
assert catch_throw(Enum.to_list(stream)) == :error
531+
assert Process.get(:stream_zip)
532+
end
533+
482534
test "with_index" do
483535
stream = Stream.with_index([1,2,3])
484536
assert is_lazy(stream)

0 commit comments

Comments
 (0)