Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ allow-newer:
source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: peras-staging/pr-5202
--sha256: sha256-nTbjunQaqt6/syzSKw24Lne50083dI2SZFirG2/1T9U=
tag: peras-staging/pr-5202-v2
--sha256: sha256-vEO721Xab0RTVKFQFKal5VCV5y+OUzELo8+7Z8TETJQ=
subdir:
ouroboros-network
ouroboros-network-protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ objectDiffusionInbound
_version
controlMessageSTM
state =
ObjectDiffusionInboundPipelined $ do
ObjectDiffusionInboundPipelined $
continueWithStateM (go Zero) initialInboundSt
where
canRequestMoreObjects :: InboundSt k object -> Bool
Expand Down Expand Up @@ -320,9 +320,10 @@ objectDiffusionInbound
-- request.
let st' = st{numIdsInFlight = numIdsInFlight st - numIdsRequested}
poolHasObject <- atomically $ opwHasObject
continueWithStateM
(go n)
(preAcknowledge st' poolHasObject collectedIds)
pure $
continueWithStateM
(go n)
(preAcknowledge st' poolHasObject collectedIds)
CollectObjects requestedIds collectedObjects -> do
let requestedIdsSet = Set.fromList requestedIds
obtainedIdsSet = Set.fromList (opwObjectId <$> collectedObjects)
Expand Down Expand Up @@ -368,15 +369,16 @@ objectDiffusionInbound
traceWith tracer $
TraceObjectDiffusionProcessed
(NumObjectsProcessed (fromIntegral $ length objectsToAck))
continueWithStateM
(go n)
st
{ pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo'
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}
pure $
continueWithStateM
(go n)
st
{ pendingObjects = pendingObjects''
, outstandingFifo = outstandingFifo'
, numToAckOnNextReq =
numToAckOnNextReq st
+ fromIntegral (Seq.length objectIdsToAck)
}

goReqObjectIdsBlocking :: Stateful (InboundSt objectId object) 'Z objectId object m
goReqObjectIdsBlocking = Stateful $ \st -> do
Expand All @@ -392,20 +394,21 @@ objectDiffusionInbound
$ SendMsgRequestObjectIdsBlocking
(numToAckOnNextReq st)
numIdsToRequest
( \neCollectedIds -> do
( \neCollectedIds -> WithEffect $ do
-- We just got some new object id's, so we are no longer idling
--
-- NOTE this change of state should be made explicit:
-- https://github.com/tweag/cardano-peras/issues/144
Idling.idlingStop (odisvIdling state)
traceWith tracer TraceObjectInboundStoppedIdling
collectAndContinueWithState
(goCollect Zero)
st
{ numToAckOnNextReq = 0
, numIdsInFlight = numIdsToRequest
}
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
pure $
collectAndContinueWithState
(goCollect Zero)
st
{ numToAckOnNextReq = 0
, numIdsInFlight = numIdsToRequest
}
(CollectObjectIds numIdsToRequest (NonEmpty.toList neCollectedIds))
)

goReqObjectsAndObjectIdsPipelined ::
Expand Down Expand Up @@ -433,7 +436,7 @@ objectDiffusionInbound
let numIdsToRequest = numIdsToReq st

if numIdsToRequest <= 0
then continueWithStateM (go n) st
then pure $ continueWithStateM (go n) st
else
pure $
SendMsgRequestObjectIdsPipelined
Expand All @@ -454,8 +457,8 @@ objectDiffusionInbound
terminateAfterDrain ::
Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain = \case
Zero -> SendMsgDone (pure ())
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> pure $ terminateAfterDrain n
Zero -> SendMsgDone ()
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n

-------------------------------------------------------------------------------
-- Utilities to deal with stateful continuations (copied from TX-submission)
Expand Down Expand Up @@ -487,9 +490,9 @@ continueWithStateM ::
NoThunks s =>
StatefulM s n objectId object m ->
s ->
m (InboundStIdle n objectId object m ())
InboundStIdle n objectId object m ()
continueWithStateM (StatefulM f) !st =
checkInvariant (show <$> unsafeNoThunks st) (f st)
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st)
{-# NOINLINE continueWithStateM #-}

-- | A variant of 'continueWithState' to be more easily utilized with
Expand All @@ -499,7 +502,7 @@ collectAndContinueWithState ::
StatefulCollect s n objectId object m ->
s ->
Collect objectId object ->
m (InboundStIdle n objectId object m ())
InboundStIdle n objectId object m ()
collectAndContinueWithState (StatefulCollect f) !st c =
checkInvariant (show <$> unsafeNoThunks st) (f st c)
checkInvariant (show <$> unsafeNoThunks st) (WithEffect $! f st c)
{-# NOINLINE collectAndContinueWithState #-}