Skip to content

Commit c94327c

Browse files
author
José Valim
committed
Keep Task.async_stream backwards compatible
1 parent 0193873 commit c94327c

File tree

3 files changed

+17
-17
lines changed

3 files changed

+17
-17
lines changed

lib/elixir/lib/task/supervised.ex

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,8 @@ defmodule Task.Supervised do
210210
# this response when the replying task dies (we'll notice in the :down
211211
# message).
212212
{{^monitor_ref, position}, reply} ->
213-
%{^position => {pid, :running, element}} = waiting
214-
waiting = Map.put(waiting, position, {pid, {:ok, reply}, element})
213+
%{^position => {pid, :running}} = waiting
214+
waiting = Map.put(waiting, position, {pid, {:ok, reply}})
215215
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config)
216216

217217
# The task at position "position" died for some reason. We check if it
@@ -223,20 +223,20 @@ defmodule Task.Supervised do
223223
case waiting do
224224
# If the task replied, we don't care whether it went down for timeout
225225
# or for normal reasons.
226-
%{^position => {_, {:ok, _} = ok, _}} ->
226+
%{^position => {_, {:ok, _} = ok}} ->
227227
ok
228228
# If the task exited by itself before replying, we emit {:exit, reason}.
229-
%{^position => {_, :running, element}} when kind == :down ->
230-
{:exit, reason, element}
229+
%{^position => {_, :running}} when kind == :down ->
230+
{:exit, reason}
231231
# If the task timed out before replying, we either exit (on_timeout: :exit)
232232
# or emit {:exit, :timeout} (on_timeout: :kill_task) (note the task is already
233233
# dead at this point).
234-
%{^position => {_, :running, element}} when kind == :timed_out ->
234+
%{^position => {_, :running}} when kind == :timed_out ->
235235
if on_timeout == :exit do
236236
stream_cleanup_inbox(monitor_pid, monitor_ref)
237-
exit({:timeout, {__MODULE__, :stream, [element, timeout]}})
237+
exit({:timeout, {__MODULE__, :stream, [timeout]}})
238238
else
239-
{:exit, :timeout, element}
239+
{:exit, :timeout}
240240
end
241241
end
242242

@@ -364,7 +364,7 @@ defmodule Task.Supervised do
364364
receive do
365365
{:spawned, {^monitor_ref, ^spawned}, pid} ->
366366
send(pid, {self(), {monitor_ref, spawned}})
367-
Map.put(waiting, spawned, {pid, :running, value})
367+
Map.put(waiting, spawned, {pid, :running})
368368
{:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
369369
stream_cleanup_inbox(monitor_pid, monitor_ref)
370370
exit({reason, {__MODULE__, :stream, [timeout]}})

lib/elixir/test/elixir/task/supervisor_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ defmodule Task.SupervisorTest do
221221
assert supervisor
222222
|> Task.Supervisor.async_stream(1..4, &exit(Integer.to_string(&1)), @opts)
223223
|> Enum.to_list ==
224-
[{:exit, "1", 1}, {:exit, "2", 2}, {:exit, "3", 3}, {:exit, "4", 4}]
224+
[exit: "1", exit: "2", exit: "3", exit: "4"]
225225
end
226226

227227
test "shuts down unused tasks", %{supervisor: supervisor} do
@@ -277,7 +277,7 @@ defmodule Task.SupervisorTest do
277277
assert supervisor
278278
|> Task.Supervisor.async_stream_nolink(1..4, &exit/1, @opts)
279279
|> Enum.to_list ==
280-
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
280+
[exit: 1, exit: 2, exit: 3, exit: 4]
281281
end
282282

283283
test "shuts down unused tasks", %{supervisor: supervisor} do

lib/elixir/test/elixir/task_test.exs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ defmodule TaskTest do
492492
describe "async_stream/2" do
493493
test "timeout" do
494494
assert catch_exit([:infinity] |> Task.async_stream(&sleep/1, [timeout: 0]) |> Enum.to_list) ==
495-
{:timeout, {Task.Supervised, :stream, [:infinity, 0]}}
495+
{:timeout, {Task.Supervised, :stream, [0]}}
496496
refute_received _
497497
end
498498

@@ -509,7 +509,7 @@ defmodule TaskTest do
509509
test "streams an enumerable with ordered: false, on_timeout: :kill_task" do
510510
opts = [max_concurrency: 4, ordered: false, on_timeout: :kill_task, timeout: 50]
511511
assert [100, 1, 100, 1] |> Task.async_stream(&sleep/1, opts) |> Enum.to_list() ==
512-
[{:ok, 1}, {:ok, 1}, {:exit, :timeout, 100}, {:exit, :timeout, 100}]
512+
[ok: 1, ok: 1, exit: :timeout, exit: :timeout]
513513
refute_received _
514514
end
515515
end
@@ -543,7 +543,7 @@ defmodule TaskTest do
543543
test "streams an enumerable with exits" do
544544
Process.flag(:trap_exit, true)
545545
assert 1..4 |> Task.async_stream(&exit/1, @opts) |> Enum.to_list ==
546-
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
546+
[exit: 1, exit: 2, exit: 3, exit: 4]
547547
refute_received {:EXIT, _, _}
548548
end
549549

@@ -586,7 +586,7 @@ defmodule TaskTest do
586586
test "with inner halt on failure" do
587587
Process.flag(:trap_exit, true)
588588
assert 1..8 |> Stream.take(4) |> Task.async_stream(&exit/1, @opts) |> Enum.to_list ==
589-
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
589+
[exit: 1, exit: 2, exit: 3, exit: 4]
590590
end
591591

592592
test "with inner halt and slowest first" do
@@ -602,7 +602,7 @@ defmodule TaskTest do
602602
test "with outer halt on failure" do
603603
Process.flag(:trap_exit, true)
604604
assert 1..8 |> Task.async_stream(&exit/1, @opts) |> Enum.take(4) ==
605-
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
605+
[exit: 1, exit: 2, exit: 3, exit: 4]
606606
end
607607

608608
test "with outer halt and slowest first" do
@@ -639,7 +639,7 @@ defmodule TaskTest do
639639
test "with :on_timeout set to :kill_task" do
640640
opts = Keyword.merge(@opts, on_timeout: :kill_task, timeout: 50)
641641
assert [100, 1, 100, 1] |> Task.async_stream(&sleep/1, opts) |> Enum.to_list() ==
642-
[{:exit, :timeout, 100}, {:ok, 1}, {:exit, :timeout, 100}, {:ok, 1}]
642+
[exit: :timeout, ok: 1, exit: :timeout, ok: 1]
643643
refute_received _
644644
end
645645
end

0 commit comments

Comments
 (0)