Skip to content

Commit 0727eaf

Browse files
author
José Valim
committed
Do not invoke after_fun in chunk_while if emitted element halts, closes #7112
1 parent cb5bfbc commit 0727eaf

File tree

3 files changed

+48
-18
lines changed

3 files changed

+48
-18
lines changed

lib/elixir/lib/stream.ex

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,16 +231,37 @@ defmodule Stream do
231231
def chunk_while(enum, acc, chunk_fun, after_fun) do
232232
lazy(
233233
enum,
234-
acc,
235-
fn f1 -> R.chunk_while(chunk_fun, f1) end,
236-
&after_chunk_while(&1, &2, after_fun)
234+
[acc | after_fun],
235+
fn f1 -> chunk_while_fun(chunk_fun, f1) end,
236+
&after_chunk_while/2
237237
)
238238
end
239239

240-
defp after_chunk_while(acc(h, acc, t), f1, after_fun) do
240+
defp chunk_while_fun(callback, fun) do
241+
fn entry, acc(head, [acc | after_fun], tail) ->
242+
case callback.(entry, acc) do
243+
{:cont, emit, acc} ->
244+
# If we emit an item and then we have to halt,
245+
# we need to disable the after_fun callback to
246+
# avoid emitting even more items.
247+
case next(fun, emit, [head | tail]) do
248+
{:halt, [head | tail]} -> {:halt, acc(head, [acc | &{:cont, &1}], tail)}
249+
{command, [head | tail]} -> {command, acc(head, [acc | after_fun], tail)}
250+
end
251+
252+
{:cont, acc} ->
253+
skip(acc(head, [acc | after_fun], tail))
254+
255+
{:halt, acc} ->
256+
{:halt, acc(head, [acc | after_fun], tail)}
257+
end
258+
end
259+
end
260+
261+
defp after_chunk_while(acc(h, [acc | after_fun], t), f1) do
241262
case after_fun.(acc) do
242-
{:cont, emit, acc} -> next_with_acc(f1, emit, h, acc, t)
243-
{:cont, acc} -> {:cont, acc(h, acc, t)}
263+
{:cont, emit, acc} -> next_with_acc(f1, emit, h, [acc | after_fun], t)
264+
{:cont, acc} -> {:cont, acc(h, [acc | after_fun], t)}
244265
end
245266
end
246267

lib/elixir/lib/stream/reducers.ex

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,6 @@ defmodule Stream.Reducers do
5555
chunk_by.(enumerable, nil, chunk_fun, after_fun)
5656
end
5757

58-
defmacro chunk_while(callback, fun \\ nil) do
59-
quote do
60-
fn entry, acc(head, acc, tail) ->
61-
case unquote(callback).(entry, acc) do
62-
{:cont, emit, acc} -> next_with_acc(unquote(fun), emit, head, acc, tail)
63-
{:cont, acc} -> skip(acc(head, acc, tail))
64-
{:halt, acc} -> {:halt, acc(head, acc, tail)}
65-
end
66-
end
67-
end
68-
end
69-
7058
defmacro dedup(callback, fun \\ nil) do
7159
quote do
7260
fn entry, acc(head, prev, tail) = acc ->

lib/elixir/test/elixir/stream_test.exs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,27 @@ defmodule StreamTest do
157157
[[5, 7, 9]]
158158
end
159159

160+
test "chunk_while/4 with inner halt" do
161+
chunk_fun = fn
162+
i, [] ->
163+
{:cont, [i]}
164+
165+
i, chunk ->
166+
if rem(i, 2) == 0 do
167+
{:cont, Enum.reverse(chunk), [i]}
168+
else
169+
{:cont, [i | chunk]}
170+
end
171+
end
172+
173+
after_fun = fn
174+
[] -> {:cont, []}
175+
chunk -> {:cont, Enum.reverse(chunk), []}
176+
end
177+
178+
assert Stream.chunk_while([1, 2, 3, 4, 5], [], chunk_fun, after_fun) |> Enum.at(0) == [1]
179+
end
180+
160181
test "concat/1" do
161182
stream = Stream.concat([1..3, [], [4, 5, 6], [], 7..9])
162183
assert is_function(stream)

0 commit comments

Comments
 (0)