Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions dunai/src/Data/MonadicStreamFunction/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,45 @@
-- i.e. that change the speed at which data enters or leaves the 'MSF'.
module Data.MonadicStreamFunction.Async where

-- base
import Control.Monad (ap)
import Data.Either (fromRight)

-- Internal imports
import Data.MonadicStreamFunction.InternalCore (MSF(MSF, unMSF))
import Control.Monad.Trans.MSF.Maybe
import Control.Monad.Trans.MSF.Except (step, once, MSFExcept (MSFExcept, runMSFExcept), listToMSFExcept, runExceptT)
import Data.MonadicStreamFunction (morphS, liftTransS, MSF)
import Data.MonadicStreamFunction.InternalCore (MSF(unMSF))
import Data.MonadicStreamFunction.Util (MStream)

-- | Execute an 'MSF' for an unknown number of steps.
newtype MSFAsync i m a = MSFAsync { unMSFAsync :: MSFExcept m i a () }

instance Monad m => Functor (MSFAsync i m) where
fmap f = MSFAsync . MSFExcept . fmap f . runMSFExcept . unMSFAsync

instance Monad m => Applicative (MSFAsync i m) where
pure a = MSFAsync $ step $ const $ return (a, ())
(<*>) = ap

instance Monad m => Monad (MSFAsync i m) where
MSFAsync msf0 >>= f = MSFAsync $ go $ runMSFExcept msf0
where
go msf = do
output <- once $ runExceptT <$> unMSF msf
case output of
Right (a, msf') -> do
unMSFAsync $ f a
go msf'
Left () -> return ()

-- |
-- Concatenates a monadic stream of lists to a monadic stream.
-- The stream of lists will be called exactly when new data is needed.
--
-- Example:
--
-- >>> let intstream = constS $ putStrLn "Enter a list of Ints:" >> readLn :: MStream IO [Int]
-- >>> let intstream = constM $ putStrLn "Enter a list of Ints:" >> readLn :: MStream IO [Int]
-- >>> reactimate $ concatS intstream >>> arrM print
-- Enter a list of Ints:
-- [1, 2, 33]
Expand Down Expand Up @@ -57,9 +85,10 @@ import Data.MonadicStreamFunction.Util (MStream)
-- "Yes"
-- ^CInterrupted.
concatS :: Monad m => MStream m [b] -> MStream m b
concatS msf = MSF $ \_ -> tick msf []
where
tick msf' (b:bs) = return (b, MSF $ \_ -> tick msf' bs)
tick msf' [] = do
(bs, msf'') <- unMSF msf' ()
tick msf'' bs
concatS msf = morphS (fmap (fromRight (error "concatS: internal error")) . runExceptT) $ runMSFExcept $ unMSFAsync $ liftAsync (liftTransS msf) >>= listToMSFAsync

listToMSFAsync :: Monad m => [a] -> MSFAsync i m a
listToMSFAsync = MSFAsync . listToMSFExcept

liftAsync :: Monad m => MSF (MaybeT m) a b -> MSFAsync a m b
liftAsync = MSFAsync . MSFExcept . maybeToExceptS