
Commit 0f1200f

Add a note on async_stream+take
1 parent cec1a71 commit 0f1200f

1 file changed

lib/elixir/lib/task.ex

Lines changed: 32 additions & 0 deletions
@@ -491,6 +491,38 @@ defmodule Task do
       stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
       Stream.run(stream)
 
+  ## Attention: async + take
+
+  Given items in an async stream are processed concurrently, doing
+  `async_stream` followed by `Enum.take/2` may cause more items than
+  requested to be processed. Let's see an example:
+
+      1..100
+      |> Task.async_stream(fn i ->
+        Process.sleep(100)
+        IO.puts(to_string(i))
+      end)
+      |> Enum.take(10)
+
+  For a machine with 8 cores, the above will process 16 items instead
+  of 10. The reason is that `async_stream/5` always has 8 elements
+  processing at once. So by the time `Enum` says it got all the elements
+  it needed, there are still 6 elements left to be processed.
+
+  The solution here is to use `Stream.take/2` instead of `Enum.take/2`
+  to filter elements beforehand:
+
+      1..100
+      |> Stream.take(10)
+      |> Task.async_stream(fn i ->
+        Process.sleep(100)
+        IO.puts(to_string(i))
+      end)
+      |> Enum.to_list()
+
+  If for some reason you cannot take the elements beforehand,
+  you can use `:max_concurrency` to limit how many elements
+  may be over-processed at the cost of reducing concurrency.
   """
   @doc since: "1.4.0"
   @spec async_stream(Enumerable.t(), module, atom, [term], keyword) :: Enumerable.t()
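
As a side note (not part of this commit), here is a minimal sketch of the `:max_concurrency` approach mentioned in the last added paragraph. The exact over-processing bound depends on timing, but with `max_concurrency: 2` only about two extra items can be started beyond the ten that `Enum.take/2` requests, at the cost of running only two tasks at a time:

    # Sketch only: limits over-processing when the elements cannot be
    # taken beforehand. Only 2 tasks run at a time, so at most roughly
    # 2 extra items are started beyond the 10 that Enum.take/2 asks for.
    1..100
    |> Task.async_stream(
      fn i ->
        Process.sleep(100)
        IO.puts(to_string(i))
      end,
      max_concurrency: 2
    )
    |> Enum.take(10)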
