Skip to content

Commit fd8d3ce

Browse files
committed
simulation: fix filtering of expired votes
1 parent 145d0fa commit fd8d3ce

File tree

3 files changed

+103
-61
lines changed

3 files changed

+103
-61
lines changed

simulation/src/LeiosProtocol/Relay.hs

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
1111
{-# LANGUAGE LambdaCase #-}
1212
{-# LANGUAGE MultiParamTypeClasses #-}
13+
{-# LANGUAGE NondecreasingIndentation #-}
1314
{-# LANGUAGE OverloadedRecordDot #-}
1415
{-# LANGUAGE PolyKinds #-}
1516
{-# LANGUAGE RankNTypes #-}
@@ -495,6 +496,8 @@ data SubmitPolicy = SubmitInOrder | SubmitAll
495496

496497
data RelayConsumerConfig id header body m = RelayConsumerConfig
497498
{ relay :: !RelayConfig
499+
, shouldIgnore :: m (header -> Bool)
500+
-- ^ headers to ignore, e.g. already received or coming too late.
498501
, validateHeaders :: [header] -> m ()
499502
, headerId :: !(header -> id)
500503
, prioritize :: !(Map id header -> [header] -> [header])
@@ -661,9 +664,9 @@ relayConsumerPipelined config sst =
661664
-- \| Takes an STM action for the updated local state, so that
662665
-- requestBodies can update inFlightVar in the same STM tx.
663666
idleM ::
664-
STM m (RelayConsumerLocalState id header body n) ->
667+
m (RelayConsumerLocalState id header body n) ->
665668
m (RelayConsumer id header body n 'StIdle m ())
666-
idleM mlst = atomically $ do
669+
idleM mlst = do
667670
lst <- mlst
668671
let canRequestMoreBodies = not (Map.null lst.available)
669672
case lst.pendingRequests of
@@ -715,39 +718,41 @@ relayConsumerPipelined config sst =
715718
requestBodies ::
716719
forall (n :: N).
717720
RelayConsumerLocalState id header body n ->
718-
STM m (RelayConsumer id header body n 'StIdle m ())
721+
m (RelayConsumer id header body n 'StIdle m ())
719722
requestBodies lst = do
720-
-- New headers are filtered before becoming available, but we have
721-
-- to filter `lst.available` again in the same STM tx that sets them as
722-
-- `inFlight`.
723-
inFlight <- readTVar sst.inFlightVar
724-
relayBuffer <- readTVar sst.relayBufferVar
725-
let (ignored, available1) =
726-
Map.partitionWithKey
727-
( \k _ ->
728-
k `Set.member` inFlight
729-
|| k `RB.member` relayBuffer
730-
)
731-
lst.available
732-
-- Ignored headers are set to Nothing in the buffer so they can be acknowledged later.
733-
let buffer' = lst.buffer <> Map.map (const Nothing) ignored
734-
735-
let hdrsToRequest =
736-
take (fromIntegral config.maxBodiesToRequest) $
737-
config.prioritize available1 (mapMaybe (`Map.lookup` available1) $ Foldable.toList $ lst.window)
738-
let idsToRequest = map config.headerId hdrsToRequest
739-
let idsToRequestSet = Set.fromList idsToRequest
740-
modifyTVar' sst.inFlightVar $ Set.union idsToRequestSet
741-
let available2 = Map.withoutKeys available1 idsToRequestSet
742-
let !lst' = lst{pendingRequests = Succ lst.pendingRequests, available = available2, buffer = buffer'}
743-
return $
744-
TS.YieldPipelined
745-
(MsgRequestBodies idsToRequest)
746-
( TS.ReceiverAwait $ \case
747-
MsgRespondBodies bodies ->
748-
TS.ReceiverDone (CollectBodies hdrsToRequest bodies)
749-
)
750-
(requestHeadersNonBlocking lst')
723+
isIgnored <- config.shouldIgnore
724+
atomically $ do
725+
-- New headers are filtered before becoming available, but we have
726+
-- to filter `lst.available` again in the same STM tx that sets them as
727+
-- `inFlight`.
728+
inFlight <- readTVar sst.inFlightVar
729+
let (ignored, available1) =
730+
Map.partitionWithKey
731+
( \k hd ->
732+
k `Set.member` inFlight
733+
|| isIgnored hd
734+
)
735+
lst.available
736+
737+
-- Ignored headers are set to Nothing in the buffer so they can be acknowledged later.
738+
let buffer' = lst.buffer <> Map.map (const Nothing) ignored
739+
740+
let hdrsToRequest =
741+
take (fromIntegral config.maxBodiesToRequest) $
742+
config.prioritize available1 (mapMaybe (`Map.lookup` available1) $ Foldable.toList $ lst.window)
743+
let idsToRequest = map config.headerId hdrsToRequest
744+
let idsToRequestSet = Set.fromList idsToRequest
745+
modifyTVar' sst.inFlightVar $ Set.union idsToRequestSet
746+
let available2 = Map.withoutKeys available1 idsToRequestSet
747+
let !lst' = lst{pendingRequests = Succ lst.pendingRequests, available = available2, buffer = buffer'}
748+
return $
749+
TS.YieldPipelined
750+
(MsgRequestBodies idsToRequest)
751+
( TS.ReceiverAwait $ \case
752+
MsgRespondBodies bodies ->
753+
TS.ReceiverDone (CollectBodies hdrsToRequest bodies)
754+
)
755+
(requestHeadersNonBlocking lst')
751756

752757
windowAdjust ::
753758
forall (n :: N).
@@ -860,6 +865,7 @@ relayConsumerPipelined config sst =
860865
-- though not all have replies.
861866
buffer1 = lst.buffer <> idsRequestedWithBodiesReceived
862867

868+
!_ = assert (relayConsumerLocalStateInvariant $ lst{buffer = buffer1}) ()
863869
-- We have to update the window here eagerly and not
864870
-- delay it to requestBodies, otherwise we could end up blocking in
865871
-- idle on more pipelined results rather than being able to
@@ -888,7 +894,7 @@ relayConsumerPipelined config sst =
888894
extraToSubmit = case config.submitPolicy of
889895
SubmitInOrder -> []
890896
SubmitAll ->
891-
mapMaybe (\id' -> join (Map.lookup id' buffer1)) $
897+
mapMaybe (\id' -> join (Map.lookup id' buffer2)) $
892898
Foldable.toList window'
893899

894900
-- And set them to `Nothing` in the buffer so they can be
@@ -937,19 +943,19 @@ relayConsumerPipelined config sst =
937943
RelayConsumerLocalState id header body n ->
938944
StrictSeq id ->
939945
Map id header ->
940-
STM m (RelayConsumerLocalState id header body n)
946+
m (RelayConsumerLocalState id header body n)
941947
acknowledgeIds lst idsSeq _ | Seq.null idsSeq = pure lst
942948
acknowledgeIds lst idsSeq idsMap = do
943-
relayBuffer <- readTVar sst.relayBufferVar
944-
inFlight <- readTVar sst.inFlightVar
949+
isIgnored <- config.shouldIgnore
950+
inFlight <- readTVarIO sst.inFlightVar
945951

946952
let
947953
-- Divide the new ids in two: those that are already in the
948954
-- relay buffer or in flight (both locally and shared in-flight)
949955
-- and those that are not. We'll request some bodies from the latter.
950956
(ignoredIds, availableIdsMp) =
951957
Map.partitionWithKey
952-
(\id' _ -> id' `RB.member` relayBuffer || id' `Set.member` inFlight)
958+
(\id' hd -> isIgnored hd || id' `Set.member` inFlight)
953959
idsMap
954960

955961
availableIdsU =

simulation/src/LeiosProtocol/Short/Node.hs

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ data LeiosNodeState m = LeiosNodeState
9999
, relayIBState :: !(RelayIBState m)
100100
, relayEBState :: !(RelayEBState m)
101101
, relayVoteState :: !(RelayVoteState m)
102+
, prunedVoteStateToVar :: !(TVar m SlotNo)
103+
-- ^ TODO: refactor into RelayState.
102104
, ibDeliveryTimesVar :: !(TVar m (Map InputBlockId UTCTime))
103105
, taskQueue :: !(TaskMultiQueue LeiosNodeTask m)
104106
, waitingForRBVar :: !(TVar m (Map (HeaderHash RankingBlock) [STM m ()]))
@@ -107,6 +109,8 @@ data LeiosNodeState m = LeiosNodeState
107109
-- ^ waiting for ledger state of RB block to be validated.
108110
, ledgerStateVar :: !(TVar m (Map (HeaderHash RankingBlock) LedgerState))
109111
, ibsNeededForEBVar :: !(TVar m (Map EndorseBlockId (Set InputBlockId)))
112+
, votesForEBVar :: !(TVar m (Map EndorseBlockId (Map VoteId Word64)))
113+
-- ^ TODO: prune of EBs that won't make it into chain anymore.
110114
}
111115

112116
data LeiosNodeTask
@@ -220,8 +224,9 @@ relayIBConfig ::
220224
LeiosNodeConfig ->
221225
([InputBlockHeader] -> m ()) ->
222226
SubmitBlocks m InputBlockHeader InputBlockBody ->
227+
RelayIBState m ->
223228
RelayConsumerConfig InputBlockId InputBlockHeader InputBlockBody m
224-
relayIBConfig _tracer cfg validateHeaders submitBlocks =
229+
relayIBConfig _tracer cfg validateHeaders submitBlocks st =
225230
RelayConsumerConfig
226231
{ relay = RelayConfig{maxWindowSize = coerce cfg.leios.ibDiffusion.maxWindowSize}
227232
, headerId = (.id)
@@ -231,15 +236,19 @@ relayIBConfig _tracer cfg validateHeaders submitBlocks =
231236
, maxHeadersToRequest = cfg.leios.ibDiffusion.maxHeadersToRequest
232237
, maxBodiesToRequest = cfg.leios.ibDiffusion.maxBodiesToRequest
233238
, submitBlocks
239+
, shouldIgnore = do
240+
buff <- readTVarIO st.relayBufferVar
241+
return $ flip RB.member buff . (.id)
234242
}
235243

236244
relayEBConfig ::
237-
MonadDelay m =>
245+
(MonadDelay m, MonadSTM m) =>
238246
Tracer m LeiosNodeEvent ->
239247
LeiosNodeConfig ->
240248
SubmitBlocks m EndorseBlockId EndorseBlock ->
249+
RelayEBState m ->
241250
RelayConsumerConfig EndorseBlockId (RelayHeader EndorseBlockId) EndorseBlock m
242-
relayEBConfig _tracer cfg submitBlocks =
251+
relayEBConfig _tracer cfg submitBlocks st =
243252
RelayConsumerConfig
244253
{ relay = RelayConfig{maxWindowSize = coerce cfg.leios.ebDiffusion.maxWindowSize}
245254
, headerId = (.id)
@@ -250,15 +259,20 @@ relayEBConfig _tracer cfg submitBlocks =
250259
, maxBodiesToRequest = cfg.leios.ebDiffusion.maxBodiesToRequest
251260
, submitBlocks = \hbs t k ->
252261
submitBlocks (map (first (.id)) hbs) t (k . map (\(i, b) -> (RelayHeader i b.slot, b)))
262+
, shouldIgnore = do
263+
buff <- readTVarIO st.relayBufferVar
264+
return $ flip RB.member buff . (.id)
253265
}
254266

255267
relayVoteConfig ::
256-
MonadDelay m =>
268+
(MonadDelay m, Monad (STM m), MonadSTM m, MonadTime m) =>
257269
Tracer m LeiosNodeEvent ->
258270
LeiosNodeConfig ->
259271
SubmitBlocks m VoteId VoteMsg ->
272+
RelayVoteState m ->
273+
LeiosNodeState m ->
260274
RelayConsumerConfig VoteId (RelayHeader VoteId) VoteMsg m
261-
relayVoteConfig _tracer cfg submitBlocks =
275+
relayVoteConfig _tracer cfg submitBlocks _ leiosState =
262276
RelayConsumerConfig
263277
{ relay = RelayConfig{maxWindowSize = coerce cfg.leios.voteDiffusion.maxWindowSize}
264278
, headerId = (.id)
@@ -269,6 +283,12 @@ relayVoteConfig _tracer cfg submitBlocks =
269283
, maxBodiesToRequest = cfg.leios.voteDiffusion.maxBodiesToRequest
270284
, submitBlocks = \hbs t k ->
271285
submitBlocks (map (first (.id)) hbs) t (k . map (\(i, b) -> (RelayHeader i b.slot, b)))
286+
, shouldIgnore = atomically $ do
287+
buffer <- readTVar leiosState.relayVoteState.relayBufferVar
288+
prunedTo <- readTVar leiosState.prunedVoteStateToVar
289+
return $ \hd ->
290+
hd.slot < prunedTo
291+
|| hd.id `RB.member` buffer
272292
}
273293

274294
queueAndWait :: (MonadSTM m, MonadDelay m) => LeiosNodeState m -> LeiosNodeTask -> [CPUTask] -> m ()
@@ -298,6 +318,8 @@ newLeiosNodeState cfg = do
298318
waitingForRBVar <- newTVarIO Map.empty
299319
waitingForLedgerStateVar <- newTVarIO Map.empty
300320
taskQueue <- atomically $ newTaskMultiQueue cfg.processingQueueBound
321+
prunedVoteStateToVar <- newTVarIO (toEnum 0)
322+
votesForEBVar <- newTVarIO Map.empty
301323
return $ LeiosNodeState{..}
302324

303325
leiosNode ::
@@ -352,21 +374,21 @@ leiosNode tracer cfg followers peers = do
352374

353375
ibThreads <-
354376
setupRelay
355-
(relayIBConfig tracer cfg valHeaderIB submitIB)
377+
(relayIBConfig tracer cfg valHeaderIB submitIB relayIBState)
356378
relayIBState
357379
(map (.protocolIB) followers)
358380
(map (.protocolIB) peers)
359381

360382
ebThreads <-
361383
setupRelay
362-
(relayEBConfig tracer cfg submitEB)
384+
(relayEBConfig tracer cfg submitEB relayEBState)
363385
relayEBState
364386
(map (.protocolEB) followers)
365387
(map (.protocolEB) peers)
366388

367389
voteThreads <-
368390
setupRelay
369-
(relayVoteConfig tracer cfg submitVote)
391+
(relayVoteConfig tracer cfg submitVote relayVoteState leiosState)
370392
relayVoteState
371393
(map (.protocolVote) followers)
372394
(map (.protocolVote) peers)
@@ -419,13 +441,15 @@ pruneVoteBuffer ::
419441
pruneVoteBuffer _tracer cfg st = go (toEnum 0)
420442
where
421443
go p = do
422-
let last_vote_recv = snd $ stageRangeOf cfg.leios p VoteRecv
423444
let last_vote_send = snd $ stageRangeOf cfg.leios p VoteSend
445+
let last_vote_recv = snd $ stageRangeOf cfg.leios p VoteRecv
446+
let pruneTo = succ last_vote_send
424447
_ <- waitNextSlot cfg.slotConfig (succ last_vote_recv)
425448
atomically $ do
426449
modifyTVar' st.relayVoteState.relayBufferVar $
427450
RB.filter $
428-
\RB.EntryWithTicket{value} -> (snd value).slot <= last_vote_send
451+
\RB.EntryWithTicket{value} -> (snd value).slot >= pruneTo
452+
writeTVar st.prunedVoteStateToVar $! pruneTo
429453
go (succ p)
430454

431455
computeLedgerStateThread ::
@@ -479,6 +503,18 @@ adoptEB leiosState eb = do
479503
let ibsNeeded = Map.fromList [(eb.id, Set.fromList eb.inputBlocks Set.\\ ibs)]
480504
modifyTVar' leiosState.ibsNeededForEBVar (`Map.union` ibsNeeded)
481505

506+
adoptVote :: MonadSTM m => LeiosNodeState m -> VoteMsg -> STM m ()
507+
adoptVote leiosState v = do
508+
-- We keep tally for each EB as votes arrive, so the relayVoteBuffer
509+
-- can be pruned without effects on EB certification.
510+
modifyTVar' leiosState.votesForEBVar $
511+
Map.unionWith Map.union $
512+
Map.fromListWith
513+
Map.union
514+
[ (eb, Map.singleton v.id v.votes)
515+
| eb <- v.endorseBlocks
516+
]
517+
482518
dispatchValidation ::
483519
forall m.
484520
(MonadMVar m, MonadFork m, MonadAsync m, MonadSTM m, MonadTime m, MonadDelay m) =>
@@ -509,7 +545,9 @@ dispatchValidation tracer cfg leiosState req =
509545
adoptEB leiosState eb
510546
traceEnterState [eb] EventEB
511547
valVote v completion = labelTask . (ValVote,) . (\p -> cpuTask p cfg.leios.delays.voteMsgValidation v,) $ do
512-
atomically $ completion [v]
548+
atomically $ do
549+
completion [v]
550+
adoptVote leiosState v
513551
traceEnterState [v] EventVote
514552

515553
go :: ValidationRequest m -> STM m [(LeiosNodeTask, (CPUTask, m ()))]
@@ -589,7 +627,11 @@ generator tracer cfg st = do
589627
adoptEB st eb
590628
traceWith tracer (LeiosNodeEvent Generate (EventEB eb))
591629
SomeAction Generate.Vote v -> (GenVote,) $ do
592-
atomically $ modifyTVar' st.relayVoteState.relayBufferVar (RB.snoc v.id (RelayHeader v.id v.slot, v))
630+
atomically $ do
631+
modifyTVar'
632+
st.relayVoteState.relayBufferVar
633+
(RB.snoc v.id (RelayHeader v.id v.slot, v))
634+
adoptVote st v
593635
traceWith tracer (LeiosNodeEvent Generate (EventVote v))
594636
let LeiosNodeConfig{..} = cfg
595637
leiosBlockGenerator $ LeiosGeneratorConfig{submit = mapM_ submitOne, ..}
@@ -602,8 +644,7 @@ mkBuffersView cfg st = BuffersView{..}
602644
-- though it's getting more expensive as we go.
603645
chain <- PraosNode.preferredChain st.praosState
604646
bufferEB <- readTVar st.relayEBState.relayBufferVar
605-
bufferVotes <- map snd . RB.values <$> readTVar st.relayVoteState.relayBufferVar
606-
647+
votesForEB <- readTVar st.votesForEBVar
607648
-- RBs in the same chain should not contain certificates for the same pipeline.
608649
let pipelinesInChain =
609650
Set.fromList $
@@ -612,14 +653,6 @@ mkBuffersView cfg st = BuffersView{..}
612653
, (ebId, _) <- rb.blockBody.endorseBlocks
613654
, Just (_, eb) <- [RB.lookup bufferEB ebId]
614655
]
615-
-- TODO: cache?
616-
let votesForEB =
617-
Map.fromListWith
618-
Map.union
619-
[ (eb, Map.singleton v.id v.votes)
620-
| v <- bufferVotes
621-
, eb <- v.endorseBlocks
622-
]
623656
let totalVotes = fromIntegral . sum . Map.elems
624657
let tryCertify eb = do
625658
votes <- Map.lookup eb.id votesForEB

simulation/src/LeiosProtocol/SimTestRelay.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ relayNode
137137
let relayConsumerConfig =
138138
RelayConsumerConfig
139139
{ relay = relayConfig
140+
, shouldIgnore = atomically $ do
141+
rb <- readTVar relayBufferVar
142+
return $ \hd -> RB.member (testHeaderId hd) rb
140143
, -- sequential validation of headers
141144
validateHeaders = map (const 0.1) >>> sum >>> \d -> when (d >= 0) $ threadDelay d
142145
, headerId = testHeaderId

0 commit comments

Comments
 (0)