diff --git a/accelerate-llvm-native/src/Data/Array/Accelerate/LLVM/Native/Execute/Async.hs b/accelerate-llvm-native/src/Data/Array/Accelerate/LLVM/Native/Execute/Async.hs index a951d358a..2a41f5b9b 100644 --- a/accelerate-llvm-native/src/Data/Array/Accelerate/LLVM/Native/Execute/Async.hs +++ b/accelerate-llvm-native/src/Data/Array/Accelerate/LLVM/Native/Execute/Async.hs @@ -73,7 +73,6 @@ data Future a = Future {-# UNPACK #-} !(IORef (IVar a)) data IVar a = Full !a | Blocked !(Seq (a -> IO ())) - | Empty instance Async Native where type FutureR Native = Future @@ -82,7 +81,7 @@ instance Async Native where {-# INLINE new #-} {-# INLINE newFull #-} - new = Future <$> liftIO (newIORef Empty) + new = Future <$> liftIO (newIORef (Blocked Seq.empty)) newFull v = Future <$> liftIO (newIORef (Full v)) {-# INLINE fork #-} @@ -95,7 +94,6 @@ instance Async Native where callCC $ \k -> do native <- gets llvmTarget next <- liftIO . atomicModifyIORef' ref $ \case - Empty -> (Blocked (Seq.singleton (evalParIO native . k)), reschedule) Blocked ks -> (Blocked (ks Seq.|> evalParIO native . k), reschedule) Full a -> (Full a, return a) next @@ -108,6 +106,21 @@ instance Async Native where {-# INLINE liftPar #-} liftPar = Par . lift + {-# INLINE statusHandle #-} + statusHandle (Future ref) = do + emptyFut@(Future emptyIVar) <- new + fullFut <- newFull () + liftIO $ atomicModifyIORef' ref $ \case + Blocked ks -> (Blocked (ks Seq.|> const (writeIORef emptyIVar (Full ()))), emptyFut) + Full v -> (Full v, fullFut) + + {-# INLINE poll #-} + poll (Future ref) = do + ivar <- liftIO $ readIORef ref + case ivar of + Full v -> return (Just v) + _ -> return Nothing + -- | Evaluate a continuation -- {-# INLINE evalParIO #-} @@ -122,7 +135,6 @@ evalParIO native@Native{} work = putIO :: HasCallStack => Workers -> Future a -> a -> IO () putIO workers (Future ref) v = do ks <- atomicModifyIORef' ref $ \case - Empty -> (Full v, Seq.empty) Blocked ks -> (Full v, ks) _ -> internalError "multiple put" -- diff --git a/accelerate-llvm-native/src/Language/Haskell/TH/Extra.hs b/accelerate-llvm-native/src/Language/Haskell/TH/Extra.hs deleted file mode 120000 index 8554157f4..000000000 --- a/accelerate-llvm-native/src/Language/Haskell/TH/Extra.hs +++ /dev/null @@ -1 +0,0 @@ -../../../../../accelerate-llvm/src/Language/Haskell/TH/Extra.hs \ No newline at end of file diff --git a/accelerate-llvm-ptx/accelerate-llvm-ptx.cabal b/accelerate-llvm-ptx/accelerate-llvm-ptx.cabal index 60ea48748..0d4f19cf4 100644 --- a/accelerate-llvm-ptx/accelerate-llvm-ptx.cabal +++ b/accelerate-llvm-ptx/accelerate-llvm-ptx.cabal @@ -101,6 +101,7 @@ Library , deepseq >= 1.3 , directory >= 1.0 , dlist >= 0.6 + , exceptions >= 0.10 , file-embed >= 0.0.8 , filepath >= 1.0 , formatting >= 7.0 diff --git a/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Array/Prim.hs b/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Array/Prim.hs index 2b901dc44..820bc3d3f 100644 --- a/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Array/Prim.hs +++ b/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Array/Prim.hs @@ -53,6 +53,7 @@ import qualified Data.Array.Accelerate.LLVM.PTX.Debug as Debug import qualified Foreign.CUDA.Driver as CUDA import qualified Foreign.CUDA.Driver.Stream as CUDA +import Control.Concurrent.MVar import Control.Monad import Control.Monad.Reader import Data.IORef @@ -346,11 +347,11 @@ nonblocking !stream !action = do ready <- liftIO (query event) if ready then do - future <- Future <$> liftIO (newIORef (Full result)) + future <- Future <$> liftIO (newMVar (Full result)) return (Nothing, future) else do - future <- Future <$> liftIO (newIORef (Pending event Nothing result)) + future <- Future <$> liftIO (newMVar (Pending event Nothing [] result)) return (Just event, future) {-# INLINE withLifetime #-} diff --git a/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Execute/Async.hs b/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Execute/Async.hs index 9a70619b8..8e729c132 100644 --- a/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Execute/Async.hs +++ b/accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Execute/Async.hs @@ -6,6 +6,7 @@ {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeSynonymInstances #-} {-# OPTIONS_GHC -fno-warn-orphans #-} +{-# LANGUAGE TupleSections #-} -- | -- Module : Data.Array.Accelerate.LLVM.PTX.Execute.Async -- Copyright : [2014..2020] The Accelerate Team @@ -36,9 +37,10 @@ import Data.Array.Accelerate.LLVM.PTX.Link.Object ( FunctionTa import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Event as Event import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Stream as Stream +import Control.Concurrent.MVar +import Control.Monad.Catch import Control.Monad.Reader import Control.Monad.State -import Data.IORef -- | Evaluate a parallel computation @@ -63,24 +65,24 @@ ptxKernel = snd -- Implementation -- -------------- -data Future a = Future {-# UNPACK #-} !(IORef (IVar a)) +data Future a = Future {-# UNPACK #-} !(MVar (IVar a)) data IVar a = Full !a - | Pending {-# UNPACK #-} !Event !(Maybe (Lifetime FunctionTable)) !a - | Empty + | Pending {-# UNPACK #-} !Event !(Maybe (Lifetime FunctionTable)) ![Future ()] !a + | Empty ![Future ()] instance Async PTX where type FutureR PTX = Future newtype Par PTX a = Par { runPar :: ReaderT ParState (LLVM PTX) a } - deriving ( Functor, Applicative, Monad, MonadIO, MonadReader ParState, MonadState PTX ) + deriving ( Functor, Applicative, Monad, MonadIO, MonadReader ParState, MonadState PTX, MonadThrow, MonadCatch, MonadMask ) {-# INLINEABLE new #-} {-# INLINEABLE newFull #-} - new = Future <$> liftIO (newIORef Empty) - newFull v = Future <$> liftIO (newIORef (Full v)) + new = Future <$> liftIO (newMVar (Empty [])) + newFull v = Future <$> liftIO (newMVar (Full v)) {-# INLINEABLE spawn #-} spawn m = do @@ -91,7 +93,7 @@ instance Async PTX where {-# INLINEABLE fork #-} fork m = do - s' <- liftPar (Stream.create) + s' <- liftPar Stream.create () <- local (const (s', Nothing)) m liftIO (Stream.destroy s') @@ -104,11 +106,17 @@ instance Async PTX where stream <- asks ptxStream kernel <- asks ptxKernel event <- liftPar (Event.waypoint stream) - ready <- liftIO (Event.query event) - liftIO . modifyIORef' ref $ \case - Empty -> if ready then Full v - else Pending event kernel v - _ -> internalError "multiple put" + liftIO $ do + ready <- Event.query event + ivar <- readMVar ref + case ivar of + Empty statusHandles -> + if ready then do + modifyMVar_ ref $ const $ pure $ Full v + signalCompletion statusHandles + else + modifyMVar_ ref $ const $ pure $ Pending event kernel statusHandles v + _ -> internalError "multiple put" -- Get the value of Future. Since the actual cross-stream synchronisation -- happens on the device, we should never have to block/reschedule the main @@ -116,24 +124,20 @@ instance Async PTX where -- has gone wrong. -- {-# INLINEABLE get #-} - get (Future ref) = do + get fut@(Future ref) = do stream <- asks ptxStream liftIO $ do - ivar <- readIORef ref + ivar <- readMVar ref case ivar of Full v -> return v - Pending event k v -> do + Pending event _ _ v -> do ready <- Event.query event - if ready - then do - writeIORef ref (Full v) - case k of - Just f -> touchLifetime f - Nothing -> return () - else - Event.after event stream - return v - Empty -> internalError "blocked on an IVar" + if ready then + completePending fut + else do + Event.after event stream + return v + Empty _ -> internalError "blocked on an IVar" {-# INLINEABLE block #-} block = liftIO . wait @@ -141,22 +145,59 @@ instance Async PTX where {-# INLINE liftPar #-} liftPar = Par . lift + {-# INLINE statusHandle #-} + + statusHandle (Future ref) = + liftIO $ modifyMVar ref $ \case + Full v -> (Full v,) . Future <$> newMVar (Full ()) + Empty statusHandles -> do + emptyFut <- Future <$> newMVar (Empty []) + pure (Empty (emptyFut:statusHandles), emptyFut) + Pending e k statusHandles v -> do + pendingFut <- Future <$> newMVar (Pending e k [] ()) + pure (Pending e k (pendingFut:statusHandles) v, pendingFut) + + {-# INLINE poll #-} + + poll fut@(Future ref) = liftIO $ do + ivar <- readMVar ref + case ivar of + Full v -> + return (Just v) + Pending event _ _ _ -> do + ready <- Event.query event + if ready then + Just <$> completePending fut + else + pure Nothing + _ -> + return Nothing -- | Block the calling _host_ thread until the value offered by the future is -- available. -- {-# INLINEABLE wait #-} wait :: Future a -> IO a -wait (Future ref) = do - ivar <- readIORef ref +wait fut@(Future ref) = do + ivar <- readMVar ref case ivar of - Full v -> return v - Pending event k v -> do - Event.block event - writeIORef ref (Full v) - case k of - Just f -> touchLifetime f - Nothing -> return () + Full v -> return v - Empty -> internalError "blocked on an IVar" - + Pending event _ _ _-> do + Event.block event + completePending fut + Empty _ -> + internalError "blocked on an IVar" + +signalCompletion :: [Future ()] -> IO () +signalCompletion = mapM_ $ \(Future ref) -> modifyMVar_ ref $ const $ pure $ Full () + +completePending :: Future a -> IO a +completePending (Future ref) = + modifyMVar ref $ \case + Pending _ k statusHandles v -> do + signalCompletion statusHandles + maybe (pure ()) touchLifetime k + pure (Full v, v) + _ -> + internalError "Expected (Pending ...)" \ No newline at end of file diff --git a/accelerate-llvm-ptx/src/Language/Haskell/TH/Extra.hs b/accelerate-llvm-ptx/src/Language/Haskell/TH/Extra.hs deleted file mode 120000 index 8554157f4..000000000 --- a/accelerate-llvm-ptx/src/Language/Haskell/TH/Extra.hs +++ /dev/null @@ -1 +0,0 @@ -../../../../../accelerate-llvm/src/Language/Haskell/TH/Extra.hs \ No newline at end of file diff --git a/accelerate-llvm/src/Data/Array/Accelerate/LLVM/Execute/Async.hs b/accelerate-llvm/src/Data/Array/Accelerate/LLVM/Execute/Async.hs index 1b56cfde7..4190ad1a1 100644 --- a/accelerate-llvm/src/Data/Array/Accelerate/LLVM/Execute/Async.hs +++ b/accelerate-llvm/src/Data/Array/Accelerate/LLVM/Execute/Async.hs @@ -62,6 +62,17 @@ class (Monad (Par arch), MonadIO (Par arch)) => Async arch where -- liftPar :: HasCallStack => LLVM arch a -> Par arch a + -- | Produce a future which is linked to another future where the completion + -- status of this original future is reflected in the completion status of + -- this status-only future. + -- + statusHandle :: HasCallStack => FutureR arch a -> Par arch (FutureR arch ()) + + -- | Check the completion of a Future without blocking and if it is Full + -- yield its contents, else return Nothing. + -- + poll :: HasCallStack => FutureR arch a -> Par arch (Maybe a) + -- | Read a value stored in a future, once it is available. This is blocking -- with respect to both the host and remote device. --