Skip to content

Commit dace000

Browse files
committed
Finishing touches on ObjectDiffusion protocol internals
1 parent f6c9525 commit dace000

File tree

8 files changed

+259
-217
lines changed

8 files changed

+259
-217
lines changed

cabal.project

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ allow-newer:
5959
source-repository-package
6060
type: git
6161
location: https://github.com/IntersectMBO/ouroboros-network
62-
tag: 9f94642533c2fbfae0852741f5c9ecd7f3776505
63-
--sha256: 1pb36hlazh0b94x0kp8s2gk2sbknqk2q86yz28d4jsqvg36bfhk9
62+
tag: c2e936f454a0026b9a854e5f230714de81b9965c
63+
--sha256: sha256-139VtT1VJkBqIcqf+vak7h4Fh+Z748dHoHwaCCpKOy4=
6464
subdir:
6565
ouroboros-network
6666
ouroboros-network-protocols

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ mkHandlers
309309
, hPerasCertDiffusionInbound = \version peer ->
310310
objectDiffusionInbound
311311
(contramap (TraceLabelPeer peer) (Node.certDiffusionInboundTracer tracers))
312-
( perasCertDiffusionMaxFifoLength miniProtocolParameters
312+
( perasCertDiffusionMaxFifoLength miniProtocolParameters
313313
, 10 -- TODO: change this to a sensible value
314314
, 10 -- TODO: change this to a sensible value
315315
)

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs

Lines changed: 130 additions & 128 deletions
Large diffs are not rendered by default.

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/API.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
33
, ObjectPoolWriter (..)
44
) where
55

6-
import Data.Word (Word64)
76
import Control.Concurrent.Class.MonadSTM.Strict (STM)
7+
import Data.Word (Word64)
88

99
data ObjectPoolReader objectId object ticketNo m
1010
= ObjectPoolReader

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ makePerasCertPoolReader getCertSnapshot =
2727
, oprZeroTicketNo = PerasCertDB.zeroPerasCertTicketNo
2828
, oprObjectsAfter = \lastKnown limit -> do
2929
certSnapshot <- getCertSnapshot
30-
pure $ take (fromIntegral limit) $
31-
[ (ticketNo, perasCertRound cert, pure cert)
32-
| (cert, ticketNo) <- PerasCertDB.getCertsAfter certSnapshot lastKnown
33-
]
30+
pure $
31+
take (fromIntegral limit) $
32+
[ (ticketNo, perasCertRound cert, pure cert)
33+
| (cert, ticketNo) <- PerasCertDB.getCertsAfter certSnapshot lastKnown
34+
]
3435
}
3536

3637
makePerasCertPoolReaderFromCertDB ::

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Outbound.hs

Lines changed: 74 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound
1212
) where
1313

1414
import Control.Exception (assert)
15-
import Control.Monad (unless, when, forM)
15+
import Control.Monad (forM, unless, when)
1616
import Control.Monad.Class.MonadSTM
1717
import Control.Monad.Class.MonadThrow
1818
import Control.Tracer (Tracer, traceWith)
1919
import Data.List.NonEmpty qualified as NonEmpty
2020
import Data.Sequence.Strict (StrictSeq)
2121
import Data.Sequence.Strict qualified as Seq
22+
import Data.Set qualified as Set
2223
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
2324
import Ouroboros.Network.ControlMessage
2425
( ControlMessage
@@ -28,16 +29,14 @@ import Ouroboros.Network.ControlMessage
2829
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
2930
import Ouroboros.Network.Protocol.ObjectDiffusion.Outbound
3031
import Ouroboros.Network.Protocol.ObjectDiffusion.Type
31-
import qualified Data.Set as Set
32+
33+
-- Note: This module is inspired from TxSubmission outbound side.
3234

3335
data TraceObjectDiffusionOutbound objectId object
34-
=
35-
TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq
36-
|
37-
-- | The IDs to be sent in the response
36+
= TraceObjectDiffusionOutboundRecvMsgRequestObjectIds NumObjectIdsReq
37+
| -- | The IDs to be sent in the response
3838
TraceObjectDiffusionOutboundSendMsgReplyObjectIds [objectId]
39-
|
40-
-- | The IDs of the objects requested.
39+
| -- | The IDs of the objects requested.
4140
TraceObjectDiffusionOutboundRecvMsgRequestObjects
4241
[objectId]
4342
| -- | The objects to be sent in the response.
@@ -100,20 +99,27 @@ objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version contr
10099
makeBundle :: OutboundSt objectId object ticketNo -> OutboundStIdle objectId object m ()
101100
makeBundle !st =
102101
OutboundStIdle
103-
{ recvMsgRequestObjectIds = recvMsgRequestObjectIds st
104-
, recvMsgRequestObjects = recvMsgRequestObjects st}
102+
{ recvMsgRequestObjectIds = recvMsgRequestObjectIds st
103+
, recvMsgRequestObjects = recvMsgRequestObjects st
104+
}
105105

106-
updateStNewObjects :: OutboundSt objectId object ticketNo -> [(object, ticketNo)] -> OutboundSt objectId object ticketNo
106+
updateStNewObjects ::
107+
OutboundSt objectId object ticketNo ->
108+
[(object, ticketNo)] ->
109+
OutboundSt objectId object ticketNo
107110
updateStNewObjects !OutboundSt{..} newObjectsWithTicketNos =
108111
-- These objects should all be fresh
109-
assert (all (\(_, ticketNo) -> ticketNo > lastTicketNo) newObjectsWithTicketNos) $
110-
let !outstandingFifo' =
111-
outstandingFifo
112-
<> (Seq.fromList $ fst <$> newObjectsWithTicketNos)
113-
!lastTicketNo'
114-
| null newObjectsWithTicketNos = lastTicketNo
115-
| otherwise = snd $ last newObjectsWithTicketNos
116-
in OutboundSt{ outstandingFifo = outstandingFifo', lastTicketNo = lastTicketNo' }
112+
assert (all (\(_, ticketNo) -> ticketNo > lastTicketNo) newObjectsWithTicketNos) $
113+
let !outstandingFifo' =
114+
outstandingFifo
115+
<> (Seq.fromList $ fst <$> newObjectsWithTicketNos)
116+
!lastTicketNo'
117+
| null newObjectsWithTicketNos = lastTicketNo
118+
| otherwise = snd $ last newObjectsWithTicketNos
119+
in OutboundSt
120+
{ outstandingFifo = outstandingFifo'
121+
, lastTicketNo = lastTicketNo'
122+
}
117123

118124
recvMsgRequestObjectIds ::
119125
forall blocking.
@@ -136,14 +142,15 @@ objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version contr
136142
)
137143
$ throwIO (ProtocolErrorRequestedTooManyObjectIds numIdsToReq maxFifoLength)
138144

139-
-- First we update our FIFO to remove the number of objectIds
140-
-- that the inbound peer has acknowledged.
145+
-- First we update our FIFO to remove the number of objectIds that the
146+
-- inbound peer has acknowledged.
141147
let !outstandingFifo' = Seq.drop (fromIntegral numIdsToAck) outstandingFifo
142-
st' :: OutboundSt objectId object ticketNo -- must specify the type here otherwise GHC complains about mismatch objectId types
143-
!st' = st{ outstandingFifo = outstandingFifo' }
148+
-- must specify the type here otherwise GHC complains about mismatch objectId types
149+
st' :: OutboundSt objectId object ticketNo
150+
!st' = st{outstandingFifo = outstandingFifo'}
144151

145-
-- Grab info about any new objects after the last object ticketNo we've seen,
146-
-- up to the number that the peer has requested.
152+
-- Grab info about any new objects after the last object ticketNo we've
153+
-- seen, up to the number that the peer has requested.
147154
case blocking of
148155
-----------------------------------------------------------------------
149156
SingBlocking -> do
@@ -154,28 +161,34 @@ objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version contr
154161

155162
mbNewContent <- timeoutWithControlMessage controlMessageSTM $
156163
do
157-
newObjectsWithTicketNos <- oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq)
164+
newObjectsWithTicketNos <-
165+
oprObjectsAfter
166+
lastTicketNo
167+
(fromIntegral numIdsToReq)
158168
check (not $ null newObjectsWithTicketNos)
159169
pure newObjectsWithTicketNos
160170

161171
case mbNewContent of
162172
Nothing -> pure (SendMsgDone ())
163173
Just newContent -> do
164-
newObjectsWithTicketNos <- forM newContent $ \(ticketNo, _, getObject) -> do
165-
object <- getObject
166-
pure (object, ticketNo)
167-
168-
174+
newObjectsWithTicketNos <- forM newContent $
175+
\(ticketNo, _, getObject) -> do
176+
object <- getObject
177+
pure (object, ticketNo)
178+
169179
let !newIds = oprObjectId . fst <$> newObjectsWithTicketNos
170180
st'' = updateStNewObjects st' newObjectsWithTicketNos
171-
181+
172182
traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds)
173183

174-
-- Assert objects is non-empty: we blocked until objects was non-null,
175-
-- and we know reqNo > 0, hence `take reqNo objects` is non-null.
184+
-- Assert objects is non-empty: we blocked until objects was
185+
-- non-null, and we know numIdsToReq > 0, hence
186+
-- `take numIdsToReq objects` is non-null.
176187
assert (not $ null newObjectsWithTicketNos) $
177-
pure (SendMsgReplyObjectIds (BlockingReply (NonEmpty.fromList $ newIds)) (makeBundle st''))
178-
188+
pure $
189+
SendMsgReplyObjectIds
190+
(BlockingReply (NonEmpty.fromList $ newIds))
191+
(makeBundle st'')
179192

180193
-----------------------------------------------------------------------
181194
SingNonBlocking -> do
@@ -184,14 +197,17 @@ objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version contr
184197
when (Seq.null outstandingFifo') $
185198
throwIO ProtocolErrorRequestNonBlocking
186199

187-
newContent <- atomically $ oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq)
188-
newObjectsWithTicketNos <- forM newContent $ \(ticketNo, _, getObject) -> do
189-
object <- getObject
190-
pure (object, ticketNo)
200+
newContent <-
201+
atomically $
202+
oprObjectsAfter lastTicketNo (fromIntegral numIdsToReq)
203+
newObjectsWithTicketNos <- forM newContent $
204+
\(ticketNo, _, getObject) -> do
205+
object <- getObject
206+
pure (object, ticketNo)
191207

192208
let !newIds = oprObjectId . fst <$> newObjectsWithTicketNos
193209
st'' = updateStNewObjects st' newObjectsWithTicketNos
194-
210+
195211
traceWith tracer (TraceObjectDiffusionOutboundSendMsgReplyObjectIds newIds)
196212

197213
pure (SendMsgReplyObjectIds (NonBlockingReply newIds) (makeBundle st''))
@@ -203,22 +219,31 @@ objectDiffusionOutbound tracer maxFifoLength ObjectPoolReader{..} _version contr
203219
recvMsgRequestObjects !st@OutboundSt{..} requestedIds = do
204220
traceWith tracer (TraceObjectDiffusionOutboundRecvMsgRequestObjects requestedIds)
205221

206-
-- All the objects correspond to advertised objectIds are already in the outstandingFifo.
207-
-- So we don't need to read from the object pool here.
222+
-- All the objects correspond to advertised objectIds are already in the
223+
-- outstandingFifo. So we don't need to read from the object pool here.
208224

209-
-- TODO: I've improved the search to do only one traversal of 'outstandingFifo'.
225+
-- I've optimized the search to do only one traversal of 'outstandingFifo'.
210226
-- When the 'requestedIds' is exactly the whole 'outstandingFifo', then this
211227
-- should take O(n * log n) time.
212-
-- We will need to revisit the underlying outstandingFifo data structure and search if
213-
-- performance isn't sufficient when we'll use ObjectDiffusion for votes (and not just cert diffusion).
228+
--
229+
-- TODO: We might need to revisit the underlying 'outstandingFifo' data
230+
-- structure and the search if performance isn't sufficient when we'll use
231+
-- ObjectDiffusion for votes diffusion (and not just cert diffusion).
232+
214233
let requestedIdsSet = Set.fromList requestedIds
215234

216235
when (Set.size requestedIdsSet /= length requestedIds) $
217236
throwIO ProtocolErrorRequestedDuplicateObject
218237

219-
let requestedObjects = foldr (\obj acc -> if Set.member (oprObjectId obj) requestedIdsSet
220-
then obj : acc
221-
else acc) [] outstandingFifo
238+
let requestedObjects =
239+
foldr
240+
( \obj acc ->
241+
if Set.member (oprObjectId obj) requestedIdsSet
242+
then obj : acc
243+
else acc
244+
)
245+
[]
246+
outstandingFifo
222247

223248
when (Set.size requestedIdsSet /= length requestedObjects) $
224249
throwIO ProtocolErrorRequestedUnavailableObject

ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ObjectDiffusion/PerasCert/Smoke.hs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,18 @@ prop_smoke (ListWithUniqueIds certs) =
119119
IOSim
120120
s
121121
( ObjectPoolReader PerasRoundNo (PerasCert TestBlock) PerasCertTicketNo (IOSim s)
122-
, ObjectPoolReader PerasRoundNo (PerasCert TestBlock) PerasCertTicketNo (IOSim s)
123122
, ObjectPoolWriter PerasRoundNo (PerasCert TestBlock) (IOSim s)
123+
, (IOSim s) [PerasCert TestBlock]
124124
)
125125
mkPoolInterfaces = do
126126
outboundPool <- newCertDB certs
127127
inboundPool <- newCertDB []
128128

129129
let outboundPoolReader = makePerasCertPoolReaderFromCertDB outboundPool
130-
inboundPoolReader = makePerasCertPoolReaderFromCertDB inboundPool
131130
inboundPoolWriter = makePerasCertPoolWriterFromCertDB inboundPool
131+
getAllInboundPoolContent = do
132+
snap <- atomically $ PerasCertDB.getCertSnapshot inboundPool
133+
let rawContent = PerasCertDB.getCertsAfter snap (PerasCertDB.zeroPerasCertTicketNo)
134+
pure $ fst <$> rawContent
132135

133-
return (outboundPoolReader, inboundPoolReader, inboundPoolWriter)
136+
return (outboundPoolReader, inboundPoolWriter, getAllInboundPoolContent)

0 commit comments

Comments
 (0)