Skip to content

Commit 6c1fe08

Browse files
ericmjjosevalim
authored andcommitted
Add :ordered option to async_stream (#6234)
1 parent b18160f commit 6c1fe08

File tree

5 files changed

+87
-30
lines changed

5 files changed

+87
-30
lines changed

lib/elixir/lib/task.ex

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,10 @@ defmodule Task do
344344
terminates the current process and a failure in the current process
345345
terminates all tasks.
346346
347-
When streamed, each task will emit `{:ok, val}` upon successful
348-
completion or `{:exit, val}` if the caller is trapping exits. Results
349-
are emitted in the same order as the original `enumerable`.
347+
When streamed, each task will emit `{:ok, value}` upon successful
348+
completion or `{:exit, reason, element}` if the caller is trapping
349+
exits, where `element` is the stream element. Results are emitted
350+
in the same order as the original `enumerable`.
350351
351352
The level of concurrency can be controlled via the `:max_concurrency`
352353
option and defaults to `System.schedulers_online/0`. A timeout
@@ -362,13 +363,18 @@ defmodule Task do
362363
363364
* `:max_concurrency` - sets the maximum number of tasks to run
364365
at the same time. Defaults to `System.schedulers_online/0`.
366+
* `:ordered` - whether the results should be returned in the same order
367+
as the input stream. This option is useful when you have large
368+
streams and don't want to buffer results before they are delivered.
369+
Defaults to `true`.
365370
* `:timeout` - the maximum amount of time (in milliseconds) each
366371
task is allowed to execute for. Defaults to `5000`.
367372
* `:on_timeout` - what do to when a task times out. The possible
368373
values are:
369374
* `:exit` (default) - the process that spawned the tasks exits.
370375
* `:kill_task` - the task that timed out is killed. The value
371-
emitted for that task is `{:exit, :timeout}`.
376+
emitted for that task is `{:exit, :timeout, element}`, where
377+
`element` is the element it timed out on.
372378
373379
## Example
374380

lib/elixir/lib/task/supervised.ex

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ defmodule Task.Supervised do
145145
def stream(enumerable, acc, reducer, mfa, options, spawn) do
146146
next = &Enumerable.reduce(enumerable, &1, fn x, acc -> {:suspend, [x | acc]} end)
147147
max_concurrency = Keyword.get(options, :max_concurrency, System.schedulers_online)
148+
ordered? = Keyword.get(options, :ordered, true)
148149
timeout = Keyword.get(options, :timeout, 5000)
149150
on_timeout = Keyword.get(options, :on_timeout, :exit)
150151
parent = self()
@@ -168,6 +169,7 @@ defmodule Task.Supervised do
168169
reducer: reducer,
169170
monitor_pid: monitor_pid,
170171
monitor_ref: monitor_ref,
172+
ordered: ordered?,
171173
timeout: timeout,
172174
on_timeout: on_timeout,
173175
}
@@ -200,44 +202,51 @@ defmodule Task.Supervised do
200202
when max == 0
201203
when next == :done do
202204
%{monitor_pid: monitor_pid, monitor_ref: monitor_ref,
203-
timeout: timeout, on_timeout: on_timeout} = config
205+
timeout: timeout, on_timeout: on_timeout, ordered: ordered?} = config
204206

205207
receive do
206208
# The task at position "position" replied with "value". We put the
207209
# response in the "waiting" map and do nothing, since we'll only act on
208210
# this response when the replying task dies (we'll notice in the :down
209211
# message).
210-
{{^monitor_ref, position}, value} ->
211-
%{^position => {pid, :running}} = waiting
212-
waiting = Map.put(waiting, position, {pid, {:ok, value}})
212+
{{^monitor_ref, position}, reply} ->
213+
%{^position => {pid, :running, element}} = waiting
214+
waiting = Map.put(waiting, position, {pid, {:ok, reply}, element})
213215
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config)
214216

215217
# The task at position "position" died for some reason. We check if it
216218
# replied already (then the death is peaceful) or if it's still running
217219
# (then the reply from this task will be {:exit, reason}). This message is
218220
# sent to us by the monitor process, not by the dying task directly.
219221
{kind, {^monitor_ref, position}, reason} when kind in [:down , :timed_out] ->
220-
waiting =
222+
result =
221223
case waiting do
222224
# If the task replied, we don't care whether it went down for timeout
223225
# or for normal reasons.
224-
%{^position => {_, {:ok, _} = ok}} ->
225-
Map.put(waiting, position, {nil, ok})
226+
%{^position => {_, {:ok, _} = ok, _}} ->
227+
ok
226228
# If the task exited by itself before replying, we emit {:exit, reason}.
227-
%{^position => {_, :running}} when kind == :down ->
228-
Map.put(waiting, position, {nil, {:exit, reason}})
229+
%{^position => {_, :running, element}} when kind == :down ->
230+
{:exit, reason, element}
229231
# If the task timed out before replying, we either exit (on_timeout: :exit)
230232
# or emit {:exit, :timeout} (on_timeout: :kill_task) (note the task is already
231233
# dead at this point).
232-
%{^position => {_, :running}} when kind == :timed_out ->
234+
%{^position => {_, :running, element}} when kind == :timed_out ->
233235
if on_timeout == :exit do
234236
stream_cleanup_inbox(monitor_pid, monitor_ref)
235-
exit({:timeout, {__MODULE__, :stream, [timeout]}})
237+
exit({:timeout, {__MODULE__, :stream, [element, timeout]}})
236238
else
237-
Map.put(waiting, position, {nil, {:exit, :timeout}})
239+
{:exit, :timeout, element}
238240
end
239241
end
240-
stream_deliver({:cont, acc}, max + 1, spawned, delivered, waiting, next, config)
242+
243+
if ordered? do
244+
waiting = Map.put(waiting, position, {:done, result})
245+
stream_deliver({:cont, acc}, max + 1, spawned, delivered, waiting, next, config)
246+
else
247+
pair = deliver_now(result, acc, next, config)
248+
stream_reduce(pair, max + 1, spawned, delivered + 1, waiting, next, config)
249+
end
241250

242251
# The monitor process died. We just cleanup the messages from the monitor
243252
# process and exit.
@@ -269,6 +278,20 @@ defmodule Task.Supervised do
269278
end
270279
end
271280

281+
defp deliver_now(reply, acc, next, config) do
282+
%{reducer: reducer, monitor_pid: monitor_pid,
283+
monitor_ref: monitor_ref, timeout: timeout} = config
284+
try do
285+
reducer.(reply, acc)
286+
catch
287+
kind, reason ->
288+
stacktrace = System.stacktrace()
289+
is_function(next) && next.({:halt, []})
290+
stream_close(monitor_pid, monitor_ref, timeout)
291+
:erlang.raise(kind, reason, stacktrace)
292+
end
293+
end
294+
272295
defp stream_deliver({:suspend, acc}, max, spawned, delivered, waiting, next, config) do
273296
continuation = &stream_deliver(&1, max, spawned, delivered, waiting, next, config)
274297
{:suspended, acc, continuation}
@@ -283,7 +306,7 @@ defmodule Task.Supervised do
283306
monitor_ref: monitor_ref, timeout: timeout} = config
284307

285308
case waiting do
286-
%{^delivered => {nil, reply}} ->
309+
%{^delivered => {:done, reply}} ->
287310
try do
288311
reducer.(reply, acc)
289312
catch
@@ -341,7 +364,7 @@ defmodule Task.Supervised do
341364
receive do
342365
{:spawned, {^monitor_ref, ^spawned}, pid} ->
343366
send(pid, {self(), {monitor_ref, spawned}})
344-
Map.put(waiting, spawned, {pid, :running})
367+
Map.put(waiting, spawned, {pid, :running, value})
345368
{:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
346369
stream_cleanup_inbox(monitor_pid, monitor_ref)
347370
exit({reason, {__MODULE__, :stream, [timeout]}})

lib/elixir/lib/task/supervisor.ex

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ defmodule Task.Supervisor do
135135
own task. The tasks will be spawned under the given `supervisor` and
136136
linked to the current process, similarly to `async/4`.
137137
138-
When streamed, each task will emit `{:ok, val}` upon successful
139-
completion or `{:exit, val}` if the caller is trapping exits. Results
140-
are emitted in the same order as the original `enumerable`.
138+
When streamed, each task will emit `{:ok, value}` upon successful
139+
completion or `{:exit, reason, element}` if the caller is trapping
140+
exits, where `element` is the stream element. Results are emitted
141+
in the same order as the original `enumerable`.
141142
142143
The level of concurrency can be controlled via the `:max_concurrency`
143144
option and defaults to `System.schedulers_online/0`. A timeout
@@ -152,9 +153,19 @@ defmodule Task.Supervisor do
152153
153154
* `:max_concurrency` - sets the maximum number of tasks to run
154155
at the same time. Defaults to `System.schedulers_online/0`.
156+
* `:ordered` - whether the results should be returned in the same order
157+
as the input stream. This option is useful when you have large
158+
streams and don't want to buffer results before they are delivered.
159+
Defaults to `true`.
155160
* `:timeout` - the maximum amount of time to wait (in milliseconds)
156161
without receiving a task reply (across all running tasks).
157162
Defaults to `5000`.
163+
* `:on_timeout` - what do to when a task times out. The possible
164+
values are:
165+
* `:exit` (default) - the process that spawned the tasks exits.
166+
* `:kill_task` - the task that timed out is killed. The value
167+
emitted for that task is `{:exit, :timeout, element}`, where
168+
`element` is the element it timed out on.
158169
159170
## Examples
160171

lib/elixir/test/elixir/task/supervisor_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,9 @@ defmodule Task.SupervisorTest do
219219
test "streams an enumerable with exits", %{supervisor: supervisor} do
220220
Process.flag(:trap_exit, true)
221221
assert supervisor
222-
|> Task.Supervisor.async_stream(1..4, &exit/1, @opts)
222+
|> Task.Supervisor.async_stream(1..4, &exit(Integer.to_string(&1)), @opts)
223223
|> Enum.to_list ==
224-
[exit: 1, exit: 2, exit: 3, exit: 4]
224+
[{:exit, "1", 1}, {:exit, "2", 2}, {:exit, "3", 3}, {:exit, "4", 4}]
225225
end
226226

227227
test "shuts down unused tasks", %{supervisor: supervisor} do
@@ -277,7 +277,7 @@ defmodule Task.SupervisorTest do
277277
assert supervisor
278278
|> Task.Supervisor.async_stream_nolink(1..4, &exit/1, @opts)
279279
|> Enum.to_list ==
280-
[exit: 1, exit: 2, exit: 3, exit: 4]
280+
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
281281
end
282282

283283
test "shuts down unused tasks", %{supervisor: supervisor} do

lib/elixir/test/elixir/task_test.exs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,24 @@ defmodule TaskTest do
492492
describe "async_stream/2" do
493493
test "timeout" do
494494
assert catch_exit([:infinity] |> Task.async_stream(&sleep/1, [timeout: 0]) |> Enum.to_list) ==
495-
{:timeout, {Task.Supervised, :stream, [0]}}
495+
{:timeout, {Task.Supervised, :stream, [:infinity, 0]}}
496+
refute_received _
497+
end
498+
499+
test "streams an enumerable with ordered: false" do
500+
opts = [max_concurrency: 1, ordered: false]
501+
assert 4..1 |> Task.async_stream(&sleep(&1 * 100), opts) |> Enum.to_list ==
502+
[ok: 400, ok: 300, ok: 200, ok: 100]
503+
504+
opts = [max_concurrency: 4, ordered: false]
505+
assert 4..1 |> Task.async_stream(&sleep(&1 * 100), opts) |> Enum.to_list ==
506+
[ok: 100, ok: 200, ok: 300, ok: 400]
507+
end
508+
509+
test "streams an enumerable with ordered: false, on_timeout: :kill_task" do
510+
opts = [max_concurrency: 4, ordered: false, on_timeout: :kill_task, timeout: 50]
511+
assert [100, 1, 100, 1] |> Task.async_stream(&sleep/1, opts) |> Enum.to_list() ==
512+
[{:ok, 1}, {:ok, 1}, {:exit, :timeout, 100}, {:exit, :timeout, 100}]
496513
refute_received _
497514
end
498515
end
@@ -526,7 +543,7 @@ defmodule TaskTest do
526543
test "streams an enumerable with exits" do
527544
Process.flag(:trap_exit, true)
528545
assert 1..4 |> Task.async_stream(&exit/1, @opts) |> Enum.to_list ==
529-
[exit: 1, exit: 2, exit: 3, exit: 4]
546+
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
530547
refute_received {:EXIT, _, _}
531548
end
532549

@@ -569,7 +586,7 @@ defmodule TaskTest do
569586
test "with inner halt on failure" do
570587
Process.flag(:trap_exit, true)
571588
assert 1..8 |> Stream.take(4) |> Task.async_stream(&exit/1, @opts) |> Enum.to_list ==
572-
[exit: 1, exit: 2, exit: 3, exit: 4]
589+
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
573590
end
574591

575592
test "with inner halt and slowest first" do
@@ -585,7 +602,7 @@ defmodule TaskTest do
585602
test "with outer halt on failure" do
586603
Process.flag(:trap_exit, true)
587604
assert 1..8 |> Task.async_stream(&exit/1, @opts) |> Enum.take(4) ==
588-
[exit: 1, exit: 2, exit: 3, exit: 4]
605+
[{:exit, 1, 1}, {:exit, 2, 2}, {:exit, 3, 3}, {:exit, 4, 4}]
589606
end
590607

591608
test "with outer halt and slowest first" do
@@ -622,7 +639,7 @@ defmodule TaskTest do
622639
test "with :on_timeout set to :kill_task" do
623640
opts = Keyword.merge(@opts, on_timeout: :kill_task, timeout: 50)
624641
assert [100, 1, 100, 1] |> Task.async_stream(&sleep/1, opts) |> Enum.to_list() ==
625-
[exit: :timeout, ok: 1, exit: :timeout, ok: 1]
642+
[{:exit, :timeout, 100}, {:ok, 1}, {:exit, :timeout, 100}, {:ok, 1}]
626643
refute_received _
627644
end
628645
end

0 commit comments

Comments
 (0)