Skip to content

Commit 2f8d3ae

Browse files
committed
Revert "Revert scanl types"
This reverts commit 0a7d7c6.
1 parent 9b27c4f commit 2f8d3ae

File tree

1 file changed

+68
-8
lines changed
  • src/Streamly/Internal/Data/Fold/Channel

1 file changed

+68
-8
lines changed

src/Streamly/Internal/Data/Fold/Channel/Type.hs

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ sendExceptionToDriver sv e = do
221221
data FromSVarState m a b =
222222
FromSVarRead (Channel m a b)
223223
| FromSVarLoop (Channel m a b) [ChildEvent a]
224+
| FromSVarStop
224225

225226
{-# INLINE_NORMAL fromInputQueue #-}
226227
fromInputQueue :: MonadIO m => Channel m a b -> D.Stream m a
@@ -246,6 +247,34 @@ fromInputQueue svar = D.Stream step (FromSVarRead svar)
246247
ChildStopChannel -> return D.Stop
247248
_ -> undefined
248249

250+
step _ FromSVarStop = undefined
251+
252+
{-# INLINE_NORMAL fromInputQueueRaw #-}
253+
fromInputQueueRaw :: MonadIO m => Channel m a b -> D.Stream m (ChildEvent a)
254+
fromInputQueueRaw svar = D.Stream step (FromSVarRead svar)
255+
256+
where
257+
258+
{-# INLINE_LATE step #-}
259+
step _ (FromSVarRead sv) = do
260+
list <- readInputQ sv
261+
-- Reversing the output is important to guarantee that we process the
262+
-- outputs in the same order as they were generated by the constituent
263+
-- streams.
264+
return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)
265+
266+
step _ (FromSVarLoop sv []) = return $ D.Skip $ FromSVarRead sv
267+
step _ (FromSVarLoop sv (ev : es)) = do
268+
case ev of
269+
-- XXX Separate input and output events. Input events cannot have
270+
-- Stop event and output events cannot have ChildStopChannel
271+
-- event.
272+
ChildYield a -> return $ D.Yield (ChildYield a) (FromSVarLoop sv es)
273+
ChildStopChannel -> return $ D.Yield ChildStopChannel FromSVarStop
274+
_ -> undefined
275+
276+
step _ FromSVarStop = pure $ D.Stop
277+
249278
{-# INLINE readInputQChan #-}
250279
readInputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
251280
readInputQChan chan = do
@@ -330,8 +359,14 @@ newChannelWith outq outqDBell modifier f = do
330359
let f1 = Fold.rmapM (void . sendYieldToDriver chan) f
331360
in D.fold f1 $ fromInputQueue chan
332361

362+
data SendChannelRaw s a
363+
= SCREmptyBuffer s
364+
| SCRBuffered s a
365+
| SCRDrain
366+
333367
{-# INLINE scanToChannel #-}
334-
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a ()
368+
scanToChannel ::
369+
MonadIO m => Channel m a b -> Scanl m a b -> Scanl m (ChildEvent a) ()
335370
scanToChannel chan (Scanl step initial extract final) =
336371
Scanl step1 initial1 extract1 final1
337372

@@ -343,24 +378,49 @@ scanToChannel chan (Scanl step initial extract final) =
343378
Fold.Partial s -> do
344379
b <- extract s
345380
void $ sendPartialToDriver chan b
346-
return $ Fold.Partial s
381+
return $ Fold.Partial (SCREmptyBuffer s)
347382
Fold.Done b ->
348383
Fold.Done <$> void (sendYieldToDriver chan b)
349384

350-
step1 st x = do
385+
step1 (SCREmptyBuffer st) (ChildYield x) =
386+
return $ Fold.Partial (SCRBuffered st (ChildYield x))
387+
388+
step1 (SCRBuffered st (ChildYield x)) (ChildYield x1) = do
351389
r <- step st x
352390
case r of
353391
Fold.Partial s -> do
354392
b <- extract s
355393
void $ sendPartialToDriver chan b
356-
return $ Fold.Partial s
357-
Fold.Done b ->
358-
Fold.Done <$> void (sendYieldToDriver chan b)
394+
return $ Fold.Partial (SCRBuffered s (ChildYield x1))
395+
Fold.Done b -> do
396+
-- Even if the original scan is Done, we don't end this
397+
-- concurrent fold here as we want to drain all the input in the
398+
-- input buffer.
399+
--
400+
-- This scan only ends when it receives a ChildStopChannel
401+
-- event.
402+
--
403+
void (sendYieldToDriver chan b)
404+
pure $ Fold.Partial SCRDrain
405+
step1 (SCRBuffered st (ChildYield x)) ChildStopChannel = do
406+
r <- step st x
407+
b <-
408+
case r of
409+
Fold.Partial s -> extract s
410+
Fold.Done b0 -> pure b0
411+
Fold.Done <$> void (sendYieldToDriver chan b)
412+
step1 SCRDrain ChildStopChannel = pure $ Fold.Done ()
413+
step1 SCRDrain _ = pure $ Fold.Partial SCRDrain
414+
step1 _ _ = error "scanToChannel: Unsupported constructor"
359415

360416
extract1 _ = return ()
361417

362418
-- XXX Should we not discard the result?
363-
final1 st = void (final st)
419+
final1 (SCREmptyBuffer st) = void (final st)
420+
-- XXX We are losing the input here.
421+
-- XXX Should we consume the input and finalize it instead?
422+
final1 (SCRBuffered st _val) = void (final st)
423+
final1 SCRDrain = pure ()
364424

365425
{-# INLINABLE newChannelWithScan #-}
366426
{-# SPECIALIZE newChannelWithScan ::
@@ -386,7 +446,7 @@ newChannelWithScan outq outqDBell modifier f = do
386446
where
387447

388448
{-# NOINLINE work #-}
389-
work chan = D.drain $ D.scanl (scanToChannel chan f) $ fromInputQueue chan
449+
work chan = D.drain $ D.scanl (scanToChannel chan f) $ fromInputQueueRaw chan
390450

391451
{-# INLINABLE newChannel #-}
392452
{-# SPECIALIZE newChannel ::

0 commit comments

Comments
 (0)