Skip to content

Commit 38f716c

Browse files
author
José Valim
committed
Ensure suspending flat map does not consume items, closes #3751
1 parent e0c004f commit 38f716c

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

lib/elixir/lib/stream.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,15 @@ defmodule Stream do
687687
do_transform(user_acc.(), user, fun, [], next, inner_acc, inner, after_fun)
688688
end
689689

690+
defp do_transform(user_acc, _user, _fun, _next_acc, _next, {:halt, inner_acc}, _inner, after_fun) do
691+
do_after(after_fun, user_acc)
692+
{:halted, inner_acc}
693+
end
694+
695+
defp do_transform(user_acc, user, fun, next_acc, next, {:suspend, inner_acc}, inner, after_fun) do
696+
{:suspended, inner_acc, &do_transform(user_acc, user, fun, next_acc, next, &1, inner, after_fun)}
697+
end
698+
690699
defp do_transform(user_acc, user, fun, next_acc, next, inner_acc, inner, after_fun) do
691700
case next.({:cont, next_acc}) do
692701
{:suspended, [val|next_acc], next} ->

lib/elixir/test/elixir/stream_test.exs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,22 @@ defmodule StreamTest do
416416
assert Stream.transform(nats, 0, &{[&1, &2], &1 + &2}) |> Enum.take(6) == [1, 0, 2, 1, 3, 3]
417417
end
418418

419+
test "transform/3 with early halt" do
420+
stream = Stream.repeatedly(fn -> throw(:error) end)
421+
|> Stream.transform(nil, &{[&1, &2], &1})
422+
423+
assert {:halted, nil} =
424+
Enumerable.reduce(stream, {:halt, nil}, fn _, _ -> throw(:error) end)
425+
end
426+
427+
test "transform/3 with early suspend" do
428+
stream = Stream.repeatedly(fn -> throw(:error) end)
429+
|> Stream.transform(nil, &{[&1, &2], &1})
430+
431+
assert {:suspended, nil, _} =
432+
Enumerable.reduce(stream, {:suspend, nil}, fn _, _ -> throw(:error) end)
433+
end
434+
419435
test "transform/3 with halt" do
420436
stream = Stream.resource(fn -> 1 end,
421437
fn acc -> {[acc], acc + 1} end,
@@ -570,6 +586,27 @@ defmodule StreamTest do
570586
assert Process.get(:stream_transform)
571587
end
572588

589+
test "transform/4 with early halt" do
590+
stream = Stream.repeatedly(fn -> throw(:error) end)
591+
|> Stream.transform(fn -> nil end, &{[&1, &2], &1},
592+
fn nil -> Process.put(:stream_transform, true) end)
593+
594+
Process.put(:stream_transform, false)
595+
assert {:halted, nil} =
596+
Enumerable.reduce(stream, {:halt, nil}, fn _, _ -> throw(:error) end)
597+
assert Process.get(:stream_transform)
598+
end
599+
600+
test "transform/4 with early suspend" do
601+
stream = Stream.repeatedly(fn -> throw(:error) end)
602+
|> Stream.transform(fn -> nil end, &{[&1, &2], &1},
603+
fn nil -> Process.put(:stream_transform, true) end)
604+
605+
refute Process.get(:stream_transform)
606+
assert {:suspended, nil, _} =
607+
Enumerable.reduce(stream, {:suspend, nil}, fn _, _ -> throw(:error) end)
608+
end
609+
573610
test "transform/4 closes on outer errors" do
574611
stream = Stream.transform(1..10, fn -> 0 end,
575612
fn 3, _ -> throw(:error)

0 commit comments

Comments
 (0)