@@ -66,9 +66,9 @@ private[flow] final class StreamSubscriber[F[_], A] private (
6666 * since they are always done on the effect run after the state update took place.
6767 * Meaning this should be correct if the Producer is well-behaved.
6868 */
69- private var inOnNextLoop : Boolean = _
69+ private var inOnNextLoop : Boolean = false
7070 private var buffer : Array [Any ] = null
71- private var index : Int = _
71+ private var index : Int = 0
7272
7373 /** Receives the next record from the upstream reactive-streams system. */
7474 override final def onNext (a : A ): Unit = {
@@ -164,10 +164,10 @@ private[flow] final class StreamSubscriber[F[_], A] private (
164164 state -> run {
165165 // We do the updates here,
166166 // to ensure they happen after we have secured the state.
167- inOnNextLoop = true
168- index = 1
169167 buffer = new Array (chunkSize)
170168 buffer(0 ) = a
169+ index = 1
170+ inOnNextLoop = true
171171 }
172172 }
173173
@@ -188,15 +188,18 @@ private[flow] final class StreamSubscriber[F[_], A] private (
188188 Idle (s) -> run {
189189 // We do the updates here,
190190 // to ensure they happen after we have secured the state.
191- cb.apply( Right ( Some ( Chunk .array(buffer))) )
191+ val chunk = Chunk .array(buffer)
192192 inOnNextLoop = false
193193 buffer = null
194+ cb.apply(Right (Some (chunk)))
194195 }
195196
196197 case state =>
197198 Failed (
198199 new InvalidStateException (operation = s " Received record [ ${buffer.last}] " , state)
199200 ) -> run {
201+ // We do the updates here,
202+ // to ensure they happen after we have secured the state.
200203 inOnNextLoop = false
201204 buffer = null
202205 }
@@ -206,19 +209,15 @@ private[flow] final class StreamSubscriber[F[_], A] private (
206209 case Uninitialized (Some (cb)) =>
207210 Terminal -> run {
208211 cb.apply(Left (ex))
209- // We do the updates here,
210- // to ensure they happen after we have secured the state.
211- inOnNextLoop = false
212- buffer = null
213212 }
214213
215214 case WaitingOnUpstream (cb, _) =>
216215 Terminal -> run {
217- cb.apply(Left (ex))
218216 // We do the updates here,
219217 // to ensure they happen after we have secured the state.
220218 inOnNextLoop = false
221219 buffer = null
220+ cb.apply(Left (ex))
222221 }
223222
224223 case _ =>
@@ -240,17 +239,21 @@ private[flow] final class StreamSubscriber[F[_], A] private (
240239
241240 case WaitingOnUpstream (cb, s) =>
242241 Terminal -> run {
242+ // We do the updates here,
243+ // to ensure they happen after we have secured the state.
243244 if (canceled) {
244245 s.cancel()
246+ inOnNextLoop = false
247+ buffer = null
245248 cb.apply(Right (None ))
246- } else if (index == 0 ) {
249+ } else if (buffer eq null ) {
250+ inOnNextLoop = false
247251 cb.apply(Right (None ))
248252 } else {
249- cb.apply(Right (Some (Chunk .array(buffer, offset = 0 , length = index))))
250- // We do the updates here,
251- // to ensure they happen after we have secured the state.
253+ val chunk = Chunk .array(buffer, offset = 0 , length = index)
252254 inOnNextLoop = false
253255 buffer = null
256+ cb.apply(Right (Some (chunk)))
254257 }
255258 }
256259
@@ -268,10 +271,6 @@ private[flow] final class StreamSubscriber[F[_], A] private (
268271 case Idle (s) =>
269272 WaitingOnUpstream (cb, s) -> run {
270273 s.request(chunkSize.toLong)
271- // We do the updates here,
272- // to ensure they happen after we have secured the state.
273- inOnNextLoop = false
274- index = 0
275274 }
276275
277276 case state @ Uninitialized (Some (otherCB)) =>
@@ -283,14 +282,14 @@ private[flow] final class StreamSubscriber[F[_], A] private (
283282
284283 case state @ WaitingOnUpstream (otherCB, s) =>
285284 Terminal -> run {
286- s.cancel()
287- val ex = Left (new InvalidStateException (operation = " Received request" , state))
288- otherCB.apply(ex)
289- cb.apply(ex)
290285 // We do the updates here,
291286 // to ensure they happen after we have secured the state.
292287 inOnNextLoop = false
293288 buffer = null
289+ s.cancel()
290+ val ex = Left (new InvalidStateException (operation = " Received request" , state))
291+ otherCB.apply(ex)
292+ cb.apply(ex)
294293 }
295294
296295 case Failed (ex) =>
0 commit comments