@@ -83,7 +83,7 @@ defmodule Stream do
83
83
like `Stream.cycle/1`, `Stream.unfold/2`, `Stream.resource/3` and more.
84
84
"""
85
85
86
- defrecord Lazy , enum: nil , funs: [ ] , accs: [ ]
86
+ defrecord Lazy , enum: nil , funs: [ ] , accs: [ ] , done: nil
87
87
88
88
defimpl Enumerable , for: Lazy do
89
89
@ compile :inline_list_funs
@@ -103,25 +103,36 @@ defmodule Stream do
103
103
{ :error , __MODULE__ }
104
104
end
105
105
106
- defp do_reduce ( Lazy [ enum : enum , funs: funs , accs: accs ] , acc , fun ) do
106
+ defp do_reduce ( Lazy [ enum : enum , funs: funs , accs: accs , done: done ] , acc , fun ) do
107
107
composed = :lists . foldl ( fn fun , acc -> fun . ( acc ) end , fun , funs )
108
- do_each ( & Enumerable . reduce ( enum , & 1 , composed ) , :lists . reverse ( accs ) , acc )
108
+ do_each ( & Enumerable . reduce ( enum , & 1 , composed ) , done && { done , fun } , :lists . reverse ( accs ) , acc )
109
109
end
110
110
111
- defp do_each ( _reduce , _accs , { :halt , acc } ) do
111
+ defp do_each ( _reduce , _done , _accs , { :halt , acc } ) do
112
112
{ :halted , acc }
113
113
end
114
114
115
- defp do_each ( reduce , accs , { :suspend , acc } ) do
116
- { :suspended , acc , & do_each ( reduce , accs , & 1 ) }
115
+ defp do_each ( reduce , done , accs , { :suspend , acc } ) do
116
+ { :suspended , acc , & do_each ( reduce , done , accs , & 1 ) }
117
117
end
118
118
119
- defp do_each ( reduce , accs , { :cont , acc } ) do
119
+ defp do_each ( reduce , done , accs , { :cont , acc } ) do
120
120
case reduce . ( { :cont , [ acc | accs ] } ) do
121
- { reason , [ acc | _ ] } ->
122
- { reason , acc }
123
121
{ :suspended , [ acc | accs ] , continuation } ->
124
- { :suspended , acc , & do_each ( continuation , accs , & 1 ) }
122
+ { :suspended , acc , & do_each ( continuation , done , accs , & 1 ) }
123
+ { :halted , [ acc | _ ] } ->
124
+ { :halted , acc }
125
+ { :done , [ acc | _ ] = accs } ->
126
+ case done do
127
+ nil ->
128
+ { :done , acc }
129
+ { done , fun } ->
130
+ case done . ( fun ) . ( accs ) do
131
+ { :cont , [ acc | _ ] } -> { :done , acc }
132
+ { :halt , [ acc | _ ] } -> { :halted , acc }
133
+ { :suspend , [ acc | _ ] } -> { :suspended , acc , & ( { :done , & 1 |> elem ( 1 ) } ) }
134
+ end
135
+ end
125
136
end
126
137
end
127
138
end
@@ -151,6 +162,33 @@ defmodule Stream do
151
162
152
163
## Transformers
153
164
165
+ @ doc """
166
+ Chunk the `enum` by buffering elements for which `fun` returns
167
+ the same value and only emit them when `fun` returns a new value
168
+ or the `enum` finishes,
169
+
170
+ ## Examples
171
+
172
+ iex> stream = Stream.chunks_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
173
+ iex> Enum.to_list(stream)
174
+ [[1], [2, 2], [3], [4, 4, 6], [7, 7]]
175
+
176
+ """
177
+ @ spec chunks_by ( Enumerable . t , ( element -> any ) ) :: Enumerable . t
178
+ def chunks_by ( enum , fun ) do
179
+ lazy enum , nil ,
180
+ fn ( f1 ) -> R . chunks_by ( fun , f1 ) end ,
181
+ fn ( f1 ) -> & do_chunks_by ( & 1 , f1 ) end
182
+ end
183
+
184
+ defp do_chunks_by ( acc ( _ , nil , _ ) = acc , _f1 ) do
185
+ { :cont , acc }
186
+ end
187
+
188
+ defp do_chunks_by ( acc ( h , { buffer , _ } , t ) , f1 ) do
189
+ cont_with_acc ( f1 , :lists . reverse ( buffer ) , h , nil , t )
190
+ end
191
+
154
192
@ doc """
155
193
Lazily drops the next `n` items from the enumerable.
156
194
@@ -720,23 +758,20 @@ defmodule Stream do
720
758
721
759
## Helpers
722
760
723
- @ compile { :inline , lazy: 2 , lazy: 3 }
761
+ @ compile { :inline , lazy: 2 , lazy: 3 , lazy: 4 }
724
762
725
- defp lazy ( enum , fun ) do
726
- case enum do
727
- Lazy [ funs : funs ] = lazy ->
728
- lazy . funs ( [ fun | funs ] )
729
- _ ->
730
- Lazy [ enum : enum , funs: [ fun ] , accs: [ ] ]
731
- end
732
- end
763
+ defp lazy ( Lazy [ funs : funs ] = lazy , fun ) ,
764
+ do: lazy . funs ( [ fun | funs ] )
765
+ defp lazy ( enum , fun ) ,
766
+ do: Lazy [ enum : enum , funs: [ fun ] ]
733
767
734
- defp lazy ( enum , acc , fun ) do
735
- case enum do
736
- Lazy [ funs : funs , accs: accs ] = lazy ->
737
- lazy . funs ( [ fun | funs ] ) . accs ( [ acc | accs ] )
738
- _ ->
739
- Lazy [ enum : enum , funs: [ fun ] , accs: [ acc ] ]
740
- end
741
- end
768
+ defp lazy ( Lazy [ funs : funs , accs: accs ] = lazy , acc , fun ) ,
769
+ do: lazy . funs ( [ fun | funs ] ) . accs ( [ acc | accs ] )
770
+ defp lazy ( enum , acc , fun ) ,
771
+ do: Lazy [ enum : enum , funs: [ fun ] , accs: [ acc ] ]
772
+
773
+ defp lazy ( Lazy [ done : nil , funs: funs , accs: accs ] = lazy , acc , fun , done ) ,
774
+ do: lazy . funs ( [ fun | funs ] ) . accs ( [ acc | accs ] ) . done ( done )
775
+ defp lazy ( enum , acc , fun , done ) ,
776
+ do: Lazy [ enum : enum , funs: [ fun ] , accs: [ acc ] , done: done ]
742
777
end
0 commit comments