Skip to content

Commit 78dc909

Browse files
committed
Carry a forker in the mempool
1 parent 2037d14 commit 78dc909

File tree

17 files changed

+470
-340
lines changed

17 files changed

+470
-340
lines changed

ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -862,31 +862,29 @@ reproMempoolForge numBlks env = do
862862
<> "1 or 2 blocks at a time, not "
863863
<> show numBlks
864864

865-
mempool <-
866-
Mempool.openMempoolWithoutSyncThread
867-
Mempool.LedgerInterface
868-
{ Mempool.getCurrentLedgerState = ledgerState <$> LedgerDB.getVolatileTip ledgerDB
869-
, Mempool.getLedgerTablesAtFor = \pt keys -> do
870-
frk <- LedgerDB.getForkerAtTarget ledgerDB registry (SpecificPoint pt)
871-
case frk of
872-
Left _ -> pure Nothing
873-
Right fr -> do
874-
tbs <-
875-
Just . castLedgerTables
876-
<$> LedgerDB.forkerReadTables fr (castLedgerTables keys)
877-
LedgerDB.forkerClose fr
878-
pure tbs
879-
}
880-
lCfg
881-
-- one mebibyte should generously accomodate two blocks' worth of txs
882-
( Mempool.MempoolCapacityBytesOverride $
883-
LedgerSupportsMempool.ByteSize32 $
884-
1024 * 1024
885-
)
886-
nullTracer
887-
888-
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
889-
pure Nothing
865+
withRegistry $ \reg -> do
866+
mempool <-
867+
Mempool.openMempoolWithoutSyncThread
868+
reg
869+
Mempool.LedgerInterface
870+
{ Mempool.getCurrentLedgerState = \reg' -> do
871+
st <- LedgerDB.getVolatileTip ledgerDB
872+
pure
873+
( ledgerState st
874+
, fmap (LedgerDB.ledgerStateReadOnlyForker . LedgerDB.readOnlyForker)
875+
<$> LedgerDB.getForkerAtTarget ledgerDB reg' (SpecificPoint (castPoint $ getTip st))
876+
)
877+
}
878+
lCfg
879+
-- one mebibyte should generously accomodate two blocks' worth of txs
880+
( Mempool.MempoolCapacityBytesOverride $
881+
LedgerSupportsMempool.ByteSize32 $
882+
1024 * 1024
883+
)
884+
nullTracer
885+
886+
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
887+
pure Nothing
890888
where
891889
AnalysisEnv
892890
{ cfg
@@ -991,7 +989,7 @@ reproMempoolForge numBlks env = do
991989
LedgerDB.tryFlush ledgerDB
992990

993991
-- this flushes blk from the mempool, since every tx in it is now on the chain
994-
void $ Mempool.syncWithLedger mempool
992+
void $ Mempool.testSyncWithLedger mempool
995993

996994
{-------------------------------------------------------------------------------
997995
Auxiliary: processing all blocks in the DB

ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -628,16 +628,15 @@ runThreadNetwork
628628
HasCallStack =>
629629
OracularClock m ->
630630
SlotNo ->
631-
ResourceRegistry m ->
632631
(SlotNo -> STM m ()) ->
633632
STM m (Point blk) ->
634633
(ResourceRegistry m -> m (ReadOnlyForker' m blk)) ->
635634
Mempool m blk ->
636635
[GenTx blk] ->
637636
-- \^ valid transactions the node should immediately propagate
638-
m ()
639-
forkCrucialTxs clock s0 registry unblockForge getTipPoint mforker mempool txs0 = do
640-
void $ forkLinkedThread registry "crucialTxs" $ withRegistry $ \reg -> do
637+
m (Thread m Void)
638+
forkCrucialTxs clock s0 unblockForge getTipPoint mforker mempool txs0 = do
639+
testForkMempoolThread mempool "crucialTxs" $ withRegistry $ \reg -> do
641640
let loop (slot, mempFp) = do
642641
forker <- mforker reg
643642
extLedger <- atomically $ roforkerGetLedgerState forker
@@ -679,7 +678,7 @@ runThreadNetwork
679678
-- avoid the race in which we wake up before the mempool's
680679
-- background thread wakes up by mimicking it before we do
681680
-- anything else
682-
void $ syncWithLedger mempool
681+
void $ testSyncWithLedger mempool
683682

684683
loop fps'
685684
loop (s0, [])
@@ -1150,15 +1149,17 @@ runThreadNetwork
11501149
--
11511150
-- TODO Is there a risk that this will block because the 'forkTxProducer'
11521151
-- fills up the mempool too quickly?
1153-
forkCrucialTxs
1154-
clock
1155-
joinSlot
1156-
registry
1157-
unblockForge
1158-
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1159-
getForker
1160-
mempool
1161-
txs0
1152+
threadCrucialTxs <-
1153+
forkCrucialTxs
1154+
clock
1155+
joinSlot
1156+
unblockForge
1157+
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1158+
getForker
1159+
mempool
1160+
txs0
1161+
1162+
void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread
11621163

11631164
forkTxProducer
11641165
coreNodeId
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<!--
2+
A new scriv changelog fragment.
3+
4+
Uncomment the section that is right (remove the HTML comment wrapper).
5+
For top level release notes, leave all the headers commented out.
6+
-->
7+
8+
### Patch
9+
10+
- The mempool will now carry its own forker instead of acquiring one on each
11+
revalidation. This particularly implies that the mempool will no longer
12+
re-sync under the hood while trying to add a transaction, and only the
13+
background thread will perform such a re-sync.
14+
15+
- The ledger state that triggers a re-sync in the background thread will be the
16+
one used to revalidate the mempool. Previously, another read for the new tip
17+
would be performed when starting the sync, which could lead to confusing
18+
(although innocuous) situations in which:
19+
20+
- the tip changes from A to B
21+
- the background thread sees the tip changed from A, so it records the new tip (B) and triggers the re-sync
22+
- the tip changes again from B to C before the syncing process reads the tip again
23+
- the mempool is re-synced with C
24+
- the background thread would now see that the tip changed from B, so it records the new tip (C) and triggers the re-sync
25+
- the mempool is re-synced **AGAIN** with C
26+
27+
This sequence of actions can't happen again.
28+
29+
- The mempool now has its own registry in which it allocates forkers. The
30+
background thread was moved to this inner registry such that it can access the
31+
mempool internal registry, but an action to cancel it will still live in the
32+
outer registry, to ensure the thread is closed before we attempt to close the
33+
mempool internal registry. Otherwise we would run into a race condition if the
34+
background thread would attempt a resync while the internal registry was being
35+
closed.
36+
37+
<!--
38+
### Non-Breaking
39+
40+
- A bullet item for the Non-Breaking category.
41+
42+
-->
43+
44+
### Breaking
45+
46+
- Removed `getLedgerTablesAtFor` from the ChainDB API. Clients now have to
47+
actually open a forker and manage it.

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ library unstable-mempool-test-utils
543543
contra-tracer,
544544
deepseq,
545545
ouroboros-consensus,
546+
resource-registry,
546547
strict-stm,
547548

548549
library unstable-tutorials

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE RankNTypes #-}
23
{-# LANGUAGE ScopedTypeVariables #-}
34
{-# LANGUAGE StandaloneDeriving #-}
45
{-# LANGUAGE UndecidableInstances #-}
@@ -34,6 +35,7 @@ module Ouroboros.Consensus.Mempool.API
3435
, zeroTicketNo
3536
) where
3637

38+
import Control.ResourceRegistry
3739
import qualified Data.List.NonEmpty as NE
3840
import Ouroboros.Consensus.Block (ChainHash, SlotNo)
3941
import Ouroboros.Consensus.Ledger.Abstract
@@ -112,7 +114,7 @@ data Mempool m blk = Mempool
112114
-- transactions which were valid in an older ledger state but are invalid
113115
-- in the current ledger state, could exist within the mempool until they
114116
-- are revalidated and dropped from the mempool via a call to
115-
-- 'syncWithLedger' or by the background thread that watches the ledger
117+
-- 'testSyncWithLedger' or by the background thread that watches the ledger
116118
-- for changes.
117119
--
118120
-- This action returns one of two results.
@@ -161,22 +163,6 @@ data Mempool m blk = Mempool
161163
-- persistence.
162164
, removeTxsEvenIfValid :: NE.NonEmpty (GenTxId blk) -> m ()
163165
-- ^ Manually remove the given transactions from the mempool.
164-
, syncWithLedger :: m (MempoolSnapshot blk)
165-
-- ^ Sync the transactions in the mempool with the current ledger state
166-
-- of the 'ChainDB'.
167-
--
168-
-- The transactions that exist within the mempool will be revalidated
169-
-- against the current ledger state. Transactions which are found to be
170-
-- invalid with respect to the current ledger state, will be dropped
171-
-- from the mempool, whereas valid transactions will remain.
172-
--
173-
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
174-
-- of persistence. Additionally, this makes it possible to trace the
175-
-- removal of invalid transactions.
176-
--
177-
-- n.b. in our current implementation, when one opens a mempool, we
178-
-- spawn a thread which performs this action whenever the 'ChainDB' tip
179-
-- point changes.
180166
, getSnapshot :: STM m (MempoolSnapshot blk)
181167
-- ^ Get a snapshot of the current mempool state. This allows for
182168
-- further pure queries on the snapshot.
@@ -212,6 +198,30 @@ data Mempool m blk = Mempool
212198
-- Instead, we treat it the same way as a Mempool which is /at/
213199
-- capacity, i.e., we won't admit new transactions until some have been
214200
-- removed because they have become invalid.
201+
, testSyncWithLedger :: m (MempoolSnapshot blk)
202+
-- ^ ONLY FOR TESTS
203+
--
204+
-- Sync the transactions in the mempool with the current ledger state
205+
-- of the 'ChainDB'.
206+
--
207+
-- The transactions that exist within the mempool will be revalidated
208+
-- against the current ledger state. Transactions which are found to be
209+
-- invalid with respect to the current ledger state, will be dropped
210+
-- from the mempool, whereas valid transactions will remain.
211+
--
212+
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
213+
-- of persistence. Additionally, this makes it possible to trace the
214+
-- removal of invalid transactions.
215+
--
216+
-- n.b. in our current implementation, when one opens a mempool, we
217+
-- spawn a thread which performs this action whenever the 'ChainDB' tip
218+
-- point changes.
219+
, testForkMempoolThread :: forall a. String -> m a -> m (Thread m a)
220+
-- ^ FOR TESTS ONLY
221+
--
222+
-- If we want to run a thread that can perform syncs in the mempool, it needs
223+
-- to be registered in the mempool's internal registry. This function exposes
224+
-- such functionality.
215225
}
216226

217227
{-------------------------------------------------------------------------------

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Impl/Common.hs

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
{-# LANGUAGE DeriveGeneric #-}
44
{-# LANGUAGE FlexibleContexts #-}
55
{-# LANGUAGE NamedFieldPuns #-}
6+
{-# LANGUAGE RankNTypes #-}
67
{-# LANGUAGE ScopedTypeVariables #-}
78
{-# LANGUAGE StandaloneDeriving #-}
89
{-# LANGUAGE TypeApplications #-}
@@ -39,9 +40,9 @@ module Ouroboros.Consensus.Mempool.Impl.Common
3940
, tickLedgerState
4041
) where
4142

42-
import Control.Concurrent.Class.MonadMVar (MVar, newMVar)
4343
import Control.Concurrent.Class.MonadSTM.Strict.TMVar (newTMVarIO)
4444
import Control.Monad.Trans.Except (runExcept)
45+
import Control.ResourceRegistry
4546
import Control.Tracer
4647
import qualified Data.Foldable as Foldable
4748
import qualified Data.List.NonEmpty as NE
@@ -61,8 +62,11 @@ import Ouroboros.Consensus.Mempool.TxSeq (TxSeq (..), TxTicket (..))
6162
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
6263
import Ouroboros.Consensus.Storage.ChainDB (ChainDB)
6364
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
65+
import Ouroboros.Consensus.Storage.LedgerDB.Forker
6466
import Ouroboros.Consensus.Util.Enclose (EnclosingTimed)
6567
import Ouroboros.Consensus.Util.IOLike hiding (newMVar)
68+
import Ouroboros.Consensus.Util.NormalForm.StrictMVar
69+
import Ouroboros.Network.Protocol.LocalStateQuery.Type
6670

6771
{-------------------------------------------------------------------------------
6872
Internal State
@@ -188,26 +192,26 @@ initInternalState capacityOverride lastTicketNo cfg slot st =
188192

189193
-- | Abstract interface needed to run a Mempool.
190194
data LedgerInterface m blk = LedgerInterface
191-
{ getCurrentLedgerState :: STM m (LedgerState blk EmptyMK)
192-
-- ^ Get the current tip of the LedgerDB.
193-
, getLedgerTablesAtFor ::
194-
Point blk ->
195-
LedgerTables (LedgerState blk) KeysMK ->
196-
m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
197-
-- ^ Get values at the given point on the chain. Returns Nothing if the
198-
-- anchor moved or if the state is not found on the ledger db.
195+
{ getCurrentLedgerState ::
196+
ResourceRegistry m ->
197+
STM m (LedgerState blk EmptyMK, m (Either GetForkerError (ReadOnlyForker m (LedgerState blk) blk)))
198+
-- ^ Get the current tip of the LedgerDB and an action to get a forker there.
199199
}
200200

201201
-- | Create a 'LedgerInterface' from a 'ChainDB'.
202202
chainDBLedgerInterface ::
203-
IOLike m =>
204-
ChainDB m blk -> LedgerInterface m blk
203+
(IOLike m, IsLedger (LedgerState blk)) =>
204+
ChainDB m blk ->
205+
LedgerInterface m blk
205206
chainDBLedgerInterface chainDB =
206207
LedgerInterface
207-
{ getCurrentLedgerState =
208-
ledgerState <$> ChainDB.getCurrentLedger chainDB
209-
, getLedgerTablesAtFor = \pt keys ->
210-
fmap castLedgerTables <$> ChainDB.getLedgerTablesAtFor chainDB pt (castLedgerTables keys)
208+
{ getCurrentLedgerState = \reg -> do
209+
st <- ChainDB.getCurrentLedger chainDB
210+
pure
211+
( ledgerState st
212+
, fmap (fmap ledgerStateReadOnlyForker) $
213+
ChainDB.getReadOnlyForkerAtPoint chainDB reg (SpecificPoint (castPoint $ getTip st))
214+
)
211215
}
212216

213217
{-------------------------------------------------------------------------------
@@ -219,10 +223,12 @@ chainDBLedgerInterface chainDB =
219223
-- different operations.
220224
data MempoolEnv m blk = MempoolEnv
221225
{ mpEnvLedger :: LedgerInterface m blk
226+
, mpEnvForker :: StrictMVar m (ReadOnlyForker m (LedgerState blk) blk)
222227
, mpEnvLedgerCfg :: LedgerConfig blk
228+
, mpEnvRegistry :: ResourceRegistry m
223229
, mpEnvStateVar :: StrictTMVar m (InternalState blk)
224-
, mpEnvAddTxsRemoteFifo :: MVar m ()
225-
, mpEnvAddTxsAllFifo :: MVar m ()
230+
, mpEnvAddTxsRemoteFifo :: StrictMVar m ()
231+
, mpEnvAddTxsAllFifo :: StrictMVar m ()
226232
, mpEnvTracer :: Tracer m (TraceEventMempool blk)
227233
, mpEnvCapacityOverride :: MempoolCapacityBytesOverride
228234
}
@@ -236,25 +242,39 @@ initMempoolEnv ::
236242
LedgerConfig blk ->
237243
MempoolCapacityBytesOverride ->
238244
Tracer m (TraceEventMempool blk) ->
245+
ResourceRegistry m ->
239246
m (MempoolEnv m blk)
240-
initMempoolEnv ledgerInterface cfg capacityOverride tracer = do
241-
st <- atomically $ getCurrentLedgerState ledgerInterface
242-
let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
243-
isVar <-
244-
newTMVarIO $
245-
initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st'
246-
addTxRemoteFifo <- newMVar ()
247-
addTxAllFifo <- newMVar ()
248-
return
249-
MempoolEnv
250-
{ mpEnvLedger = ledgerInterface
251-
, mpEnvLedgerCfg = cfg
252-
, mpEnvStateVar = isVar
253-
, mpEnvAddTxsRemoteFifo = addTxRemoteFifo
254-
, mpEnvAddTxsAllFifo = addTxAllFifo
255-
, mpEnvTracer = tracer
256-
, mpEnvCapacityOverride = capacityOverride
257-
}
247+
initMempoolEnv ledgerInterface cfg capacityOverride tracer registry = do
248+
(_, mpEnvRegistry) <- allocate registry (\_ -> unsafeNewRegistry) closeRegistry
249+
initMempoolEnv' mpEnvRegistry
250+
where
251+
initMempoolEnv' reg = do
252+
(st, meFrk) <- atomically $ getCurrentLedgerState ledgerInterface reg
253+
eFrk <- meFrk
254+
case eFrk of
255+
-- This should happen very rarely, if between getting the state and getting
256+
-- the forker, the ledgerdb has changed. We just retry here.
257+
Left{} -> initMempoolEnv' reg
258+
Right frk -> do
259+
frkMVar <- newMVar frk
260+
let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
261+
isVar <-
262+
newTMVarIO $
263+
initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st'
264+
addTxRemoteFifo <- newMVar ()
265+
addTxAllFifo <- newMVar ()
266+
return
267+
MempoolEnv
268+
{ mpEnvLedger = ledgerInterface
269+
, mpEnvLedgerCfg = cfg
270+
, mpEnvForker = frkMVar
271+
, mpEnvRegistry = reg
272+
, mpEnvStateVar = isVar
273+
, mpEnvAddTxsRemoteFifo = addTxRemoteFifo
274+
, mpEnvAddTxsAllFifo = addTxAllFifo
275+
, mpEnvTracer = tracer
276+
, mpEnvCapacityOverride = capacityOverride
277+
}
258278

259279
{-------------------------------------------------------------------------------
260280
Ticking the ledger state

0 commit comments

Comments
 (0)