diff --git a/simulation/docs/SimulatorModel.md b/simulation/docs/SimulatorModel.md index 95ff3cf07..95cb50222 100644 --- a/simulation/docs/SimulatorModel.md +++ b/simulation/docs/SimulatorModel.md @@ -182,30 +182,69 @@ IBs, VBs, and EBs are each diffused via a corresponding instance of the Relay mi This is a generalization of the TxSubmission mini protocol and the Mempool in `ouroboros-network` and `ouroboros-consensus`. Each Relay instance involves one thread per inbound connection (aka "peers") and one thread per outbound connection (aka "followers"). -For an inbound connection, the node is (aggressively/rapidly) pulling IB headers (ie merely IDs for VBs and EBs paired with a slot) and then selectively pulling the IB body (ie VBs and EBs) it wants in a configurable order/prioritization, which is usually FreshestFirst. -It is also configurable which of the peers offering the same body the node fetches it from, which is either just the first or all---all can sometimes reduce latency. -(TODO the real node will likely request from the second peer if the first hasn't yet replied but not the third.) -For an outbound connection, the roles are switched. +For an inbound connection, the node is (aggressively/rapidly) pulling IB headers (ie merely IDs paired with a slot for VBs and EBs) and then selectively pulling the IB body (ie VBs and EBs) it wants in a configurable order/prioritization, which is usually FreshestFirst. +For an outbound connection, the roles are simply switched. *Remark*. The reason RBs do not diffuse via Relay is because they form a chain, so one block can't be validated without its predecessors: an otherwise-valid block is invalid if it extends an invalid block. -TODO discuss the other Relay parameters, backpressure, pipelining, etc? - When an IB header arrives, its validation task is enqueued on the model CPU---for VBs and EBs it's just an ID, not a header, so there's no validation. 
-Once that finishes, the Relay logic will decide whether it needs to fetch the body. +Once that finishes, the Relay logic will decide whether it needs to request the body. -- An IB body is not fetched if it exists earlier than it should, it's being offered later than it should be, or if it's already in the buffer. -- An EB is not fetched if it's older than the slot to which the buffer has already been pruned, it's too old to be included by an RB (see `maxEndorseBlockAgeSlots`), or if it's already in the buffer. -- A VB is not fetched if it's older than the slot to which the buffer has already been pruned or if it's already in the buffer. +- An IB body is not requested if it exists earlier than it should, it's being offered later than it should be, or if it's already in the buffer. +- An EB is not requested if it's older than the slot to which the buffer has already been pruned, it's too old to be included by an RB (see `maxEndorseBlockAgeSlots`), or if it's already in the buffer. +- A VB is not requested if it's older than the slot to which the buffer has already been pruned or if it's already in the buffer. +- If `relayStrategy` is set to `maxPeersPerBody` for the IB/EB/VB relay, then such an object that has not yet arrived is not requested _yet_ if it's already been requested from another peer. -Different objects are handled differently when the arrived. +Different objects are handled differently when they arrive. - When an IB that extends the genesis block arrives, its validate-and-adopt task is enqueued on the model CPU. - When an IB that extends a non-genesis RB arrives, its validate-and-adopt task is added to `waitingForLedgerStateVar`. - When an EB arrives, its validate-and-adopt task is enqueued on the model CPU. - When a VB arrives, its validate-and-adopt task is enqueued on the model CPU. +Relay includes some-but-not-all of the parameters of the TxSubmission mini protocol in the `ouroboros-network` library. 
+TxSubmission has the following parameters (TODO well, these will be true when that repo's imminent PR merges). + +- `maxWindowSize`, a strict inclusive upper bound on how many more headers the node has requested from a peer than it has acknowledged to that peer; the node will not request the next body if doing so would exceed this bound. +- `maxHeadersPerMsg`, a strict inclusive upper bound on how many headers the node will request with a single message. +- `maxBodiesPerMsg`, a strict inclusive upper bound on how many bodies the node will request with a single message. +- `maxBytesInFlightPerPeer`, a lax inclusive upper bound on how many more bytes the node has requested from a peer than it has received corresponding replies for from that peer; the node will not send a request while this bound has been exceeded---ie the node will exceed the bound by at most _one_ request. + - If the received body has a different number of bytes than requested, the node disconnects from the peer. + - If this were instead a strict upper bound, peers sending many small bodies might starve a peer that needs to send a large body. + - The mux layer maintains a receive buffer for each peer, which resizes to accommodate the possibility that all of the expected responses will arrive before the consumer logic dequeues the first. + The buffer size will never exceed this bound by more than one body. +- `maxBytesInFlightAllPeers`, a lax inclusive upper bound on how many more bytes the node has requested from all peers than it has received corresponding replies for from those peers. + The sum of the peers' buffers' sizes will never exceed this bound by more than one body. + - If the node disconnects from a peer, its requested bytes are removed from this limit. + - Since each peer's buffer resizes dynamically, this puts a bound on how much memory the node reserves for all such buffers. 
+ - If this bound were `N*B` where `B` is the per-peer buffer limit and `N` is the total number of peers that could submit bodies, then this parameter would be redundant. + However, the node operator might want to allow any `M < N` peers to have maximally-sized buffers simultaneously but not `M + 1 ≤ N`. +- `maxBytesPerBody`, a strict inclusive upper bound on the size of a single body. +- `maxPeersPerBody`, a strict inclusive upper bound on the number of peers that will deliver the (correct) body for a specific header in the window; the node will not request that body from an additional peer if receiving the body from that peer could exceed this bound. + - It is crucial that the real node will not acknowledge the corresponding header to this peer until it acquires the body (even from a different peer), despite not (yet) requesting it from this peer. +- `timeouts`, timeouts for each response from the peer. + +The simulator's Relay mini protocol currently only includes the following subset of those parameters. + +- `maxWindowSize` is configurable. +- `maxHeadersPerMsg` (aka `maxHeadersToRequest`) and `maxBodiesPerMsg` (aka `maxBodiesToRequest`) are configurable. +- `maxPeersPerBody` is either `RequestFromFirst` or `RequestFromAll`. + Currently, the only way a peer in the simulator would omit a requested body is if it expired before the request arrived, which will be rare in most simulations. + So `RequestFromFirst` is almost always equivalent to "request from exactly one", which is too optimistic for a robust node in the real world. + TODO Other small values would be more robust, especially _two_ or maybe even _three_. + - Unlike the real node, the simulated node instead immediately acknowledges a header whose body has already been requested from another peer. + TODO that's a risk even without adversaries: no peer will intentionally omit a block, but it _is_ possible that the block will expire before the request arrives! 
+ +All of the excluded parameters are effectively unbounded. + +- Unbounded `maxBytesInFlightPerPeer` and `maxBytesInFlightAllPeers` are unrealistic and might cause the simulator's throughput/latency to be overly optimistic when a swell of data converges on a node. + TODO add these and also characterize their reasonable values +- An unbounded `maxBytesPerBody` is probably harmless, as long as the vast majority of the simulator's txs are reasonably sized. +- An unbounded `timeouts` is reasonable until the simulator involves adversarial peers that stall their replies. + Before then, realistic values would be large enough that no simulated peer would time out anyway. + This is especially true when `maxPeersPerBody` is `RequestFromAll`. + ## Praos diffusion threads TODO it's ChainSync and BlockFetch, but how much of `ouroboros-network` and `ouroboros-consensus` was left out? diff --git a/simulation/ouroboros-leios-sim.cabal b/simulation/ouroboros-leios-sim.cabal index a6ddfee44..fd6a2e7b3 100644 --- a/simulation/ouroboros-leios-sim.cabal +++ b/simulation/ouroboros-leios-sim.cabal @@ -142,6 +142,7 @@ library , leios-trace-hs , linear , mtl + , multiset , nothunks , ouroboros-network-api , ouroboros-network-mock diff --git a/simulation/src/LeiosProtocol/Common.hs b/simulation/src/LeiosProtocol/Common.hs index ebd598761..c8b02da5c 100644 --- a/simulation/src/LeiosProtocol/Common.hs +++ b/simulation/src/LeiosProtocol/Common.hs @@ -120,6 +120,7 @@ data InputBlockHeader = InputBlockHeader , rankingBlock :: !(ChainHash RankingBlock) -- ^ points to ledger state for validation. 
, size :: !Bytes + -- ^ size of this header, not the full block } deriving stock (Eq, Show) diff --git a/simulation/src/LeiosProtocol/Relay.hs b/simulation/src/LeiosProtocol/Relay.hs index 2fa3a0408..6305ad4c9 100644 --- a/simulation/src/LeiosProtocol/Relay.hs +++ b/simulation/src/LeiosProtocol/Relay.hs @@ -40,6 +40,8 @@ import Data.Map (Map) import qualified Data.Map.Strict as Map import Data.Maybe (isJust, isNothing, mapMaybe) import Data.Monoid (Sum (..)) +import Data.MultiSet (MultiSet) +import qualified Data.MultiSet as MultiSet import Data.Sequence.Strict (StrictSeq) import qualified Data.Sequence.Strict as Seq import Data.Set (Set) @@ -211,6 +213,7 @@ data RelayProtocolError | RequestedUnknownId | IdsNotRequested | BodiesNotRequested + | WrongSizeBody deriving (Show) instance Exception RelayProtocolError @@ -243,37 +246,37 @@ instance Protocol (RelayState id header body) where data Message (RelayState id header body) from to where MsgInit :: Message (RelayState id header body) StInit StIdle - -- \| Request a non-empty list of block identifiers from the client, - -- and confirm a number of outstanding block identifiers. + -- \| Request block identifiers from the client, and confirm a number of + -- outstanding block identifiers. -- - -- With 'TokBlocking' this is a a blocking operation: the response will - -- always have at least one block identifier, and it does not expect - -- a prompt response: there is no timeout. This covers the case when there - -- is nothing else to do but wait. For example this covers leaf nodes that + -- With 'TokBlocking' this is a blocking operation; the response will + -- always have at least one block identifier, and it does not expect a + -- prompt response: there is no timeout. This covers the case when there is + -- nothing else to do but wait. For example this covers leaf nodes that -- rarely, if ever, create and submit a block. 
-- - -- With 'TokNonBlocking' this is a non-blocking operation: the response - -- may be an empty list and this does expect a prompt response. This - -- covers high throughput use cases where we wish to pipeline, by - -- interleaving requests for additional block identifiers with - -- requests for blocks, which requires these requests not block. + -- With 'TokNonBlocking' this is a non-blocking operation: the response may + -- be an empty list and this does expect a prompt response. This covers + -- high throughput use cases where we wish to pipeline, by interleaving + -- requests for additional block identifiers with requests for blocks, + -- which requires these requests not block. -- - -- The request gives the maximum number of block identifiers that - -- can be accepted in the response. This must be greater than zero in the - -- 'TokBlocking' case. In the 'TokNonBlocking' case either the numbers + -- The 'WindowShrink' argument is the number of outstanding block + -- identifiers that are acknowledged by this message. Which specific + -- identifiers are being acknowledged is known to the peer based on the + -- FIFO order in which the peer provided them. + -- + -- The 'WindowExpand' argument is the maximum number of block identifiers + -- that can be accepted in the response. This must be greater than zero in + -- the 'TokBlocking' case. In the 'TokNonBlocking' case either the numbers -- acknowledged or the number requested must be non-zero. In either case, -- the number requested must not put the total outstanding over the fixed -- protocol limit. -- - -- The request also gives the number of outstanding block - -- identifiers that can now be acknowledged. The actual blocks - -- to acknowledge are known to the peer based on the FIFO order in which - -- they were provided. 
- -- -- There is no choice about when to use the blocking case versus the - -- non-blocking case, it depends on whether there are any remaining - -- unacknowledged blocks (after taking into account the ones - -- acknowledged in this message): + -- non-blocking case, it is determined by whether there are any remaining + -- unacknowledged blocks (after taking into account the ones acknowledged + -- in this message): -- -- \* The blocking case must be used when there are zero remaining -- unacknowledged blocks. @@ -288,7 +291,7 @@ instance Protocol (RelayState id header body) where -- \| Reply with a list of block identifiers for available -- blocks, along with metadata for each block. -- - -- The list must not be longer than the maximum number requested. + -- The list must not be longer than the request's 'WindowExpand' argument. -- -- In the 'StBlkIds' 'Blocking' state the list must be non-empty while -- in the 'StBlkIds' 'NonBlocking' state the list may be empty. @@ -319,14 +322,11 @@ instance Protocol (RelayState id header body) where Message (RelayState id header body) StIdle StBodies -- \| Reply with the requested blocks, or implicitly discard. -- - -- Blocks can become invalid between the time the block - -- identifier was sent and the block being requested. Invalid - -- (including committed) blocks do not need to be sent. - -- - -- Any block identifiers requested but not provided in this reply - -- should be considered as if this peer had never announced them. (Note - -- that this is no guarantee that the block is invalid, it may still - -- be valid and available from another peer). + -- Blocks can become invalid between the time the block identifier was sent + -- and the block was requested. Therefore, any block identifiers requested + -- but not provided in this reply should be considered as if this peer had + -- never announced them. Note that this is no guarantee that the block is + -- invalid, it may still be valid and available from another peer. 
MsgRespondBodies :: [(id, body)] -> Message (RelayState id header body) StBodies StIdle @@ -385,8 +385,16 @@ newtype WindowSize = WindowSize {value :: Word16} deriving (Monoid) via (Sum Word16) deriving (Show) via (Quiet WindowSize) -newtype RelayConfig = RelayConfig +-- | Configuration parameters for an instance of the Relay mini protocol +-- +-- Both of the in-flight byte limits ought to be significantly larger than the +-- greatest acceptable size of a body, as enforced by 'shouldNotRequest', +-- otherwise the consumer thread for a peer that's serving a maximum sized body +-- might get stuck via starvation. +data RelayConfig = RelayConfig { maxWindowSize :: WindowSize + , maxInFlightPerPeer :: Bytes + , maxInFlightAllPeers :: Bytes } -------------------------------- @@ -406,13 +414,15 @@ runRelayProducer :: runRelayProducer config sst chan = void $ runPeerWithDriver (chanDriver decideRelayState chan) (relayProducer config sst) +-- | The heap footprint is bounded by `maxWindowSize` data RelayProducerLocalState id = RelayProducerLocalState { window :: !(StrictSeq (id, RB.Ticket)) + , windowCounts :: !(MultiSet id) , lastTicket :: !RB.Ticket } initRelayProducerLocalState :: RelayProducerLocalState id -initRelayProducerLocalState = RelayProducerLocalState Seq.empty minBound +initRelayProducerLocalState = RelayProducerLocalState Seq.empty MultiSet.empty minBound type RelayProducer id header body st m a = TC.Client (RelayState id header body) 'NonPipelined st m a @@ -430,7 +440,7 @@ relayProducer config sst = TC.Yield MsgInit $ idle initRelayProducerLocalState -- Validate the request: -- 1. shrink <= windowSize let windowSize = fromIntegral (Seq.length lst.window) - when @m (shrink.value > windowSize.value) $ do + when (shrink.value > windowSize.value) $ do throw $ ShrankTooMuch windowSize shrink -- 2. 
windowSize - shrink + expand <= maxWindowSize let newWindowSize = WindowSize $ windowSize.value - shrink.value + expand.value @@ -445,22 +455,26 @@ relayProducer config sst = TC.Yield MsgInit $ idle initRelayProducerLocalState -- Find the new entries: newEntries <- readNewEntries sst blocking expand lst.lastTicket -- Expand the window: - let newValues = Seq.fromList [(key, ticket) | RB.EntryWithTicket{..} <- Foldable.toList newEntries] + let newEntriesList = [(key, ticket) | RB.EntryWithTicket{..} <- Foldable.toList newEntries] + let newValues = Seq.fromList newEntriesList let window' = keptValues <> newValues + let windowCounts' = MultiSet.fromList (map fst (Foldable.toList window')) -- rebuilt from the updated window so it counts exactly the ids currently in it let lastTicket' = case newValues of Seq.Empty -> lst.lastTicket _ Seq.:|> (_, ticket) -> ticket - let !lst' = lst{window = window', lastTicket = lastTicket'} + let !lst' = RelayProducerLocalState { + window = window', + windowCounts = windowCounts', + lastTicket = lastTicket' + } let responseList = fmap (fst . (.value)) newEntries -- Yield the new entries: withSingIBlockingStyle blocking $ do return $ TC.Yield (MsgRespondHeaders responseList) (idle lst') MsgRequestBodies ids -> TC.Effect $ do -- Check that all ids are in the window: - -- NOTE: This is O(n^2) which is acceptable only if maxWindowSize is small. - -- TODO: Andrea: is a maxWindowSize of 10 large enough for freshest first? forM_ ids $ \id' -> do - when (isNothing (Seq.findIndexL ((== id') . fst) lst.window)) $ do + when (not $ MultiSet.member id' lst.windowCounts) $ do throw RequestedUnknownId -- Read the bodies from the RelayBuffer: relayBuffer <- atomically $ readReadOnlyTVar sst.relayBufferVar @@ -501,38 +515,72 @@ data RelayConsumerConfig id header body m = RelayConsumerConfig , shouldNotRequest :: m (header -> Bool) -- ^ headers to ignore, e.g. already received or coming too late. 
, validateHeaders :: [header] -> m () + , bodySizeHeader :: !(header -> Bytes) + -- ^ the size of the body according to the header + , bodySize :: !(body -> Bytes) , headerId :: !(header -> id) , prioritize :: !(Map id header -> [header] -> [header]) -- ^ returns a subset of headers, in order of what should be fetched first. -- Note: `prioritize` is given the map of ids in the `window` but -- not in-flight or fetched yet (the `available` field of the shared state). -- - -- TODO: For policies like `freshest first` we might need to - -- expand of the `window` more aggressively, to make sufficiently - -- fresh ids available. + -- TODO: For policies like `freshest first` we might need to expand the + -- window more aggressively, to make sufficiently many fresh ids available. , submitBlocks :: !([(header, body)] -> UTCTime -> ([(header, body)] -> STM m ()) -> m ()) - -- ^ sends blocks to be validated/added to the buffer. Allowed to be - -- blocking, but relayConsumer does not assume the blocks made it - -- into the relayBuffer. Also takes a delivery time (relevant for - -- e.g. IB endorsement) and a callback that expects a subset of - -- validated blocks. Callback might be called more than once, with - -- different subsets. + -- ^ relayConsumer applies this function to bodies when they arrive + -- + -- The relayConsumer will not be able to process additional replies from this + -- peer until this function returns. + -- + -- The actual arguments also include the bodies' delivery time (relevant for + -- e.g. IB endorsement) and a callback that should be called once the + -- processing of a body is complete, ie when the relayConsumer can forget + -- that it has already requested this body from this peer (eg once the body's + -- validation terminates, regardless of success). The "completion" callback + -- can be called more than once, with different subsets. 
, submitPolicy :: !SubmitPolicy , maxHeadersToRequest :: !Word16 + -- ^ limit for each 'MsgRequestHeaders' , maxBodiesToRequest :: !Word16 + -- ^ limit for each 'MsgRequestBodies' } data RelayConsumerSharedState id header body m = RelayConsumerSharedState { relayBufferVar :: TVar m (RelayBuffer id (header, body)) - , inFlightVar :: TVar m (Set id) - -- ^ Set of ids for which a consumer requested bodies, until they are validated and added to the buffer. - -- Ids are also removed if the bodies are not included in the reply or do not validate. + , inFlightVar :: TVar m Bytes + -- ^ how many bytes of body have been requested but not yet received from this peer + -- + -- The real implementation doesn't even use this value, though it should: + -- . TODO I'm + -- guessing it'll be easier to leverage in this simulator, since eg the + -- latency and bandwidth are static. + , sharedInFlightVar :: TVar m Bytes + -- ^ the sum of all peers' 'inFlightVar's + , doNotRequestVar :: TVar m (Set id) + -- ^ IDs whose bodies the consumer has requested but not yet finished + -- processing. The processing of a body finishes immediately if the peer's + -- reply skipped it or else when its validation finishes (whether + -- successful or not). It's considered in-flight while validating so that + -- it's not redundantly fetched again if another peer offers it while it's + -- being validated. If it's valid, the common outcome is that it'll be + -- added to the relayBuffer and so that'll continue preventing it from + -- being fetched again from any peer even after the node is done processing + -- it. TODO should also remember which bodies were already fetched and + -- found to be invalid, but there's currently no need since every block in + -- the simulator is valid. -- - -- Current handling not fault tolerant: - -- * other consumers will ignore those ids and push them out of - -- their window, so they might not be asked for again. 
- -- * if a consumer exits with an exception between requesting bodies and the correponding - -- submitBlocks those ids will not be cleared from the in-flight set. + -- TODO The maintenance of this variable is currently not fault tolerant. + -- + -- * Other consumers will ignore those ids and push them out of their + -- window, so they might not be asked for again. This can "lose" a body + -- if, eg, the eventual reply skips the body but other peers' + -- hypothetical replies wouldn't have. TODO: How does the real + -- TxSubmission handle this? + -- + -- * If a consumer exits with an exception between requesting bodies and + -- the corresponding submitBlocks, those ids will not be cleared from + -- this set. Crucially, this variable is sometimes shared by many + -- threads. } runRelayConsumer :: @@ -669,8 +717,8 @@ relayConsumerPipelined config sst = RelayConsumer id header body n 'StIdle m () idle = TS.Effect . idleM . return - -- \| Takes an STM action for the updated local state, so that - -- requestBodies can update inFlightVar in the same STM tx. + -- \| Takes an STM action for the updated local state, so that requestBodies + -- can update 'inFlightVar' and 'doNotRequestVar' in the same STM tx. idleM :: m (RelayConsumerLocalState id header body n) -> m (RelayConsumer id header body n 'StIdle m ()) @@ -765,16 +813,20 @@ relayConsumerPipelined config sst = atomically $ do -- New headers are filtered before becoming available, but we have -- to filter `lst.available` again in the same STM tx that sets them as - -- `inFlight`. + -- `doNotRequest`. 
+ doNotRequest <- readTVar sst.doNotRequestVar inFlight <- readTVar sst.inFlightVar + sharedInFlight <- readTVar sst.sharedInFlightVar let !lst = dropFromLST ( \k hd -> - k `Set.member` inFlight + k `Set.member` doNotRequest || isIgnored hd ) lst0 - let hdrsToRequest = + let (inFlightSummand, hdrsToRequest) = + takeSize + ((config.relay.maxInFlightPerPeer - inFlight) `min` (config.relay.maxInFlightAllPeers - sharedInFlight)) $ take (fromIntegral config.maxBodiesToRequest) $ config.prioritize lst.available (mapMaybe (`Map.lookup` lst.available) $ Foldable.toList $ lst.window) let idsToRequest = map config.headerId hdrsToRequest @@ -783,7 +835,9 @@ relayConsumerPipelined config sst = then return (idle lst) else do let available2 = Map.withoutKeys lst.available idsToRequestSet - modifyTVar' sst.inFlightVar $ Set.union idsToRequestSet + modifyTVar' sst.inFlightVar (+ inFlightSummand) + modifyTVar' sst.sharedInFlightVar (+ inFlightSummand) + modifyTVar' sst.doNotRequestVar $ Set.union idsToRequestSet let !lst2 = lst{pendingRequests = Succ $! lst.pendingRequests, available = available2} return $ TS.YieldPipelined @@ -794,6 +848,20 @@ relayConsumerPipelined config sst = ) (requestHeadersNonBlocking lst2) + -- INVARIANT: @fst@ will be at most the given limit (for a non-negative limit) + -- + -- INVARIANT: @snd@ will be a prefix of the given headers + -- + -- INVARIANT: @fst@ will be the sum of @map 'bodySizeHeader' . snd@ + takeSize :: Bytes -> [header] -> (Bytes, [header]) + takeSize limit = go 0 id + where + go !acc1 acc2 = \case + h:hs | let sz = config.bodySizeHeader h + , sz <= limit - acc1 + -> go (acc1 + sz) (acc2 . (h:)) hs + _ -> (acc1, acc2 []) + windowAdjust :: forall (n :: N). 
RelayConsumerLocalState id header body n -> @@ -885,13 +953,20 @@ relayConsumerPipelined config sst = idsReceived = Map.keysSet bodiesMap idsRequested = Map.keysSet requestedMap + atomically $ do + let expectedBytes = sum $ fmap config.bodySizeHeader hdrs + modifyTVar' sst.inFlightVar (\x -> x - expectedBytes) + modifyTVar' sst.sharedInFlightVar (\x -> x - expectedBytes) + let notReceived = idsRequested `Set.difference` idsReceived + unless (null notReceived) $ do + modifyTVar' sst.doNotRequestVar (`Set.difference` notReceived) + + unless (and $ Map.intersectionWith (\h b -> config.bodySizeHeader h == config.bodySize b) requestedMap bodiesMap) $ + throw WrongSizeBody + unless (idsReceived `Set.isSubsetOf` idsRequested) $ throw BodiesNotRequested - let notReceived = idsRequested `Set.difference` idsReceived - unless (Set.null notReceived) $ do - atomically $ modifyTVar' sst.inFlightVar (`Set.difference` notReceived) - -- We can match up all the txids we requested, with those we -- received. let idsRequestedWithBodiesReceived :: Map id (Maybe (header, body)) @@ -960,14 +1035,12 @@ relayConsumerPipelined config sst = -- if lst.window has duplicated ids, we might submit duplicated blocks. unless (null bodiesToSubmit) $ do now <- getCurrentTime - config.submitBlocks bodiesToSubmit now $ \validated -> do + config.submitBlocks bodiesToSubmit now $ \completed -> do -- Note: here we could set a flag to drop this producer if not -- all blocks validated. - -- TODO: the validation logic should be the one inserting into relayBuffer. - modifyTVar' sst.relayBufferVar $ - flip (Foldable.foldl' (\buf blk@(h, _) -> RB.snocIfNew (config.headerId h) blk buf)) validated - -- TODO: won't remove from inFlight blocks not validated. - modifyTVar' sst.inFlightVar (`Set.difference` Set.fromList (map (config.headerId . fst) validated)) + modifyTVar' + sst.doNotRequestVar + (`Set.difference` Set.fromList (map (config.headerId . 
fst) completed)) return $ idle @@ -988,7 +1061,7 @@ relayConsumerPipelined config sst = acknowledgeIds lst idsSeq _ | Seq.null idsSeq = pure lst acknowledgeIds lst idsSeq idsMap = do isIgnored <- config.shouldNotRequest - inFlight <- readTVarIO sst.inFlightVar + doNotRequest <- readTVarIO sst.doNotRequestVar let lst1 = assertRelayConsumerLocalStateInvariant $ @@ -996,7 +1069,7 @@ relayConsumerPipelined config sst = { window = lst.window <> idsSeq , available = lst.available <> idsMap } - let lst' = dropFromLST (\id' hd -> isIgnored hd || id' `Set.member` inFlight) lst1 + let lst' = dropFromLST (\id' hd -> isIgnored hd || id' `Set.member` doNotRequest) lst1 -- Return the next local state return lst' diff --git a/simulation/src/LeiosProtocol/Short/Node.hs b/simulation/src/LeiosProtocol/Short/Node.hs index 9c40dd3e0..254e49ef4 100644 --- a/simulation/src/LeiosProtocol/Short/Node.hs +++ b/simulation/src/LeiosProtocol/Short/Node.hs @@ -3,6 +3,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NondecreasingIndentation #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} @@ -207,11 +208,16 @@ data ValidationRequest m --- Messages -------------------------------------------------------------- -data RelayHeader id = RelayHeader {id :: !id, slot :: !SlotNo} +data RelayHeader id = RelayHeader + {id :: !id, + slot :: !SlotNo, + size :: !Bytes + -- ^ size of the body, not the size of this header + } deriving (Show) instance MessageSize id => MessageSize (RelayHeader id) where - messageSizeBytes (RelayHeader x y) = messageSizeBytes x + messageSizeBytes y + messageSizeBytes (RelayHeader x y z) = messageSizeBytes x + messageSizeBytes y + const 4 z {- size -} type RelayIBMessage = RelayMessage InputBlockId InputBlockHeader InputBlockBody type RelayEBMessage = RelayMessage EndorseBlockId (RelayHeader EndorseBlockId) EndorseBlock @@ -274,13 +280,23 @@ 
setupRelay :: m [m ()] setupRelay leiosConfig cfg st followers peers = do let producerSST = RelayProducerSharedState{relayBufferVar = asReadOnly st.relayBufferVar} + let n = length peers + sharedInFlightVar <- newTVarIO 0 + inFlightVars <- replicateM n $ newTVarIO 0 ssts <- do case leiosConfig.relayStrategy of RequestFromFirst -> do - inFlightVar <- newTVarIO Set.empty - return $ repeat $ RelayConsumerSharedState{relayBufferVar = st.relayBufferVar, inFlightVar} + doNotRequestVar <- newTVarIO Set.empty -- shared by all the peers + return $ + (\inFlightVar -> RelayConsumerSharedState{relayBufferVar = st.relayBufferVar, sharedInFlightVar, inFlightVar, doNotRequestVar}) + `map` inFlightVars RequestFromAll -> do - (fmap . fmap) (RelayConsumerSharedState st.relayBufferVar) . replicateM (length peers) $ newTVarIO Set.empty + let zap = zipWith ($) + doNotRequestVars <- replicateM n $ newTVarIO Set.empty + return $ + (\inFlightVar doNotRequestVar -> RelayConsumerSharedState{relayBufferVar = st.relayBufferVar, sharedInFlightVar, inFlightVar, doNotRequestVar}) + `map` inFlightVars + `zap` doNotRequestVars let consumers = map (uncurry $ runRelayConsumer cfg) (zip ssts peers) let producers = map (runRelayProducer cfg.relay producerSST) followers return $ consumers ++ producers @@ -301,9 +317,15 @@ relayIBConfig :: RelayConsumerConfig InputBlockId InputBlockHeader InputBlockBody m relayIBConfig _tracer cfg validateHeaders submitBlocks st = RelayConsumerConfig - { relay = RelayConfig{maxWindowSize = coerce cfg.leios.ibDiffusion.maxWindowSize} + { relay = RelayConfig + {maxWindowSize = coerce cfg.leios.ibDiffusion.maxWindowSize, + maxInFlightPerPeer = 0, -- TODO + maxInFlightAllPeers = 0 -- TODO + } , headerId = (.id) , validateHeaders + , bodySizeHeader = const 0 -- TODO + , bodySize = const 0 -- TODO , prioritize = prioritize cfg.leios.ibDiffusion.strategy (.slot) , submitPolicy = SubmitAll , maxHeadersToRequest = cfg.leios.ibDiffusion.maxHeadersToRequest @@ -325,15 +347,21 @@ 
relayEBConfig :: RelayConsumerConfig EndorseBlockId (RelayHeader EndorseBlockId) EndorseBlock m relayEBConfig _tracer cfg@LeiosNodeConfig{leios = LeiosConfig{pipeline = (_ :: SingPipeline p)}} submitBlocks st leiosState = RelayConsumerConfig - { relay = RelayConfig{maxWindowSize = coerce cfg.leios.ebDiffusion.maxWindowSize} + { relay = RelayConfig + {maxWindowSize = coerce cfg.leios.ebDiffusion.maxWindowSize, + maxInFlightPerPeer = 1_000_000, -- TODO + maxInFlightAllPeers = 10_000_000 -- TODO + } , headerId = (.id) , validateHeaders = const $ return () + , bodySizeHeader = (.size) + , bodySize = (.size) , prioritize = prioritize cfg.leios.ebDiffusion.strategy (.slot) , submitPolicy = SubmitAll , maxHeadersToRequest = cfg.leios.ebDiffusion.maxHeadersToRequest , maxBodiesToRequest = cfg.leios.ebDiffusion.maxBodiesToRequest , submitBlocks = \hbs t k -> - submitBlocks (map (first (.id)) hbs) t (k . map (\(i, b) -> (RelayHeader i b.slot, b))) + submitBlocks (map (first (.id)) hbs) t (k . map (\(i, b) -> (RelayHeader i b.slot b.size, b))) , shouldNotRequest = do -- We possibly prune certified EBs (not referenced in the -- chain) after maxEndorseBlockAgeSlots, so we should not end @@ -366,15 +394,21 @@ relayVoteConfig :: RelayConsumerConfig VoteId (RelayHeader VoteId) VoteMsg m relayVoteConfig _tracer cfg submitBlocks _ leiosState = RelayConsumerConfig - { relay = RelayConfig{maxWindowSize = coerce cfg.leios.voteDiffusion.maxWindowSize} + { relay = RelayConfig + { maxWindowSize = coerce cfg.leios.voteDiffusion.maxWindowSize, + maxInFlightPerPeer = 1_000_000, -- TODO + maxInFlightAllPeers = 10_000_000 -- TODO + } , headerId = (.id) , validateHeaders = const $ return () + , bodySizeHeader = (.size) + , bodySize = (.size) , prioritize = prioritize cfg.leios.voteDiffusion.strategy (.slot) , submitPolicy = SubmitAll , maxHeadersToRequest = cfg.leios.voteDiffusion.maxHeadersToRequest , maxBodiesToRequest = cfg.leios.voteDiffusion.maxBodiesToRequest , submitBlocks = \hbs t k -> 
-        submitBlocks (map (first (.id)) hbs) t (k . map (\(i, b) -> (RelayHeader i b.slot, b)))
+        submitBlocks (map (first (.id)) hbs) t (k . map (\(i, b) -> (RelayHeader i b.slot b.size, b)))
     , shouldNotRequest = atomically $ do
         buffer <- readTVar leiosState.relayVoteState.relayBufferVar
         prunedTo <- readTVar leiosState.prunedVoteStateToVar
@@ -821,21 +855,29 @@ adoptIB cfg leiosState ib deliveryStage = do
   modifyTVar'
     leiosState.iBsForEBsAndVotesVar
     (Map.insertWith (Map.unionWith min) p $ Map.singleton ib.id deliveryStage)
-
-  -- TODO: likely needs optimization, although EBs also grow slowly.
   modifyTVar' leiosState.ibsNeededForEBVar (Map.map (Set.delete ib.id))
+  modifyTVar'
+    leiosState.relayIBState.relayBufferVar
+    (RB.snocIfNew ib.header.id (ib.header, ib.body))

 adoptEB :: MonadSTM m => LeiosNodeState m -> EndorseBlock -> STM m ()
 adoptEB leiosState eb = do
   ibs <- Set.unions . Map.map Map.keysSet <$> readTVar leiosState.iBsForEBsAndVotesVar
   let ibsNeeded = Map.fromList [(eb.id, Set.fromList eb.inputBlocks Set.\\ ibs)]
   modifyTVar' leiosState.ibsNeededForEBVar (`Map.union` ibsNeeded)
+  modifyTVar'
+    leiosState.relayEBState.relayBufferVar
+    (RB.snocIfNew eb.id (RelayHeader eb.id eb.slot eb.size, eb))

 adoptVote :: MonadSTM m => LeiosConfig -> LeiosNodeState m -> VoteMsg -> UTCTime -> STM m ()
 adoptVote leios leiosState v deliveryTime = do
   -- We keep tally for each EB as votes arrive, so the relayVoteBuffer
   -- can be pruned without effects on EB certification.
   modifyTVar' leiosState.votesForEBVar $ addVote leios v deliveryTime
+  modifyTVar'
+    leiosState.relayVoteState.relayBufferVar
+    (RB.snocIfNew v.id (RelayHeader v.id v.slot v.size, v))

 dispatchValidation ::
   forall m.
@@ -954,23 +996,14 @@ generator tracer cfg st = do
         traceWith tracer (PraosNodeEvent (PraosNodeEventGenerate rb))
         traceWith tracer (PraosNodeEvent (PraosNodeEventNewTip newChain))
       SomeAction Generate.Propose{} ib -> (GenIB,) $ do
-        atomically $ do
-          -- TODO should not be added to 'relayIBState' before it's validated
-          modifyTVar' st.relayIBState.relayBufferVar (RB.snocIfNew ib.header.id (ib.header, ib.body))
-          adoptIB cfg.leios st ib IbDuringProposeOrDeliver1
+        atomically $ adoptIB cfg.leios st ib IbDuringProposeOrDeliver1
         traceWith tracer (LeiosNodeEvent Generate (EventIB ib))
       SomeAction Generate.Endorse eb -> (GenEB,) $ do
-        atomically $ do
-          modifyTVar' st.relayEBState.relayBufferVar (RB.snocIfNew eb.id (RelayHeader eb.id eb.slot, eb))
-          adoptEB st eb
+        atomically $ adoptEB st eb
         traceWith tracer (LeiosNodeEvent Generate (EventEB eb))
       SomeAction Generate.Vote v -> (GenVote,) $ do
         now <- getCurrentTime
-        atomically $ do
-          modifyTVar'
-            st.relayVoteState.relayBufferVar
-            (RB.snocIfNew v.id (RelayHeader v.id v.slot, v))
-          adoptVote cfg.leios st v now
+        atomically $ adoptVote cfg.leios st v now
         traceWith tracer (LeiosNodeEvent Generate (EventVote v))
   let LeiosNodeConfig{..} = cfg
   leiosBlockGenerator $ LeiosGeneratorConfig{submit = mapM_ submitOne, ..}
diff --git a/simulation/src/LeiosProtocol/SimTestRelay.hs b/simulation/src/LeiosProtocol/SimTestRelay.hs
index 86b7684a9..0ce3b1506 100644
--- a/simulation/src/LeiosProtocol/SimTestRelay.hs
+++ b/simulation/src/LeiosProtocol/SimTestRelay.hs
@@ -128,13 +128,15 @@ relayNode
   inchans
   outchans = do
     buffer <- newTVarIO RB.empty
-    inFlightVar <- newTVarIO Set.empty
+    sharedInFlightVar <- newTVarIO 0
+    inFlightVar <- newTVarIO 0
+    doNotRequestVar <- newTVarIO Set.empty
     submitq <- newTQueueIO :: m (TQueue m ([TestBlock], [TestBlock] -> STM m ()))
     let relayBufferVar = buffer
-    let consumerSST = RelayConsumerSharedState{relayBufferVar, inFlightVar}
+    let consumerSST = RelayConsumerSharedState{relayBufferVar, doNotRequestVar,
sharedInFlightVar, inFlightVar}
     let producerSST = RelayProducerSharedState{relayBufferVar = asReadOnly relayBufferVar}
-    let relayConfig = RelayConfig{maxWindowSize = 100}
+    let relayConfig = RelayConfig{maxWindowSize = 100, maxInFlightPerPeer = 0, maxInFlightAllPeers = 0}
     let relayConsumerConfig =
           RelayConsumerConfig
             { relay = relayConfig
@@ -143,6 +145,8 @@ relayNode
                 return $ \hd -> RB.member (testHeaderId hd) rb
             , -- sequential validation of headers
               validateHeaders = map (const 0.1) >>> sum >>> \d -> when (d >= 0) $ threadDelay d
+            , bodySizeHeader = const 0
+            , bodySize = const 0
             , headerId = testHeaderId
             , prioritize = \m _ -> sortOn (Down . testHeaderExpiry) . Map.elems $ m
             , submitPolicy = SubmitAll