Skip to content

Commit 403a50f

Browse files
author
José Valim
committed
Ensure chunk/4 is haltable
Signed-off-by: José Valim <[email protected]>
1 parent 8e98cd4 commit 403a50f

File tree

2 files changed

+44
-26
lines changed

2 files changed

+44
-26
lines changed

lib/elixir/lib/stream.ex

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -156,18 +156,22 @@ defmodule Stream do
156156
@spec chunk(Enumerable.t, non_neg_integer, non_neg_integer, Enumerable.t | nil) :: Enumerable.t
157157
def chunk(enum, n, step, pad \\ nil) when n > 0 and step > 0 do
158158
limit = :erlang.max(n, step)
159-
lazy enum, {[], 0},
160-
fn(f1) -> R.chunk(n, step, limit, f1) end,
161-
fn(f1) -> &do_chunk(&1, n, pad, f1) end
159+
if is_nil(pad) do
160+
lazy enum, {[], 0}, fn(f1) -> R.chunk(n, step, limit, f1) end
161+
else
162+
lazy enum, {[], 0},
163+
fn(f1) -> R.chunk(n, step, limit, f1) end,
164+
&do_chunk(&1, n, pad, &2)
165+
end
162166
end
163167

164-
defp do_chunk(acc(h, {buffer, count} = old, t) = acc, n, pad, f1) do
165-
if is_nil(pad) || count == 0 do
166-
{:cont, acc}
167-
else
168-
buffer = :lists.reverse(buffer) ++ Enum.take(pad, n - count)
169-
cont_with_acc(f1, buffer, h, old, t)
170-
end
168+
defp do_chunk(acc(_, {_, 0}, _) = acc, _, _, _) do
169+
{:cont, acc}
170+
end
171+
172+
defp do_chunk(acc(h, {buffer, count} = old, t), n, pad, f1) do
173+
buffer = :lists.reverse(buffer) ++ Enum.take(pad, n - count)
174+
cont_with_acc(f1, buffer, h, old, t)
171175
end
172176

173177
@doc """
@@ -186,7 +190,7 @@ defmodule Stream do
186190
def chunk_by(enum, fun) do
187191
lazy enum, nil,
188192
fn(f1) -> R.chunk_by(fun, f1) end,
189-
fn(f1) -> &do_chunk_by(&1, f1) end
193+
&do_chunk_by(&1, &2)
190194
end
191195

192196
defp do_chunk_by(acc(_, nil, _) = acc, _f1) do
@@ -1127,12 +1131,12 @@ defmodule Stream do
11271131

11281132
@compile {:inline, lazy: 2, lazy: 3, lazy: 4}
11291133

1130-
defp lazy(%Stream{funs: funs} = lazy, fun),
1134+
defp lazy(%Stream{done: nil, funs: funs} = lazy, fun),
11311135
do: %{lazy | funs: [fun|funs] }
11321136
defp lazy(enum, fun),
11331137
do: %Stream{enum: enum, funs: [fun]}
11341138

1135-
defp lazy(%Stream{funs: funs, accs: accs} = lazy, acc, fun),
1139+
defp lazy(%Stream{done: nil, funs: funs, accs: accs} = lazy, acc, fun),
11361140
do: %{lazy | funs: [fun|funs], accs: [acc|accs] }
11371141
defp lazy(enum, acc, fun),
11381142
do: %Stream{enum: enum, funs: [fun], accs: [acc]}
@@ -1171,19 +1175,20 @@ defimpl Enumerable, for: Stream do
11711175
case reduce.({command, [acc|accs]}) do
11721176
{:suspended, [acc|accs], continuation} ->
11731177
{:suspended, acc, &do_each(continuation, done, accs, &1)}
1174-
{:halted, [acc|_]} ->
1175-
{:halted, acc}
1176-
{:done, [acc|_] = accs} ->
1177-
case done do
1178-
nil ->
1179-
{:done, acc}
1180-
{done, fun} ->
1181-
case done.(fun).(accs) do
1182-
{:cont, [acc|_]} -> {:done, acc}
1183-
{:halt, [acc|_]} -> {:halted, acc}
1184-
{:suspend, [acc|_]} -> {:suspended, acc, &({:done, elem(&1, 1)})}
1185-
end
1186-
end
1178+
{:halted, accs} ->
1179+
do_done {:halted, accs}, done
1180+
{:done, accs} ->
1181+
do_done {:done, accs}, done
1182+
end
1183+
end
1184+
1185+
defp do_done({reason, [acc|_]}, nil), do: {reason, acc}
1186+
defp do_done({reason, [acc|t]}, {done, fun}) do
1187+
[h|_] = Enum.reverse(t)
1188+
case done.([acc, h], fun) do
1189+
{:cont, [acc|_]} -> {reason, acc}
1190+
{:halt, [acc|_]} -> {:halted, acc}
1191+
{:suspend, [acc|_]} -> {:suspended, acc, &({:done, elem(&1, 1)})}
11871192
end
11881193
end
11891194
end

lib/elixir/test/elixir/stream_test.exs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@ defmodule StreamTest do
6767
assert Enum.zip(list, list) == Enum.zip(stream, stream)
6868
end
6969

70+
test "chunk/4 is haltable" do
71+
assert 1..10 |> Stream.take(6) |> Stream.chunk(4, 4, [7, 8]) |> Enum.to_list ==
72+
[[1, 2, 3, 4], [5, 6, 7, 8]]
73+
assert 1..10 |> Stream.take(6) |> Stream.chunk(4, 4, [7, 8]) |> Stream.take(3) |> Enum.to_list ==
74+
[[1, 2, 3, 4], [5, 6, 7, 8]]
75+
assert 1..10 |> Stream.take(6) |> Stream.chunk(4, 4, [7, 8]) |> Stream.take(2) |> Enum.to_list ==
76+
[[1, 2, 3, 4], [5, 6, 7, 8]]
77+
assert 1..10 |> Stream.take(6) |> Stream.chunk(4, 4, [7, 8]) |> Stream.take(1) |> Enum.to_list ==
78+
[[1, 2, 3, 4]]
79+
assert 1..6 |> Stream.take(6) |> Stream.chunk(4, 4, [7, 8]) |> Enum.to_list ==
80+
[[1, 2, 3, 4], [5, 6, 7, 8]]
81+
end
82+
7083
test "chunk_by/2" do
7184
stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
7285

0 commit comments

Comments
 (0)