@@ -26,9 +26,8 @@ import Chan (Chan)
2626import ChanDriver (ProtocolMessage , chanDriver )
2727import Control.Exception (assert )
2828import Control.Monad (forM , forever , guard , join , unless , void , when , (<=<) )
29- import Control.Tracer (Contravariant ( contramap ), Tracer , traceWith )
29+ import Control.Tracer (Tracer , traceWith )
3030import Data.Bifunctor (second )
31- import Data.Foldable (forM_ )
3231import Data.Kind (Type )
3332import qualified Data.List as List
3433import Data.Map.Strict (Map )
@@ -47,7 +46,6 @@ import Network.TypedProtocol (
4746import Network.TypedProtocol.Driver (runPeerWithDriver )
4847import qualified Network.TypedProtocol.Peer.Client as TC
4948import qualified Network.TypedProtocol.Peer.Server as TS
50- import Numeric.Natural (Natural )
5149import PraosProtocol.Common
5250import qualified PraosProtocol.Common.AnchoredFragment as AnchoredFragment
5351import qualified PraosProtocol.Common.Chain as Chain
@@ -602,82 +600,30 @@ initBlockFetchConsumerStateForPeerId tracer peerId blockFetchControllerState sub
602600
603601setupValidatorThreads ::
604602 (MonadSTM m , MonadDelay m ) =>
605- Tracer m (PraosNodeEvent BlockBody ) ->
606603 PraosConfig BlockBody ->
607604 BlockFetchControllerState BlockBody m ->
608- -- | bound on queue length.
609- Natural ->
605+ ((CPUTask , m () ) -> STM m () ) ->
610606 m ([m () ], Block BlockBody -> m () -> m () )
611- setupValidatorThreads tracer cfg st n = do
612- queue <- newTBQueueIO n
613- (waitingVar, processWaitingThread) <- setupProcessWaitingThread (contramap PraosNodeEventCPU tracer) (Just 1 ) st. blocksVar
614- let doTask (cpuTask, m) = do
615- traceWith tracer . PraosNodeEventCPU $ cpuTask
616- threadDelay cpuTask. cpuTaskDuration
617- m
618-
619- -- if we have the previous block, we process the task sequentially to provide back pressure on the queue.
620- let waitForPrev block task = case blockPrevHash block of
621- GenesisHash -> doTask task
607+ setupValidatorThreads cfg st queue = do
608+ waitingVar <- newTVarIO Map. empty
609+ let processWaitingThread = processWaiting' st. blocksVar waitingVar
610+
611+ let waitForPrev block task = atomically $ case blockPrevHash block of
612+ GenesisHash -> queue task
622613 BlockHash prev -> do
623- havePrev <- Map. member prev <$> readTVarIO st. blocksVar
624- -- Note: for pure praos this also means we have the ledger state.
625- if havePrev
626- then doTask task
627- else atomically $ modifyTVar' waitingVar (Map. insertWith (++) prev [task])
628- fetch = forever $ do
629- (block, completion) <- atomically $ readTBQueue queue
614+ modifyTVar' waitingVar (Map. insertWith (++) prev [queue task])
615+ add block completion = do
630616 assert (blockInvariant block) $ do
631617 waitForPrev block $
632618 let ! cpuTask = CPUTask (cfg. blockValidationDelay block) (T. pack $ " Validate " ++ show (blockHash block))
633619 in (cpuTask, completion)
634- add block completion = atomically $ writeTBQueue queue (block, completion)
635- return ([fetch, processWaitingThread], add)
636-
637- setupProcessWaitingThread ::
638- forall m a b .
639- (MonadSTM m , MonadDelay m ) =>
640- Tracer m CPUTask ->
641- -- | how many waiting to process in parallel
642- Maybe Int ->
643- TVar m (Map ConcreteHeaderHash a ) ->
644- m (TVar m (Map ConcreteHeaderHash [(CPUTask , m b )]), m () )
645- setupProcessWaitingThread tracer npar blocksVar = do
646- waitingVar <- newTVarIO Map. empty
647- return (waitingVar, processWaiting tracer npar blocksVar waitingVar)
648-
649- processWaiting ::
650- forall m a b .
651- (MonadSTM m , MonadDelay m ) =>
652- Tracer m CPUTask ->
653- -- | how many waiting to process in parallel
654- Maybe Int ->
655- TVar m (Map ConcreteHeaderHash a ) ->
656- TVar m (Map ConcreteHeaderHash [(CPUTask , m b )]) ->
657- m ()
658- processWaiting tracer npar blocksVar waitingVar = go
659- where
660- parallelDelay xs = do
661- let ! d = maximum $ map (cpuTaskDuration . fst ) xs
662- forM_ xs $ traceWith tracer . fst
663- threadDelay d
664- mapM_ snd xs
665- go = forever $ join $ atomically $ do
666- waiting <- readTVar waitingVar
667- when (Map. null waiting) retry
668- blocks <- readTVar blocksVar
669- let toValidate = Map. intersection waiting blocks
670- when (Map. null toValidate) retry
671- writeTVar waitingVar $! waiting Map. \\ toValidate
672- let chunks Nothing xs = [xs]
673- chunks (Just m) xs = map (take m) . takeWhile (not . null ) . iterate (drop m) $ xs
674- return . mapM_ parallelDelay . chunks npar . concat . Map. elems $ toValidate
620+ return ([processWaitingThread], add)
675621
676622processWaiting' ::
677623 forall m a b .
678624 (MonadSTM m , MonadDelay m ) =>
679625 TVar m (Map ConcreteHeaderHash a ) ->
680- TVar m (Map ConcreteHeaderHash [m b ]) ->
626+ TVar m (Map ConcreteHeaderHash [STM m b ]) ->
681627 m ()
682628processWaiting' blocksVar waitingVar = go
683629 where
@@ -688,4 +634,4 @@ processWaiting' blocksVar waitingVar = go
688634 let toValidate = Map. intersection waiting blocks
689635 when (Map. null toValidate) retry
690636 writeTVar waitingVar $! waiting Map. \\ toValidate
691- return . sequence_ . concat . Map. elems $ toValidate
637+ return . mapM_ atomically . concat . Map. elems $ toValidate
0 commit comments