diff --git a/cabal.project b/cabal.project index 17c612c568..ca8cbfbbc4 100644 --- a/cabal.project +++ b/cabal.project @@ -59,8 +59,8 @@ allow-newer: source-repository-package type: git location: https://github.com/IntersectMBO/ouroboros-network - tag: peras-staging/pr-5202 - --sha256: sha256-nTbjunQaqt6/syzSKw24Lne50083dI2SZFirG2/1T9U= + tag: peras-staging/pr-5202-v2 + --sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ= subdir: ouroboros-network ouroboros-network-protocols diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs index a368682c40..3649213d09 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs @@ -146,7 +146,7 @@ objectDiffusionInbound _version controlMessageSTM state = - ObjectDiffusionInboundPipelined $ do + ObjectDiffusionInboundPipelined $ continueWithStateM (go Zero) initialInboundSt where canRequestMoreObjects :: InboundSt k object -> Bool @@ -320,9 +320,10 @@ objectDiffusionInbound -- request. let st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested} poolHasObject <- atomically $ opwHasObject - continueWithStateM - (go n) - (preAcknowledge st' poolHasObject collectedIds) + pure $ + continueWithStateM + (go n) + (preAcknowledge st' poolHasObject collectedIds) CollectObjects requestedIds collectedObjects -> do let requestedIdsSet = Set.fromList requestedIds obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects) @@ -368,15 +369,16 @@ objectDiffusionInbound traceWith tracer $ TraceObjectDiffusionProcessed (NumObjectsProcessed (fromIntegral $ length objectsToAck)) - continueWithStateM - (go n) - st - { pendingObjects = pendingObjects'' - , outstandingFifo = outstandingFifo' - , numToAckOnNextReq = - numToAckOnNextReq st - + fromIntegral (Seq.length objectIdsToAck) - } + pure $ + continueWithStateM + (go n) + st + { pendingObjects = pendingObjects'' + , outstandingFifo = outstandingFifo' + , numToAckOnNextReq = + numToAckOnNextReq st + + fromIntegral (Seq.length objectIdsToAck) + } goReqObjectIdsBlocking :: Stateful (InboundSt objectId object) 'Z objectId object m goReqObjectIdsBlocking = Stateful $ \st -> do @@ -392,20 +394,21 @@ objectDiffusionInbound $ SendMsgRequestObjectIdsBlocking (numToAckOnNextReq st) numIdsToRequest - ( \neCollectedIds -> do + ( \neCollectedIds -> WithEffect $ do -- We just got some new object id's, so we are no longer idling -- -- NOTE this change of state should be made explicit: -- https://github.com/tweag/cardano-peras/issues/144 Idling.idlingStop (odisvIdling state) traceWith tracer TraceObjectInboundStoppedIdling - collectAndContinueWithState - (goCollect Zero) - st - { numToAckOnNextReq = 0 - , numIdsInFlight = numIdsToRequest - } - (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds)) + pure $ + collectAndContinueWithState + (goCollect Zero) + st + { numToAckOnNextReq = 0 + , numIdsInFlight = numIdsToRequest + } + (CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds)) ) goReqObjectsAndObjectIdsPipelined :: @@ -433,7 +436,7 @@ objectDiffusionInbound let numIdsToRequest = numIdsToReq st if numIdsToRequest <= 0 - then continueWithStateM (go n) st + then pure $ continueWithStateM (go n) st else pure $ SendMsgRequestObjectIdsPipelined @@ -454,8 +457,8 @@ objectDiffusionInbound terminateAfterDrain :: Nat n -> InboundStIdle n objectId object m () terminateAfterDrain = \case - Zero -> SendMsgDone (pure ()) - Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> pure $ terminateAfterDrain n + Zero -> SendMsgDone () + Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n ------------------------------------------------------------------------------- -- Utilities to deal with stateful continuations (copied from TX-submission) @@ -487,9 +490,9 @@ continueWithStateM :: NoThunks s => StatefulM s n objectId object m -> s -> - m (InboundStIdle n objectId object m ()) + InboundStIdle n objectId object m () continueWithStateM (StatefulM f) !st = - checkInvariant (show <$> unsafeNoThunks st) (f st) + checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st) {-# NOINLINE continueWithStateM #-} -- | A variant of 'continueWithState' to be more easily utilized with @@ -499,7 +502,7 @@ collectAndContinueWithState :: StatefulCollect s n objectId object m -> s -> Collect objectId object -> - m (InboundStIdle n objectId object m ()) + InboundStIdle n objectId object m () collectAndContinueWithState (StatefulCollect f) !st c = - checkInvariant (show <$> unsafeNoThunks st) (f st c) + checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st c) {-# NOINLINE collectAndContinueWithState #-}