Skip to content

Commit 0e77b7b

Browse files
committed
Refactor ObjectDiffusion protocol internals
1 parent d9ceb01 commit 0e77b7b

File tree

8 files changed

+531
-530
lines changed

8 files changed

+531
-530
lines changed

cabal.project

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ allow-newer:
5959
source-repository-package
6060
type: git
6161
location: https://github.com/IntersectMBO/ouroboros-network
62-
tag: bca44ca1314b5ac2e57f1843d5ab7b848c318c45
63-
--sha256: sha256-EAYEhii5KO8VnyNenef6rquHFJdubp0xckQpAQhtFA4=
62+
tag: c2e936f454a0026b9a854e5f230714de81b9965c
63+
--sha256: sha256-139VtT1VJkBqIcqf+vak7h4Fh+Z748dHoHwaCCpKOy4=
6464
subdir:
6565
ouroboros-network
6666
ouroboros-network-protocols

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,14 +309,16 @@ mkHandlers
309309
, hPerasCertDiffusionInbound = \version peer ->
310310
objectDiffusionInbound
311311
(contramap (TraceLabelPeer peer) (Node.certDiffusionInboundTracer tracers))
312-
(certDiffusionMaxUnacked miniProtocolParameters)
313-
(makePerasCertPoolReaderFromChainDB $ getChainDB)
312+
( perasCertDiffusionMaxFifoLength miniProtocolParameters
313+
, 10 -- TODO: change this to a sensible value
314+
, 10 -- TODO: change this to a sensible value
315+
)
314316
(makePerasCertPoolWriterFromChainDB $ getChainDB)
315317
version
316318
, hPerasCertDiffusionOutbound = \version controlMessageSTM peer ->
317319
objectDiffusionOutbound
318320
(contramap (TraceLabelPeer peer) (Node.certDiffusionOutboundTracer tracers))
319-
(certDiffusionMaxUnacked miniProtocolParameters)
321+
(perasCertDiffusionMaxFifoLength miniProtocolParameters)
320322
(makePerasCertPoolReaderFromChainDB $ getChainDB)
321323
version
322324
controlMessageSTM

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/Inbound.hs

Lines changed: 261 additions & 319 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,29 @@
11
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
22
( ObjectPoolReader (..)
3-
, ObjectPoolSnapshot (..)
43
, ObjectPoolWriter (..)
54
) where
65

7-
import Control.Monad.Class.MonadSTM (STM)
8-
import Ouroboros.Network.SizeInBytes (SizeInBytes)
6+
import Control.Concurrent.Class.MonadSTM.Strict (STM)
7+
import Data.Word (Word64)
98

109
data ObjectPoolReader objectId object ticketNo m
1110
= ObjectPoolReader
12-
{ rdrGetObjectId :: object -> objectId
11+
{ oprObjectId :: object -> objectId
1312
-- ^ Return the id of the specified object
14-
, objectPoolGetSnapshot :: STM m (ObjectPoolSnapshot objectId object ticketNo)
15-
-- ^ Get a snapshot of the object pool
16-
, objectPoolZeroTicketNo :: ticketNo
17-
-- ^ Ticket number before the first item in the pool (so objectPoolObjectsAfter objectPoolZeroTicketNo returns all possible objects)
18-
}
19-
data ObjectPoolSnapshot objectId object ticketNo
20-
= ObjectPoolSnapshot
21-
-- TODO: revisit when we will have to load certificates from disk. This method might not be pure anymore
22-
{ objectPoolObjectsAfter :: ticketNo -> [(object, ticketNo, SizeInBytes)]
23-
-- ^ Get all objects having a ticket number strictly greater than the given one, along with their ticket numbers and sizes
24-
, objectPoolHasObject :: objectId -> Bool
25-
-- ^ Check if the object pool contains an object with the given id
13+
, oprZeroTicketNo :: ticketNo
14+
-- ^ Ticket number before the first item in the pool (so oprObjectsAfter oprZeroTicketNo returns all possible objects)
15+
, oprObjectsAfter :: ticketNo -> Word64 -> STM m [(ticketNo, objectId, m object)]
16+
-- ^ Get the list of objects available in the pool with a ticketNo greater than the specified one. The number of returned objects is capped by the given Word64.
17+
-- Only the IDs and ticketNos of the objects are directly accessible; each actual object must be loaded through a monadic action.
18+
-- TODO: This signature assume that we have all the IDs and ticketNos in memory, but not the actual objects. This might change if IDs must be loaded from disk too.
2619
}
2720

2821
data ObjectPoolWriter objectId object m
2922
= ObjectPoolWriter
30-
{ wrGetObjectId :: object -> objectId
23+
{ opwObjectId :: object -> objectId
3124
-- ^ Return the id of the specified object
32-
, objectPoolAddObjects :: [object] -> m ()
25+
, opwAddObjects :: [object] -> m ()
3326
-- ^ Add a batch of objects to the objectPool.
27+
, opwHasObject :: m (objectId -> Bool)
28+
-- ^ Check if the object pool contains an object with the given id
3429
}

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ObjectDiffusion/ObjectPool/PerasCert.hs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
55
, makePerasCertPoolWriterFromChainDB
66
) where
77

8-
import Data.Functor ((<&>))
98
import Ouroboros.Consensus.Block
109
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
1110
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
@@ -24,18 +23,15 @@ makePerasCertPoolReader ::
2423
ObjectPoolReader PerasRoundNo (PerasCert blk) PerasCertTicketNo m
2524
makePerasCertPoolReader getCertSnapshot =
2625
ObjectPoolReader
27-
{ rdrGetObjectId = perasCertRound
28-
, objectPoolGetSnapshot =
29-
getCertSnapshot <&> \snap ->
30-
ObjectPoolSnapshot
31-
{ objectPoolObjectsAfter = \ticketNo ->
32-
[ (cert, tno, sz)
33-
| (cert, tno) <- PerasCertDB.getCertsAfter snap ticketNo
34-
, let sz = 0 -- TODO
35-
]
36-
, objectPoolHasObject = PerasCertDB.containsCert snap
37-
}
38-
, objectPoolZeroTicketNo = PerasCertDB.zeroPerasCertTicketNo
26+
{ oprObjectId = perasCertRound
27+
, oprZeroTicketNo = PerasCertDB.zeroPerasCertTicketNo
28+
, oprObjectsAfter = \lastKnown limit -> do
29+
certSnapshot <- getCertSnapshot
30+
pure $
31+
take (fromIntegral limit) $
32+
[ (ticketNo, perasCertRound cert, pure cert)
33+
| (cert, ticketNo) <- PerasCertDB.getCertsAfter certSnapshot lastKnown
34+
]
3935
}
4036

4137
makePerasCertPoolReaderFromCertDB ::
@@ -45,13 +41,16 @@ makePerasCertPoolReaderFromCertDB perasCertDB =
4541
makePerasCertPoolReader (PerasCertDB.getCertSnapshot perasCertDB)
4642

4743
makePerasCertPoolWriterFromCertDB ::
48-
(StandardHash blk, Monad m) =>
44+
(StandardHash blk, MonadSTM m) =>
4945
PerasCertDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m
5046
makePerasCertPoolWriterFromCertDB perasCertDB =
5147
ObjectPoolWriter
52-
{ wrGetObjectId = perasCertRound
53-
, objectPoolAddObjects =
48+
{ opwObjectId = perasCertRound
49+
, opwAddObjects =
5450
mapM_ $ PerasCertDB.addCert perasCertDB
51+
, opwHasObject = do
52+
certSnapshot <- atomically $ PerasCertDB.getCertSnapshot perasCertDB
53+
pure $ PerasCertDB.containsCert certSnapshot
5554
}
5655

5756
makePerasCertPoolReaderFromChainDB ::
@@ -61,11 +60,14 @@ makePerasCertPoolReaderFromChainDB chainDB =
6160
makePerasCertPoolReader (ChainDB.getPerasCertSnapshot chainDB)
6261

6362
makePerasCertPoolWriterFromChainDB ::
64-
(StandardHash blk, Monad m) =>
63+
(StandardHash blk, MonadSTM m) =>
6564
ChainDB m blk -> ObjectPoolWriter PerasRoundNo (PerasCert blk) m
6665
makePerasCertPoolWriterFromChainDB chainDB =
6766
ObjectPoolWriter
68-
{ wrGetObjectId = perasCertRound
69-
, objectPoolAddObjects =
67+
{ opwObjectId = perasCertRound
68+
, opwAddObjects =
7069
mapM_ $ ChainDB.addPerasCertAsync chainDB
70+
, opwHasObject = do
71+
certSnapshot <- atomically $ ChainDB.getPerasCertSnapshot chainDB
72+
pure $ PerasCertDB.containsCert certSnapshot
7173
}

0 commit comments

Comments
 (0)