Skip to content

Commit f2d8064

Browse files
committed
Do not halt streams twice in Stream.transform/5, closes #13944
1 parent 4a97256 commit f2d8064

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

lib/elixir/lib/stream.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,8 @@ defmodule Stream do
960960
do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs)
961961

962962
{_, vals} ->
963+
# Do not attempt to call the resource again, it has either done or halted
964+
next = fn _ -> {:done, []} end
963965
do_transform_user(:lists.reverse(vals), user_acc, :last, next, inner_acc, funs)
964966
end
965967
end
@@ -972,7 +974,6 @@ defmodule Stream do
972974
last_fun.(user_acc)
973975
catch
974976
kind, reason ->
975-
next.({:halt, []})
976977
after_fun.(user_acc)
977978
:erlang.raise(kind, reason, __STACKTRACE__)
978979
else

lib/elixir/test/elixir/stream_test.exs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,38 @@ defmodule StreamTest do
10461046
assert Process.get(__MODULE__) == 10
10471047
end
10481048

1049+
test "transform/5 does not halt twice" do
1050+
resource_start = fn -> 0 end
1051+
1052+
resource_next = fn current ->
1053+
if current < 5 do
1054+
{[current], current + 1}
1055+
else
1056+
{:halt, current}
1057+
end
1058+
end
1059+
1060+
resource_after = fn _ ->
1061+
send(self(), {:halted, :resource})
1062+
end
1063+
1064+
transform_next = fn current, index -> {[current + 1], index} end
1065+
transform_last = fn index -> {:halt, index} end
1066+
1067+
transform_after = fn _ ->
1068+
send(self(), {:halted, :transform})
1069+
end
1070+
1071+
Stream.resource(resource_start, resource_next, resource_after)
1072+
|> Stream.transform(fn -> 1 end, transform_next, transform_last, transform_after)
1073+
|> Stream.run()
1074+
1075+
assert_received {:halted, :resource}
1076+
assert_received {:halted, :transform}
1077+
refute_received {:halted, :resource}
1078+
refute_received {:halted, :transform}
1079+
end
1080+
10491081
test "scan/2" do
10501082
stream = Stream.scan(1..5, &(&1 + &2))
10511083
assert lazy?(stream)

0 commit comments

Comments
 (0)