Skip to content

Commit 2646799

Browse files
committed
Carry a forker in the mempool
1 parent 2037d14 commit 2646799

File tree

17 files changed

+442
-318
lines changed

17 files changed

+442
-318
lines changed

ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -862,31 +862,29 @@ reproMempoolForge numBlks env = do
862862
<> "1 or 2 blocks at a time, not "
863863
<> show numBlks
864864

865-
mempool <-
866-
Mempool.openMempoolWithoutSyncThread
867-
Mempool.LedgerInterface
868-
{ Mempool.getCurrentLedgerState = ledgerState <$> LedgerDB.getVolatileTip ledgerDB
869-
, Mempool.getLedgerTablesAtFor = \pt keys -> do
870-
frk <- LedgerDB.getForkerAtTarget ledgerDB registry (SpecificPoint pt)
871-
case frk of
872-
Left _ -> pure Nothing
873-
Right fr -> do
874-
tbs <-
875-
Just . castLedgerTables
876-
<$> LedgerDB.forkerReadTables fr (castLedgerTables keys)
877-
LedgerDB.forkerClose fr
878-
pure tbs
879-
}
880-
lCfg
881-
-- one mebibyte should generously accomodate two blocks' worth of txs
882-
( Mempool.MempoolCapacityBytesOverride $
883-
LedgerSupportsMempool.ByteSize32 $
884-
1024 * 1024
885-
)
886-
nullTracer
887-
888-
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
889-
pure Nothing
865+
withRegistry $ \reg -> do
866+
mempool <-
867+
Mempool.openMempoolWithoutSyncThread
868+
reg
869+
Mempool.LedgerInterface
870+
{ Mempool.getCurrentLedgerState = \reg' -> do
871+
st <- LedgerDB.getVolatileTip ledgerDB
872+
pure
873+
( ledgerState st
874+
, fmap (LedgerDB.ledgerStateReadOnlyForker . LedgerDB.readOnlyForker)
875+
<$> LedgerDB.getForkerAtTarget ledgerDB reg' (SpecificPoint (castPoint $ getTip st))
876+
)
877+
}
878+
lCfg
879+
-- one mebibyte should generously accomodate two blocks' worth of txs
880+
( Mempool.MempoolCapacityBytesOverride $
881+
LedgerSupportsMempool.ByteSize32 $
882+
1024 * 1024
883+
)
884+
nullTracer
885+
886+
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
887+
pure Nothing
890888
where
891889
AnalysisEnv
892890
{ cfg

ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -628,16 +628,15 @@ runThreadNetwork
628628
HasCallStack =>
629629
OracularClock m ->
630630
SlotNo ->
631-
ResourceRegistry m ->
632631
(SlotNo -> STM m ()) ->
633632
STM m (Point blk) ->
634633
(ResourceRegistry m -> m (ReadOnlyForker' m blk)) ->
635634
Mempool m blk ->
636635
[GenTx blk] ->
637636
-- \^ valid transactions the node should immediately propagate
638-
m ()
639-
forkCrucialTxs clock s0 registry unblockForge getTipPoint mforker mempool txs0 = do
640-
void $ forkLinkedThread registry "crucialTxs" $ withRegistry $ \reg -> do
637+
m (Thread m Void)
638+
forkCrucialTxs clock s0 unblockForge getTipPoint mforker mempool txs0 = do
639+
testForkMempoolThread mempool "crucialTxs" $ withRegistry $ \reg -> do
641640
let loop (slot, mempFp) = do
642641
forker <- mforker reg
643642
extLedger <- atomically $ roforkerGetLedgerState forker
@@ -1150,15 +1149,17 @@ runThreadNetwork
11501149
--
11511150
-- TODO Is there a risk that this will block because the 'forkTxProducer'
11521151
-- fills up the mempool too quickly?
1153-
forkCrucialTxs
1154-
clock
1155-
joinSlot
1156-
registry
1157-
unblockForge
1158-
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1159-
getForker
1160-
mempool
1161-
txs0
1152+
threadCrucialTxs <-
1153+
forkCrucialTxs
1154+
clock
1155+
joinSlot
1156+
unblockForge
1157+
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1158+
getForker
1159+
mempool
1160+
txs0
1161+
1162+
void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread
11621163

11631164
forkTxProducer
11641165
coreNodeId
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<!--
2+
A new scriv changelog fragment.
3+
4+
Uncomment the section that is right (remove the HTML comment wrapper).
5+
For top level release notes, leave all the headers commented out.
6+
-->
7+
8+
### Patch
9+
10+
- The mempool will now carry its own forker instead of acquiring one on each
11+
revalidation. This particularly implies that the mempool will no longer
12+
re-sync under the hood while trying to add a transaction, and only the
13+
background thread will perform such a re-sync.
14+
15+
- The ledger state that triggers a re-sync in the background thread will be the
16+
one used to revalidate the mempool. Previously, another read for the new tip
17+
would be performed when starting the sync, which could lead to confusing
18+
(although innocuous) situations in which:
19+
20+
- the tip changes from A to B
21+
- the background thread sees the tip changed from A, so it records the new tip (B) and triggers the re-sync
22+
- the tip changes again from B to C before the syncing process reads the tip again
23+
- the mempool is re-synced with C
24+
- the background thread would now see that the tip changed from B, so it records the new tip (C) and triggers the re-sync
25+
- the mempool is re-synced **AGAIN** with C
26+
27+
This sequence of actions can't happen again.
28+
29+
- The mempool now has its own registry in which it allocates forkers. The
30+
background thread was moved to this inner registry such that it can access the
31+
mempool internal registry, but an action to cancel it will still live in the
32+
outer registry, to ensure the thread is closed before we attempt to close the
33+
mempool internal registry. Otherwise we would run into a race condition if the
34+
background thread would attempt a resync while the internal registry was being
35+
closed.
36+
37+
<!--
38+
### Non-Breaking
39+
40+
- A bullet item for the Non-Breaking category.
41+
42+
-->
43+
<!--
44+
### Breaking
45+
46+
- A bullet item for the Breaking category.
47+
48+
-->

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ library unstable-mempool-test-utils
540540
exposed-modules: Test.Consensus.Mempool.Mocked
541541
build-depends:
542542
base,
543+
resource-registry,
543544
contra-tracer,
544545
deepseq,
545546
ouroboros-consensus,

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE RankNTypes #-}
23
{-# LANGUAGE ScopedTypeVariables #-}
34
{-# LANGUAGE StandaloneDeriving #-}
45
{-# LANGUAGE UndecidableInstances #-}
@@ -34,6 +35,7 @@ module Ouroboros.Consensus.Mempool.API
3435
, zeroTicketNo
3536
) where
3637

38+
import Control.ResourceRegistry
3739
import qualified Data.List.NonEmpty as NE
3840
import Ouroboros.Consensus.Block (ChainHash, SlotNo)
3941
import Ouroboros.Consensus.Ledger.Abstract
@@ -212,6 +214,12 @@ data Mempool m blk = Mempool
212214
-- Instead, we treat it the same way as a Mempool which is /at/
213215
-- capacity, i.e., we won't admit new transactions until some have been
214216
-- removed because they have become invalid.
217+
, testForkMempoolThread :: forall a. String -> m a -> m (Thread m a)
218+
-- ^ FOR TESTS ONLY
219+
--
220+
-- If we want to run a thread that can perform syncs in the mempool, it needs
221+
-- to be registered in the mempool's internal registry. This function exposes
222+
-- such functionality.
215223
}
216224

217225
{-------------------------------------------------------------------------------

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Impl/Common.hs

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
{-# LANGUAGE DeriveGeneric #-}
44
{-# LANGUAGE FlexibleContexts #-}
55
{-# LANGUAGE NamedFieldPuns #-}
6+
{-# LANGUAGE RankNTypes #-}
67
{-# LANGUAGE ScopedTypeVariables #-}
78
{-# LANGUAGE StandaloneDeriving #-}
89
{-# LANGUAGE TypeApplications #-}
@@ -39,9 +40,9 @@ module Ouroboros.Consensus.Mempool.Impl.Common
3940
, tickLedgerState
4041
) where
4142

42-
import Control.Concurrent.Class.MonadMVar (MVar, newMVar)
4343
import Control.Concurrent.Class.MonadSTM.Strict.TMVar (newTMVarIO)
4444
import Control.Monad.Trans.Except (runExcept)
45+
import Control.ResourceRegistry
4546
import Control.Tracer
4647
import qualified Data.Foldable as Foldable
4748
import qualified Data.List.NonEmpty as NE
@@ -61,8 +62,11 @@ import Ouroboros.Consensus.Mempool.TxSeq (TxSeq (..), TxTicket (..))
6162
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
6263
import Ouroboros.Consensus.Storage.ChainDB (ChainDB)
6364
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
65+
import Ouroboros.Consensus.Storage.LedgerDB.Forker
6466
import Ouroboros.Consensus.Util.Enclose (EnclosingTimed)
6567
import Ouroboros.Consensus.Util.IOLike hiding (newMVar)
68+
import Ouroboros.Consensus.Util.NormalForm.StrictMVar
69+
import Ouroboros.Network.Protocol.LocalStateQuery.Type
6670

6771
{-------------------------------------------------------------------------------
6872
Internal State
@@ -188,26 +192,26 @@ initInternalState capacityOverride lastTicketNo cfg slot st =
188192

189193
-- | Abstract interface needed to run a Mempool.
190194
data LedgerInterface m blk = LedgerInterface
191-
{ getCurrentLedgerState :: STM m (LedgerState blk EmptyMK)
192-
-- ^ Get the current tip of the LedgerDB.
193-
, getLedgerTablesAtFor ::
194-
Point blk ->
195-
LedgerTables (LedgerState blk) KeysMK ->
196-
m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
197-
-- ^ Get values at the given point on the chain. Returns Nothing if the
198-
-- anchor moved or if the state is not found on the ledger db.
195+
{ getCurrentLedgerState ::
196+
ResourceRegistry m ->
197+
STM m (LedgerState blk EmptyMK, m (Either GetForkerError (ReadOnlyForker m (LedgerState blk) blk)))
198+
-- ^ Get the current tip of the LedgerDB and an action to get a forker there.
199199
}
200200

201201
-- | Create a 'LedgerInterface' from a 'ChainDB'.
202202
chainDBLedgerInterface ::
203-
IOLike m =>
204-
ChainDB m blk -> LedgerInterface m blk
203+
(IOLike m, IsLedger (LedgerState blk)) =>
204+
ChainDB m blk ->
205+
LedgerInterface m blk
205206
chainDBLedgerInterface chainDB =
206207
LedgerInterface
207-
{ getCurrentLedgerState =
208-
ledgerState <$> ChainDB.getCurrentLedger chainDB
209-
, getLedgerTablesAtFor = \pt keys ->
210-
fmap castLedgerTables <$> ChainDB.getLedgerTablesAtFor chainDB pt (castLedgerTables keys)
208+
{ getCurrentLedgerState = \reg -> do
209+
st <- ChainDB.getCurrentLedger chainDB
210+
pure
211+
( ledgerState st
212+
, fmap (fmap ledgerStateReadOnlyForker) $
213+
ChainDB.getReadOnlyForkerAtPoint chainDB reg (SpecificPoint (castPoint $ getTip st))
214+
)
211215
}
212216

213217
{-------------------------------------------------------------------------------
@@ -219,10 +223,12 @@ chainDBLedgerInterface chainDB =
219223
-- different operations.
220224
data MempoolEnv m blk = MempoolEnv
221225
{ mpEnvLedger :: LedgerInterface m blk
226+
, mpEnvForker :: StrictMVar m (ReadOnlyForker m (LedgerState blk) blk)
222227
, mpEnvLedgerCfg :: LedgerConfig blk
228+
, mpEnvRegistry :: ResourceRegistry m
223229
, mpEnvStateVar :: StrictTMVar m (InternalState blk)
224-
, mpEnvAddTxsRemoteFifo :: MVar m ()
225-
, mpEnvAddTxsAllFifo :: MVar m ()
230+
, mpEnvAddTxsRemoteFifo :: StrictMVar m ()
231+
, mpEnvAddTxsAllFifo :: StrictMVar m ()
226232
, mpEnvTracer :: Tracer m (TraceEventMempool blk)
227233
, mpEnvCapacityOverride :: MempoolCapacityBytesOverride
228234
}
@@ -236,25 +242,39 @@ initMempoolEnv ::
236242
LedgerConfig blk ->
237243
MempoolCapacityBytesOverride ->
238244
Tracer m (TraceEventMempool blk) ->
245+
ResourceRegistry m ->
239246
m (MempoolEnv m blk)
240-
initMempoolEnv ledgerInterface cfg capacityOverride tracer = do
241-
st <- atomically $ getCurrentLedgerState ledgerInterface
242-
let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
243-
isVar <-
244-
newTMVarIO $
245-
initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st'
246-
addTxRemoteFifo <- newMVar ()
247-
addTxAllFifo <- newMVar ()
248-
return
249-
MempoolEnv
250-
{ mpEnvLedger = ledgerInterface
251-
, mpEnvLedgerCfg = cfg
252-
, mpEnvStateVar = isVar
253-
, mpEnvAddTxsRemoteFifo = addTxRemoteFifo
254-
, mpEnvAddTxsAllFifo = addTxAllFifo
255-
, mpEnvTracer = tracer
256-
, mpEnvCapacityOverride = capacityOverride
257-
}
247+
initMempoolEnv ledgerInterface cfg capacityOverride tracer registry = do
248+
(_, mpEnvRegistry) <- allocate registry (\_ -> unsafeNewRegistry) closeRegistry
249+
initMempoolEnv' mpEnvRegistry
250+
where
251+
initMempoolEnv' reg = do
252+
(st, meFrk) <- atomically $ getCurrentLedgerState ledgerInterface reg
253+
eFrk <- meFrk
254+
case eFrk of
255+
-- This should happen very rarely, if between getting the state and getting
256+
-- the forker, the ledgerdb has changed. We just retry here.
257+
Left{} -> initMempoolEnv' reg
258+
Right frk -> do
259+
frkMVar <- newMVar frk
260+
let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
261+
isVar <-
262+
newTMVarIO $
263+
initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st'
264+
addTxRemoteFifo <- newMVar ()
265+
addTxAllFifo <- newMVar ()
266+
return
267+
MempoolEnv
268+
{ mpEnvLedger = ledgerInterface
269+
, mpEnvLedgerCfg = cfg
270+
, mpEnvForker = frkMVar
271+
, mpEnvRegistry = reg
272+
, mpEnvStateVar = isVar
273+
, mpEnvAddTxsRemoteFifo = addTxRemoteFifo
274+
, mpEnvAddTxsAllFifo = addTxAllFifo
275+
, mpEnvTracer = tracer
276+
, mpEnvCapacityOverride = capacityOverride
277+
}
258278

259279
{-------------------------------------------------------------------------------
260280
Ticking the ledger state

0 commit comments

Comments
 (0)