Skip to content

Commit d6eff84

Browse files
authored
Carry a forker in the mempool (#1565)
# Description - The mempool will now carry its own forker instead of acquiring one on each revalidation. This particularly implies that the mempool will no longer re-sync under the hood while trying to add a transaction, and only the background thread will perform such a re-sync. - The mempool now has its own registry in which it allocates forkers. The background thread was moved to this inner registry such that it can access the mempool internal registry, but an action to cancel it will still live in the outer registry, to ensure the thread is closed before we attempt to close the mempool internal registry. Otherwise we would run into a race condition if the background thread would attempt a resync while the internal registry was being closed.
2 parents fdd5049 + 1940671 commit d6eff84

File tree

17 files changed

+592
-478
lines changed

17 files changed

+592
-478
lines changed

ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
7777
)
7878
import Ouroboros.Consensus.Ledger.Tables.Utils
7979
import qualified Ouroboros.Consensus.Mempool as Mempool
80+
import Ouroboros.Consensus.Mempool.Impl.Common
8081
import Ouroboros.Consensus.Protocol.Abstract (LedgerView)
8182
import Ouroboros.Consensus.Storage.Common (BlockComponent (..))
8283
import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB)
@@ -862,31 +863,30 @@ reproMempoolForge numBlks env = do
862863
<> "1 or 2 blocks at a time, not "
863864
<> show numBlks
864865

865-
mempool <-
866-
Mempool.openMempoolWithoutSyncThread
867-
Mempool.LedgerInterface
868-
{ Mempool.getCurrentLedgerState = ledgerState <$> LedgerDB.getVolatileTip ledgerDB
869-
, Mempool.getLedgerTablesAtFor = \pt keys -> do
870-
frk <- LedgerDB.getForkerAtTarget ledgerDB registry (SpecificPoint pt)
871-
case frk of
872-
Left _ -> pure Nothing
873-
Right fr -> do
874-
tbs <-
875-
Just . castLedgerTables
876-
<$> LedgerDB.forkerReadTables fr (castLedgerTables keys)
877-
LedgerDB.forkerClose fr
878-
pure tbs
879-
}
880-
lCfg
881-
-- one mebibyte should generously accomodate two blocks' worth of txs
882-
( Mempool.MempoolCapacityBytesOverride $
883-
LedgerSupportsMempool.ByteSize32 $
884-
1024 * 1024
885-
)
886-
nullTracer
887-
888-
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
889-
pure Nothing
866+
withRegistry $ \reg -> do
867+
mempool <-
868+
Mempool.openMempoolWithoutSyncThread
869+
reg
870+
Mempool.LedgerInterface
871+
{ Mempool.getCurrentLedgerState = \reg' -> do
872+
st <- LedgerDB.getVolatileTip ledgerDB
873+
pure $
874+
MempoolLedgerDBView
875+
(ledgerState st)
876+
( fmap (LedgerDB.ledgerStateReadOnlyForker . LedgerDB.readOnlyForker)
877+
<$> LedgerDB.getForkerAtTarget ledgerDB reg' (SpecificPoint (castPoint $ getTip st))
878+
)
879+
}
880+
lCfg
881+
-- one mebibyte should generously accomodate two blocks' worth of txs
882+
( Mempool.MempoolCapacityBytesOverride $
883+
LedgerSupportsMempool.ByteSize32 $
884+
1024 * 1024
885+
)
886+
nullTracer
887+
888+
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
889+
pure Nothing
890890
where
891891
AnalysisEnv
892892
{ cfg
@@ -991,7 +991,7 @@ reproMempoolForge numBlks env = do
991991
LedgerDB.tryFlush ledgerDB
992992

993993
-- this flushes blk from the mempool, since every tx in it is now on the chain
994-
void $ Mempool.syncWithLedger mempool
994+
void $ Mempool.testSyncWithLedger mempool
995995

996996
{-------------------------------------------------------------------------------
997997
Auxiliary: processing all blocks in the DB

ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -628,16 +628,15 @@ runThreadNetwork
628628
HasCallStack =>
629629
OracularClock m ->
630630
SlotNo ->
631-
ResourceRegistry m ->
632631
(SlotNo -> STM m ()) ->
633632
STM m (Point blk) ->
634633
(ResourceRegistry m -> m (ReadOnlyForker' m blk)) ->
635634
Mempool m blk ->
636635
[GenTx blk] ->
637636
-- \^ valid transactions the node should immediately propagate
638-
m ()
639-
forkCrucialTxs clock s0 registry unblockForge getTipPoint mforker mempool txs0 = do
640-
void $ forkLinkedThread registry "crucialTxs" $ withRegistry $ \reg -> do
637+
m (Thread m Void)
638+
forkCrucialTxs clock s0 unblockForge getTipPoint mforker mempool txs0 = do
639+
testForkMempoolThread mempool "crucialTxs" $ withRegistry $ \reg -> do
641640
let loop (slot, mempFp) = do
642641
forker <- mforker reg
643642
extLedger <- atomically $ roforkerGetLedgerState forker
@@ -679,7 +678,7 @@ runThreadNetwork
679678
-- avoid the race in which we wake up before the mempool's
680679
-- background thread wakes up by mimicking it before we do
681680
-- anything else
682-
void $ syncWithLedger mempool
681+
void $ testSyncWithLedger mempool
683682

684683
loop fps'
685684
loop (s0, [])
@@ -1150,15 +1149,17 @@ runThreadNetwork
11501149
--
11511150
-- TODO Is there a risk that this will block because the 'forkTxProducer'
11521151
-- fills up the mempool too quickly?
1153-
forkCrucialTxs
1154-
clock
1155-
joinSlot
1156-
registry
1157-
unblockForge
1158-
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1159-
getForker
1160-
mempool
1161-
txs0
1152+
threadCrucialTxs <-
1153+
forkCrucialTxs
1154+
clock
1155+
joinSlot
1156+
unblockForge
1157+
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1158+
getForker
1159+
mempool
1160+
txs0
1161+
1162+
void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread
11621163

11631164
forkTxProducer
11641165
coreNodeId
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<!--
2+
A new scriv changelog fragment.
3+
4+
Uncomment the section that is right (remove the HTML comment wrapper).
5+
For top level release notes, leave all the headers commented out.
6+
-->
7+
8+
### Patch
9+
10+
- The mempool will now carry its own forker instead of acquiring one on each
11+
revalidation. This particularly implies that the mempool will no longer
12+
re-sync under the hood while trying to add a transaction, and only the
13+
background thread will perform such a re-sync.
14+
15+
- The mempool now has its own registry in which it allocates forkers. The
16+
background thread was moved to this inner registry such that it can access the
17+
mempool internal registry, but an action to cancel it will still live in the
18+
outer registry, to ensure the thread is closed before we attempt to close the
19+
mempool internal registry. Otherwise we would run into a race condition if the
20+
background thread would attempt a resync while the internal registry was being
21+
closed.
22+
23+
<!--
24+
### Non-Breaking
25+
26+
- A bullet item for the Non-Breaking category.
27+
28+
-->
29+
30+
### Breaking
31+
32+
- Removed `getLedgerTablesAtFor` from the ChainDB API. Clients now have to
33+
actually open a forker and manage it.

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ library unstable-mempool-test-utils
543543
contra-tracer,
544544
deepseq,
545545
ouroboros-consensus,
546+
resource-registry,
546547
strict-stm,
547548

548549
library unstable-tutorials

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE RankNTypes #-}
23
{-# LANGUAGE ScopedTypeVariables #-}
34
{-# LANGUAGE StandaloneDeriving #-}
45
{-# LANGUAGE UndecidableInstances #-}
@@ -34,8 +35,9 @@ module Ouroboros.Consensus.Mempool.API
3435
, zeroTicketNo
3536
) where
3637

38+
import Control.ResourceRegistry
3739
import qualified Data.List.NonEmpty as NE
38-
import Ouroboros.Consensus.Block (ChainHash, SlotNo)
40+
import Ouroboros.Consensus.Block (ChainHash, Point, SlotNo)
3941
import Ouroboros.Consensus.Ledger.Abstract
4042
import Ouroboros.Consensus.Ledger.SupportsMempool
4143
import qualified Ouroboros.Consensus.Mempool.Capacity as Cap
@@ -109,11 +111,11 @@ data Mempool m blk = Mempool
109111
--
110112
-- Note that transactions that are invalid will /never/ be added to the
111113
-- mempool. However, it is possible that, at a given point in time,
112-
-- transactions which were valid in an older ledger state but are invalid
113-
-- in the current ledger state, could exist within the mempool until they
114-
-- are revalidated and dropped from the mempool via a call to
115-
-- 'syncWithLedger' or by the background thread that watches the ledger
116-
-- for changes.
114+
-- transactions which were valid in an older ledger state but are invalid in
115+
-- the current ledger state, could exist within the mempool until they are
116+
-- revalidated and dropped from the mempool via a call to by the background
117+
-- thread that watches the ledger for changes or by 'testSyncWithLedger' in
118+
-- testing scenarios.
117119
--
118120
-- This action returns one of two results.
119121
--
@@ -161,22 +163,6 @@ data Mempool m blk = Mempool
161163
-- persistence.
162164
, removeTxsEvenIfValid :: NE.NonEmpty (GenTxId blk) -> m ()
163165
-- ^ Manually remove the given transactions from the mempool.
164-
, syncWithLedger :: m (MempoolSnapshot blk)
165-
-- ^ Sync the transactions in the mempool with the current ledger state
166-
-- of the 'ChainDB'.
167-
--
168-
-- The transactions that exist within the mempool will be revalidated
169-
-- against the current ledger state. Transactions which are found to be
170-
-- invalid with respect to the current ledger state, will be dropped
171-
-- from the mempool, whereas valid transactions will remain.
172-
--
173-
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
174-
-- of persistence. Additionally, this makes it possible to trace the
175-
-- removal of invalid transactions.
176-
--
177-
-- n.b. in our current implementation, when one opens a mempool, we
178-
-- spawn a thread which performs this action whenever the 'ChainDB' tip
179-
-- point changes.
180166
, getSnapshot :: STM m (MempoolSnapshot blk)
181167
-- ^ Get a snapshot of the current mempool state. This allows for
182168
-- further pure queries on the snapshot.
@@ -212,6 +198,33 @@ data Mempool m blk = Mempool
212198
-- Instead, we treat it the same way as a Mempool which is /at/
213199
-- capacity, i.e., we won't admit new transactions until some have been
214200
-- removed because they have become invalid.
201+
, testSyncWithLedger :: m (MempoolSnapshot blk)
202+
-- ^ ONLY FOR TESTS
203+
--
204+
-- Sync the transactions in the mempool with the current ledger state
205+
-- of the 'ChainDB'.
206+
--
207+
-- The transactions that exist within the mempool will be revalidated
208+
-- against the current ledger state. Transactions which are found to be
209+
-- invalid with respect to the current ledger state, will be dropped
210+
-- from the mempool, whereas valid transactions will remain.
211+
--
212+
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
213+
-- of persistence. Additionally, this makes it possible to trace the
214+
-- removal of invalid transactions.
215+
--
216+
-- n.b. in our current implementation, when one opens a mempool, we
217+
-- spawn a thread which performs this action whenever the 'ChainDB' tip
218+
-- point changes.
219+
, testForkMempoolThread :: forall a. String -> m a -> m (Thread m a)
220+
-- ^ FOR TESTS ONLY
221+
--
222+
-- If we want to run a thread that can perform syncs in the mempool, it needs
223+
-- to be registered in the mempool's internal registry. This function exposes
224+
-- such functionality.
225+
--
226+
-- The 'String' passed will be used as the thread label, and the @m a@ will be
227+
-- the action forked in the thread.
215228
}
216229

217230
{-------------------------------------------------------------------------------
@@ -353,4 +366,5 @@ data MempoolSnapshot blk = MempoolSnapshot
353366
, snapshotStateHash :: ChainHash (TickedLedgerState blk)
354367
-- ^ The resulting state currently in the mempool after applying the
355368
-- transactions
369+
, snapshotPoint :: Point blk
356370
}

0 commit comments

Comments
 (0)