Skip to content

Commit 7f014ca

Browse files
committed
Fix a bug in parDistributeScan and block on buffer in sendToWorker_
1 parent c192480 commit 7f014ca

File tree

6 files changed

+79
-5
lines changed

6 files changed

+79
-5
lines changed

src/Streamly/Internal/Data/Fold/Channel/Type.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,10 +510,12 @@ sendToWorker_ chan a = go
510510
(inputItemDoorBell chan)
511511
(ChildYield a)
512512
else do
513-
error "sendToWorker_: No space available in the buffer"
513+
-- XXX: Is there a reason for us to error out here instead of
514+
-- blocking?
515+
-- error "sendToWorker_: No space available in the buffer"
514516
-- Block for space
515-
-- () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
516-
-- go
517+
() <- liftIO $ takeMVar (inputSpaceDoorBell chan)
518+
go
517519

518520
-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.
519521

src/Streamly/Internal/Data/Fold/Concurrent.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ parDistributeScan cfg getFolds (Stream sstep state) =
401401
then do
402402
liftIO $ takeMVar db
403403
return $ Skip (ScanDrain q db running)
404-
else return $ Yield outputs (ScanDrain q db running)
404+
else return $ Yield outputs ScanStop
405405
step _ ScanStop = return Stop
406406

407407
{-# ANN type DemuxState Fuse #-}

src/Streamly/Internal/Data/Scanl/Concurrent.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ parDistributeScan cfg getFolds (Stream sstep state) =
242242
then do
243243
liftIO $ takeMVar db
244244
return $ Skip (ScanDrain q db running)
245-
else return $ Yield outputs (ScanDrain q db running)
245+
else return $ Yield outputs ScanStop
246+
246247
step _ ScanStop = return Stop
247248

248249
{-# ANN type DemuxState Fuse #-}

streamly.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ extra-source-files:
136136
test/Streamly/Test/Prelude/*.hs
137137
test/Streamly/Test/Unicode/*.hs
138138
test/Streamly/Test/Serialize/*.hs
139+
test/Streamly/Test/Data/Scanl/*.hs
139140
test/Streamly/Test/Data/Fold/*.hs
140141
test/lib/Streamly/Test/Common.hs
141142
test/lib/Streamly/Test/Prelude/Common.hs
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
-- |
2+
-- Module : Streamly.Test.Data.Scanl.Concurrent
3+
-- Copyright : (c) 2020 Composewell Technologies
4+
--
5+
-- License : BSD-3-Clause
6+
-- Maintainer : [email protected]
7+
-- Stability : experimental
8+
-- Portability : GHC
9+
10+
module Streamly.Test.Data.Scanl.Concurrent (main) where
11+
12+
import Control.Concurrent (threadDelay)
13+
import Data.Function ( (&) )
14+
import Data.IORef (newIORef, atomicModifyIORef')
15+
import Data.List (sort)
16+
import Streamly.Data.Scanl (Scanl)
17+
import Test.Hspec as H
18+
19+
import qualified Streamly.Data.Fold as Fold
20+
import qualified Streamly.Data.Stream as Stream
21+
import qualified Streamly.Data.Stream.Prelude as Stream
22+
import qualified Streamly.Internal.Data.Scanl as Scanl
23+
import qualified Streamly.Internal.Data.Scanl.Prelude as Scanl
24+
25+
moduleName :: String
26+
moduleName = "Data.Scanl.Concurrent"
27+
28+
---------------------------------------------------------------------------
29+
-- Main
30+
---------------------------------------------------------------------------
31+
32+
evenScan :: Scanl IO Int (Maybe Int)
33+
evenScan =
34+
Scanl.filtering even
35+
& Scanl.lmapM (\x -> threadDelay 100 >> pure x)
36+
37+
oddScan :: Scanl IO Int (Maybe Int)
38+
oddScan =
39+
Scanl.filtering odd
40+
& Scanl.lmapM (\x -> threadDelay 100 >> pure x)
41+
42+
parDistributeScanTest :: (Stream.Config -> Stream.Config) -> IO ()
43+
parDistributeScanTest concOpts = do
44+
ref <- newIORef [evenScan, oddScan]
45+
let gen = atomicModifyIORef' ref (\xs -> ([], xs))
46+
inpStream = Stream.enumerateFromTo 1 10_000
47+
res1 <-
48+
Scanl.parDistributeScan concOpts gen inpStream
49+
& Stream.concatMap Stream.fromList
50+
& Stream.catMaybes
51+
& Stream.fold Fold.toList
52+
sort res1 `shouldBe` [1..9999]
53+
54+
main :: IO ()
55+
main = hspec
56+
$ H.parallel
57+
#ifdef COVERAGE_BUILD
58+
$ modifyMaxSuccess (const 10)
59+
#endif
60+
$ describe moduleName $ do
61+
it "parDistributeScan (maxBuffer 1)"
62+
$ parDistributeScanTest (Stream.maxBuffer 1)

test/streamly-tests.cabal

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,14 @@ test-suite Data.RingArray
304304
main-is: Streamly/Test/Data/RingArray.hs
305305
ghc-options: -main-is Streamly.Test.Data.RingArray.main
306306

307+
test-suite Data.Scanl.Concurrent
308+
import: test-options
309+
type: exitcode-stdio-1.0
310+
main-is: Streamly/Test/Data/Scanl/Concurrent.hs
311+
ghc-options: -main-is Streamly.Test.Data.Scanl.Concurrent.main
312+
if flag(use-streamly-core)
313+
buildable: False
314+
307315
-- XXX Rename to MutByteArray
308316
test-suite Data.Serialize
309317
import: test-options

0 commit comments

Comments
 (0)