Skip to content

Commit 69e7e4c

Browse files
committed
Fix Stream.concat/2 bug when used in async_stream
Close #14277
1 parent 8f6cb5d commit 69e7e4c

File tree

2 files changed

+27
-8
lines changed

2 files changed

+27
-8
lines changed

lib/elixir/lib/task/supervised.ex

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -377,16 +377,25 @@ defmodule Task.Supervised do
377377
stream_close(config)
378378
:erlang.raise(kind, reason, __STACKTRACE__)
379379
else
380-
{:suspended, [value], next} ->
381-
waiting = stream_spawn(value, spawned, waiting, config)
382-
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, next, config)
383-
384-
{_, [value]} ->
385-
waiting = stream_spawn(value, spawned, waiting, config)
386-
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, :done, config)
387-
388380
{_, []} ->
389381
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, :done, config)
382+
383+
result ->
384+
{values, next} =
385+
case result do
386+
{:suspended, values = [_ | _], next} -> {values, next}
387+
{_, values = [_ | _]} -> {values, :done}
388+
end
389+
390+
# right fold because values are in reverse order
391+
{waiting, spawned} =
392+
List.foldr(values, {waiting, spawned}, fn value, {waiting, spawned} ->
393+
waiting = stream_spawn(value, spawned, waiting, config)
394+
{waiting, spawned + 1}
395+
end)
396+
397+
max = max - length(values)
398+
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config)
390399
end
391400
end
392401

lib/elixir/test/elixir/task_test.exs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,16 @@ defmodule TaskTest do
904904
|> Enum.to_list() == [ok: :ok]
905905
end
906906

907+
test "stream concatenation edge case" do
908+
result =
909+
Stream.take([:foo, :bar], 1)
910+
|> Stream.concat([1, 2])
911+
|> Task.async_stream(& &1)
912+
|> Enum.to_list()
913+
914+
assert result == [ok: :foo, ok: 1, ok: 2]
915+
end
916+
907917
test "with $callers" do
908918
grandparent = self()
909919

0 commit comments

Comments
 (0)