Skip to content

Commit ff90cd1

Browse files
committed
Ensure inner halt in Stream.transform does not override user intention, closes #14278
1 parent 8f6cb5d commit ff90cd1

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

lib/elixir/lib/stream.ex

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,17 +1067,17 @@ defmodule Stream do
10671067
{_, _, _, _, after_fun} = funs
10681068

10691069
try do
1070-
reduce.({op, [:outer | inner_acc]})
1070+
reduce.({op, [:cont | inner_acc]})
10711071
catch
10721072
kind, reason ->
10731073
next.({:halt, []})
10741074
after_fun.(user_acc)
10751075
:erlang.raise(kind, reason, __STACKTRACE__)
10761076
else
1077-
# Only take into account outer halts when the op is not halt itself.
1078-
# Otherwise, we were the ones wishing to halt, so we should just stop.
1079-
{:halted, [:outer | acc]} when op != :halt ->
1080-
do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)
1077+
# The user wanted to cont/suspend but the stream halted,
1078+
# so we continue with the user intention.
1079+
{:halted, [inner_op | acc]} when op != :halt and inner_op != :halt ->
1080+
do_transform_user(vals, user_acc, next_op, next, {inner_op, acc}, funs)
10811081

10821082
{:halted, [_ | acc]} ->
10831083
next.({:halt, []})
@@ -1093,10 +1093,9 @@ defmodule Stream do
10931093
end
10941094
end
10951095

1096-
defp do_transform_each(x, [:outer | acc], f) do
1096+
defp do_transform_each(x, [:cont | acc], f) do
10971097
case f.(x, acc) do
1098-
{:halt, res} -> {:halt, [:inner | res]}
1099-
{op, res} -> {op, [:outer | res]}
1098+
{op, res} -> {op, [op | res]}
11001099
end
11011100
end
11021101

@@ -1632,17 +1631,14 @@ defmodule Stream do
16321631

16331632
defp do_enum_resource(next_acc, next_fun, {op, acc}, fun, after_fun, reduce) do
16341633
try do
1635-
reduce.({op, [:outer | acc]})
1634+
reduce.({op, [:cont | acc]})
16361635
catch
16371636
kind, reason ->
16381637
after_fun.(next_acc)
16391638
:erlang.raise(kind, reason, __STACKTRACE__)
16401639
else
1641-
{:halted, [:outer | acc]} ->
1642-
do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun)
1643-
1644-
{:halted, [:inner | acc]} ->
1645-
do_resource(next_acc, next_fun, {:halt, acc}, fun, after_fun)
1640+
{:halted, [inner_op | acc]} ->
1641+
do_resource(next_acc, next_fun, {inner_op, acc}, fun, after_fun)
16461642

16471643
{:done, [_ | acc]} ->
16481644
do_resource(next_acc, next_fun, {:cont, acc}, fun, after_fun)
@@ -1652,10 +1648,9 @@ defmodule Stream do
16521648
end
16531649
end
16541650

1655-
defp do_resource_each(x, [:outer | acc], f) do
1651+
defp do_resource_each(x, [:cont | acc], f) do
16561652
case f.(x, acc) do
1657-
{:halt, res} -> {:halt, [:inner | res]}
1658-
{op, res} -> {op, [:outer | res]}
1653+
{op, res} -> {op, [op | res]}
16591654
end
16601655
end
16611656

lib/elixir/test/elixir/task_test.exs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,16 @@ defmodule TaskTest do
930930
assert_receive 2
931931
assert_receive 3
932932
end
933+
934+
test "wrapping a flat_map/concat with a haltable stream" do
935+
result =
936+
Stream.take([:foo, :bar], 1)
937+
|> Stream.concat([1, 2])
938+
|> Task.async_stream(& &1)
939+
|> Enum.to_list()
940+
941+
assert result == [ok: :foo, ok: 1, ok: 2]
942+
end
933943
end
934944

935945
for {desc, concurrency} <- [==: 4, <: 2, >: 8] do

0 commit comments

Comments
 (0)