Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ data Future a = Future {-# UNPACK #-} !(IORef (IVar a))
data IVar a
= Full !a
| Blocked !(Seq (a -> IO ()))
| Empty

instance Async Native where
type FutureR Native = Future
Expand All @@ -82,7 +81,7 @@ instance Async Native where

{-# INLINE new #-}
{-# INLINE newFull #-}
new = Future <$> liftIO (newIORef Empty)
new = Future <$> liftIO (newIORef (Blocked Seq.empty))
newFull v = Future <$> liftIO (newIORef (Full v))

{-# INLINE fork #-}
Expand All @@ -95,7 +94,6 @@ instance Async Native where
callCC $ \k -> do
native <- gets llvmTarget
next <- liftIO . atomicModifyIORef' ref $ \case
Empty -> (Blocked (Seq.singleton (evalParIO native . k)), reschedule)
Blocked ks -> (Blocked (ks Seq.|> evalParIO native . k), reschedule)
Full a -> (Full a, return a)
next
Expand All @@ -108,6 +106,21 @@ instance Async Native where
{-# INLINE liftPar #-}
liftPar = Par . lift

{-# INLINE statusHandle #-}
statusHandle (Future ref) = do
emptyFut@(Future emptyIVar) <- new
fullFut <- newFull ()
liftIO $ atomicModifyIORef' ref $ \case
Blocked ks -> (Blocked (ks Seq.|> const (writeIORef emptyIVar (Full ()))), emptyFut)
Full v -> (Full v, fullFut)

{-# INLINE poll #-}
poll (Future ref) = do
ivar <- liftIO $ readIORef ref
case ivar of
Full v -> return (Just v)
_ -> return Nothing

-- | Evaluate a continuation
--
{-# INLINE evalParIO #-}
Expand All @@ -122,7 +135,6 @@ evalParIO native@Native{} work =
putIO :: HasCallStack => Workers -> Future a -> a -> IO ()
putIO workers (Future ref) v = do
ks <- atomicModifyIORef' ref $ \case
Empty -> (Full v, Seq.empty)
Blocked ks -> (Full v, ks)
_ -> internalError "multiple put"
--
Expand Down
1 change: 0 additions & 1 deletion accelerate-llvm-native/src/Language/Haskell/TH/Extra.hs

This file was deleted.

1 change: 1 addition & 0 deletions accelerate-llvm-ptx/accelerate-llvm-ptx.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Library
, deepseq >= 1.3
, directory >= 1.0
, dlist >= 0.6
, exceptions >= 0.10
, file-embed >= 0.0.8
, filepath >= 1.0
, formatting >= 7.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import qualified Data.Array.Accelerate.LLVM.PTX.Debug as Debug
import qualified Foreign.CUDA.Driver as CUDA
import qualified Foreign.CUDA.Driver.Stream as CUDA

import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.Reader
import Data.IORef
Expand Down Expand Up @@ -346,11 +347,11 @@ nonblocking !stream !action = do
ready <- liftIO (query event)
if ready
then do
future <- Future <$> liftIO (newIORef (Full result))
future <- Future <$> liftIO (newMVar (Full result))
return (Nothing, future)

else do
future <- Future <$> liftIO (newIORef (Pending event Nothing result))
future <- Future <$> liftIO (newMVar (Pending event Nothing [] result))
return (Just event, future)

{-# INLINE withLifetime #-}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE TupleSections #-}
-- |
-- Module : Data.Array.Accelerate.LLVM.PTX.Execute.Async
-- Copyright : [2014..2020] The Accelerate Team
Expand Down Expand Up @@ -36,9 +37,10 @@ import Data.Array.Accelerate.LLVM.PTX.Link.Object ( FunctionTa
import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Event as Event
import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Stream as Stream

import Control.Concurrent.MVar
import Control.Monad.Catch
import Control.Monad.Reader
import Control.Monad.State
import Data.IORef


-- | Evaluate a parallel computation
Expand All @@ -63,24 +65,24 @@ ptxKernel = snd
-- Implementation
-- --------------

data Future a = Future {-# UNPACK #-} !(IORef (IVar a))
data Future a = Future {-# UNPACK #-} !(MVar (IVar a))

data IVar a
= Full !a
| Pending {-# UNPACK #-} !Event !(Maybe (Lifetime FunctionTable)) !a
| Empty
| Pending {-# UNPACK #-} !Event !(Maybe (Lifetime FunctionTable)) ![Future ()] !a
| Empty ![Future ()]


instance Async PTX where
type FutureR PTX = Future

newtype Par PTX a = Par { runPar :: ReaderT ParState (LLVM PTX) a }
deriving ( Functor, Applicative, Monad, MonadIO, MonadReader ParState, MonadState PTX )
deriving ( Functor, Applicative, Monad, MonadIO, MonadReader ParState, MonadState PTX, MonadThrow, MonadCatch, MonadMask )

{-# INLINEABLE new #-}
{-# INLINEABLE newFull #-}
new = Future <$> liftIO (newIORef Empty)
newFull v = Future <$> liftIO (newIORef (Full v))
new = Future <$> liftIO (newMVar (Empty []))
newFull v = Future <$> liftIO (newMVar (Full v))

{-# INLINEABLE spawn #-}
spawn m = do
Expand All @@ -91,7 +93,7 @@ instance Async PTX where

{-# INLINEABLE fork #-}
fork m = do
s' <- liftPar (Stream.create)
s' <- liftPar Stream.create
() <- local (const (s', Nothing)) m
liftIO (Stream.destroy s')

Expand All @@ -104,59 +106,98 @@ instance Async PTX where
stream <- asks ptxStream
kernel <- asks ptxKernel
event <- liftPar (Event.waypoint stream)
ready <- liftIO (Event.query event)
liftIO . modifyIORef' ref $ \case
Empty -> if ready then Full v
else Pending event kernel v
_ -> internalError "multiple put"
liftIO $ do
ready <- Event.query event
ivar <- readMVar ref
case ivar of
Empty statusHandles ->
if ready then do
modifyMVar_ ref $ const $ pure $ Full v
signalCompletion statusHandles
else
modifyMVar_ ref $ const $ pure $ Pending event kernel statusHandles v
_ -> internalError "multiple put"

-- Get the value of Future. Since the actual cross-stream synchronisation
-- happens on the device, we should never have to block/reschedule the main
-- thread waiting on a value; if we get an empty IVar at this point, something
-- has gone wrong.
--
{-# INLINEABLE get #-}
get (Future ref) = do
get fut@(Future ref) = do
stream <- asks ptxStream
liftIO $ do
ivar <- readIORef ref
ivar <- readMVar ref
case ivar of
Full v -> return v
Pending event k v -> do
Pending event _ _ v -> do
ready <- Event.query event
if ready
then do
writeIORef ref (Full v)
case k of
Just f -> touchLifetime f
Nothing -> return ()
else
Event.after event stream
return v
Empty -> internalError "blocked on an IVar"
if ready then
completePending fut
else do
Event.after event stream
return v
Empty _ -> internalError "blocked on an IVar"

{-# INLINEABLE block #-}
block = liftIO . wait

{-# INLINE liftPar #-}
liftPar = Par . lift

{-# INLINE statusHandle #-}

statusHandle (Future ref) =
liftIO $ modifyMVar ref $ \case
Full v -> (Full v,) . Future <$> newMVar (Full ())
Empty statusHandles -> do
emptyFut <- Future <$> newMVar (Empty [])
pure (Empty (emptyFut:statusHandles), emptyFut)
Pending e k statusHandles v -> do
pendingFut <- Future <$> newMVar (Pending e k [] ())
pure (Pending e k (pendingFut:statusHandles) v, pendingFut)

{-# INLINE poll #-}

poll fut@(Future ref) = liftIO $ do
ivar <- readMVar ref
case ivar of
Full v ->
return (Just v)
Pending event _ _ _ -> do
ready <- Event.query event
if ready then
Just <$> completePending fut
else
pure Nothing
_ ->
return Nothing

-- | Block the calling _host_ thread until the value offered by the future is
-- available.
--
{-# INLINEABLE wait #-}
wait :: Future a -> IO a
wait (Future ref) = do
ivar <- readIORef ref
wait fut@(Future ref) = do
ivar <- readMVar ref
case ivar of
Full v -> return v
Pending event k v -> do
Event.block event
writeIORef ref (Full v)
case k of
Just f -> touchLifetime f
Nothing -> return ()
Full v ->
return v
Empty -> internalError "blocked on an IVar"

Pending event _ _ _-> do
Event.block event
completePending fut
Empty _ ->
internalError "blocked on an IVar"

signalCompletion :: [Future ()] -> IO ()
signalCompletion = mapM_ $ \(Future ref) -> modifyMVar_ ref $ const $ pure $ Full ()

completePending :: Future a -> IO a
completePending (Future ref) =
modifyMVar ref $ \case
Pending _ k statusHandles v -> do
signalCompletion statusHandles
maybe (pure ()) touchLifetime k
pure (Full v, v)
_ ->
internalError "Expected (Pending ...)"
1 change: 0 additions & 1 deletion accelerate-llvm-ptx/src/Language/Haskell/TH/Extra.hs

This file was deleted.

11 changes: 11 additions & 0 deletions accelerate-llvm/src/Data/Array/Accelerate/LLVM/Execute/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ class (Monad (Par arch), MonadIO (Par arch)) => Async arch where
--
liftPar :: HasCallStack => LLVM arch a -> Par arch a

-- | Produce a future which is linked to another future where the completion
-- status of this original future is reflected in the completion status of
-- this status-only future.
--
statusHandle :: HasCallStack => FutureR arch a -> Par arch (FutureR arch ())

-- | Check the completion of a Future without blocking and if it is Full
-- yield its contents, else return Nothing.
--
poll :: HasCallStack => FutureR arch a -> Par arch (Maybe a)

-- | Read a value stored in a future, once it is available. This is blocking
-- with respect to both the host and remote device.
--
Expand Down