Skip to content

Commit 3426ed0

Browse files
committed
Introduce FoldEOF and tackle the cleanup after the scan terminates
1 parent 0a7d7c6 commit 3426ed0

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

src/Streamly/Internal/Data/Fold/Channel/Type.hs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ data OutEvent b =
6666
FoldException ThreadId SomeException
6767
| FoldPartial b
6868
| FoldDone ThreadId b
69+
| FoldEOF ThreadId
6970

7071
-- | The fold driver thread queues the input of the fold in the 'inputQueue'
7172
-- The driver rings the doorbell when the queue transitions from empty to
@@ -212,6 +213,11 @@ sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
212213
sendPartialToDriver sv res = liftIO $ do
213214
void $ sendToDriver sv (FoldPartial res)
214215

216+
sendEOFToDriver :: MonadIO m => Channel m a b -> m ()
217+
sendEOFToDriver sv = liftIO $ do
218+
tid <- myThreadId
219+
void $ sendToDriver sv (FoldEOF tid)
220+
215221
{-# NOINLINE sendExceptionToDriver #-}
216222
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
217223
sendExceptionToDriver sv e = do
@@ -331,9 +337,9 @@ newChannelWith outq outqDBell modifier f = do
331337
in D.fold f1 $ fromInputQueue chan
332338

333339
{-# INLINE scanToChannel #-}
334-
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a ()
340+
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Fold m a ()
335341
scanToChannel chan (Scanl step initial extract final) =
336-
Scanl step1 initial1 extract1 final1
342+
Fold step1 initial1 extract1 final1
337343

338344
where
339345

@@ -386,7 +392,10 @@ newChannelWithScan outq outqDBell modifier f = do
386392
where
387393

388394
{-# NOINLINE work #-}
389-
work chan = D.drain $ D.scanl (scanToChannel chan f) $ fromInputQueue chan
395+
work chan = do
396+
(_, next) <- D.foldBreak (scanToChannel chan f) (fromInputQueue chan)
397+
sendEOFToDriver chan
398+
D.drain next
390399

391400
{-# INLINABLE newChannel #-}
392401
{-# SPECIALIZE newChannel ::
@@ -441,6 +450,7 @@ checkFoldStatus sv = do
441450
case ev of
442451
FoldException _ e -> throwM e
443452
FoldDone _ b -> return (Just b)
453+
FoldEOF _ -> return Nothing
444454
FoldPartial _ -> undefined
445455

446456
{-# INLINE isBufferAvailable #-}

src/Streamly/Internal/Data/Scanl/Concurrent.hs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ parDistributeScan cfg getFolds (Stream sstep state) =
190190
Stream.fold (spanChans tid) (Stream.fromList chans)
191191
Prelude.mapM_ (finalize . fst) chToClose
192192
processOutputs ch xs (b:done)
193+
FoldEOF tid -> do
194+
-- We have to send ChildStopChannel to all the folds
195+
-- that are done to stop the manager fold.
196+
(ch, chToClose) <-
197+
Stream.fold (spanChans tid) (Stream.fromList chans)
198+
Prelude.mapM_ (finalize . fst) chToClose
199+
processOutputs ch xs done
193200
FoldPartial b ->
194201
processOutputs chans xs (b:done)
195202

0 commit comments

Comments
 (0)