Skip to content

Commit 5dd5907

Browse files
committed
Carry a forker in the mempool
1 parent fdd5049 commit 5dd5907

File tree

17 files changed

+496
-367
lines changed

17 files changed

+496
-367
lines changed

ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
7777
)
7878
import Ouroboros.Consensus.Ledger.Tables.Utils
7979
import qualified Ouroboros.Consensus.Mempool as Mempool
80+
import Ouroboros.Consensus.Mempool.Impl.Common
8081
import Ouroboros.Consensus.Protocol.Abstract (LedgerView)
8182
import Ouroboros.Consensus.Storage.Common (BlockComponent (..))
8283
import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB)
@@ -862,31 +863,30 @@ reproMempoolForge numBlks env = do
862863
<> "1 or 2 blocks at a time, not "
863864
<> show numBlks
864865

865-
mempool <-
866-
Mempool.openMempoolWithoutSyncThread
867-
Mempool.LedgerInterface
868-
{ Mempool.getCurrentLedgerState = ledgerState <$> LedgerDB.getVolatileTip ledgerDB
869-
, Mempool.getLedgerTablesAtFor = \pt keys -> do
870-
frk <- LedgerDB.getForkerAtTarget ledgerDB registry (SpecificPoint pt)
871-
case frk of
872-
Left _ -> pure Nothing
873-
Right fr -> do
874-
tbs <-
875-
Just . castLedgerTables
876-
<$> LedgerDB.forkerReadTables fr (castLedgerTables keys)
877-
LedgerDB.forkerClose fr
878-
pure tbs
879-
}
880-
lCfg
881-
-- one mebibyte should generously accomodate two blocks' worth of txs
882-
( Mempool.MempoolCapacityBytesOverride $
883-
LedgerSupportsMempool.ByteSize32 $
884-
1024 * 1024
885-
)
886-
nullTracer
887-
888-
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
889-
pure Nothing
866+
withRegistry $ \reg -> do
867+
mempool <-
868+
Mempool.openMempoolWithoutSyncThread
869+
reg
870+
Mempool.LedgerInterface
871+
{ Mempool.getCurrentLedgerState = \reg' -> do
872+
st <- LedgerDB.getVolatileTip ledgerDB
873+
pure $
874+
MempoolLedgerDBView
875+
(ledgerState st)
876+
( fmap (LedgerDB.ledgerStateReadOnlyForker . LedgerDB.readOnlyForker)
877+
<$> LedgerDB.getForkerAtTarget ledgerDB reg' (SpecificPoint (castPoint $ getTip st))
878+
)
879+
}
880+
lCfg
881+
-- one mebibyte should generously accomodate two blocks' worth of txs
882+
( Mempool.MempoolCapacityBytesOverride $
883+
LedgerSupportsMempool.ByteSize32 $
884+
1024 * 1024
885+
)
886+
nullTracer
887+
888+
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
889+
pure Nothing
890890
where
891891
AnalysisEnv
892892
{ cfg
@@ -991,7 +991,7 @@ reproMempoolForge numBlks env = do
991991
LedgerDB.tryFlush ledgerDB
992992

993993
-- this flushes blk from the mempool, since every tx in it is now on the chain
994-
void $ Mempool.syncWithLedger mempool
994+
void $ Mempool.testSyncWithLedger mempool
995995

996996
{-------------------------------------------------------------------------------
997997
Auxiliary: processing all blocks in the DB

ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -628,16 +628,15 @@ runThreadNetwork
628628
HasCallStack =>
629629
OracularClock m ->
630630
SlotNo ->
631-
ResourceRegistry m ->
632631
(SlotNo -> STM m ()) ->
633632
STM m (Point blk) ->
634633
(ResourceRegistry m -> m (ReadOnlyForker' m blk)) ->
635634
Mempool m blk ->
636635
[GenTx blk] ->
637636
-- \^ valid transactions the node should immediately propagate
638-
m ()
639-
forkCrucialTxs clock s0 registry unblockForge getTipPoint mforker mempool txs0 = do
640-
void $ forkLinkedThread registry "crucialTxs" $ withRegistry $ \reg -> do
637+
m (Thread m Void)
638+
forkCrucialTxs clock s0 unblockForge getTipPoint mforker mempool txs0 = do
639+
testForkMempoolThread mempool "crucialTxs" $ withRegistry $ \reg -> do
641640
let loop (slot, mempFp) = do
642641
forker <- mforker reg
643642
extLedger <- atomically $ roforkerGetLedgerState forker
@@ -679,7 +678,7 @@ runThreadNetwork
679678
-- avoid the race in which we wake up before the mempool's
680679
-- background thread wakes up by mimicking it before we do
681680
-- anything else
682-
void $ syncWithLedger mempool
681+
void $ testSyncWithLedger mempool
683682

684683
loop fps'
685684
loop (s0, [])
@@ -1150,15 +1149,17 @@ runThreadNetwork
11501149
--
11511150
-- TODO Is there a risk that this will block because the 'forkTxProducer'
11521151
-- fills up the mempool too quickly?
1153-
forkCrucialTxs
1154-
clock
1155-
joinSlot
1156-
registry
1157-
unblockForge
1158-
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1159-
getForker
1160-
mempool
1161-
txs0
1152+
threadCrucialTxs <-
1153+
forkCrucialTxs
1154+
clock
1155+
joinSlot
1156+
unblockForge
1157+
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1158+
getForker
1159+
mempool
1160+
txs0
1161+
1162+
void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread
11621163

11631164
forkTxProducer
11641165
coreNodeId
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<!--
2+
A new scriv changelog fragment.
3+
4+
Uncomment the section that is right (remove the HTML comment wrapper).
5+
For top level release notes, leave all the headers commented out.
6+
-->
7+
8+
### Patch
9+
10+
- The mempool will now carry its own forker instead of acquiring one on each
11+
revalidation. This particularly implies that the mempool will no longer
12+
re-sync under the hood while trying to add a transaction, and only the
13+
background thread will perform such a re-sync.
14+
15+
- The mempool now has its own registry in which it allocates forkers. The
16+
background thread was moved to this inner registry such that it can access the
17+
mempool internal registry, but an action to cancel it will still live in the
18+
outer registry, to ensure the thread is closed before we attempt to close the
19+
mempool internal registry. Otherwise we would run into a race condition if the
20+
background thread would attempt a resync while the internal registry was being
21+
closed.
22+
23+
<!--
24+
### Non-Breaking
25+
26+
- A bullet item for the Non-Breaking category.
27+
28+
-->
29+
30+
### Breaking
31+
32+
- Removed `getLedgerTablesAtFor` from the ChainDB API. Clients now have to
33+
actually open a forker and manage it.

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ library unstable-mempool-test-utils
543543
contra-tracer,
544544
deepseq,
545545
ouroboros-consensus,
546+
resource-registry,
546547
strict-stm,
547548

548549
library unstable-tutorials

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE RankNTypes #-}
23
{-# LANGUAGE ScopedTypeVariables #-}
34
{-# LANGUAGE StandaloneDeriving #-}
45
{-# LANGUAGE UndecidableInstances #-}
@@ -34,8 +35,9 @@ module Ouroboros.Consensus.Mempool.API
3435
, zeroTicketNo
3536
) where
3637

38+
import Control.ResourceRegistry
3739
import qualified Data.List.NonEmpty as NE
38-
import Ouroboros.Consensus.Block (ChainHash, SlotNo)
40+
import Ouroboros.Consensus.Block (ChainHash, Point, SlotNo)
3941
import Ouroboros.Consensus.Ledger.Abstract
4042
import Ouroboros.Consensus.Ledger.SupportsMempool
4143
import qualified Ouroboros.Consensus.Mempool.Capacity as Cap
@@ -109,11 +111,11 @@ data Mempool m blk = Mempool
109111
--
110112
-- Note that transactions that are invalid will /never/ be added to the
111113
-- mempool. However, it is possible that, at a given point in time,
112-
-- transactions which were valid in an older ledger state but are invalid
113-
-- in the current ledger state, could exist within the mempool until they
114-
-- are revalidated and dropped from the mempool via a call to
115-
-- 'syncWithLedger' or by the background thread that watches the ledger
116-
-- for changes.
114+
-- transactions which were valid in an older ledger state but are invalid in
115+
-- the current ledger state, could exist within the mempool until they are
116+
-- revalidated and dropped from the mempool via a call to by the background
117+
-- thread that watches the ledger for changes or by 'testSyncWithLedger' in
118+
-- testing scenarios.
117119
--
118120
-- This action returns one of two results.
119121
--
@@ -161,22 +163,6 @@ data Mempool m blk = Mempool
161163
-- persistence.
162164
, removeTxsEvenIfValid :: NE.NonEmpty (GenTxId blk) -> m ()
163165
-- ^ Manually remove the given transactions from the mempool.
164-
, syncWithLedger :: m (MempoolSnapshot blk)
165-
-- ^ Sync the transactions in the mempool with the current ledger state
166-
-- of the 'ChainDB'.
167-
--
168-
-- The transactions that exist within the mempool will be revalidated
169-
-- against the current ledger state. Transactions which are found to be
170-
-- invalid with respect to the current ledger state, will be dropped
171-
-- from the mempool, whereas valid transactions will remain.
172-
--
173-
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
174-
-- of persistence. Additionally, this makes it possible to trace the
175-
-- removal of invalid transactions.
176-
--
177-
-- n.b. in our current implementation, when one opens a mempool, we
178-
-- spawn a thread which performs this action whenever the 'ChainDB' tip
179-
-- point changes.
180166
, getSnapshot :: STM m (MempoolSnapshot blk)
181167
-- ^ Get a snapshot of the current mempool state. This allows for
182168
-- further pure queries on the snapshot.
@@ -212,6 +198,33 @@ data Mempool m blk = Mempool
212198
-- Instead, we treat it the same way as a Mempool which is /at/
213199
-- capacity, i.e., we won't admit new transactions until some have been
214200
-- removed because they have become invalid.
201+
, testSyncWithLedger :: m (MempoolSnapshot blk)
202+
-- ^ ONLY FOR TESTS
203+
--
204+
-- Sync the transactions in the mempool with the current ledger state
205+
-- of the 'ChainDB'.
206+
--
207+
-- The transactions that exist within the mempool will be revalidated
208+
-- against the current ledger state. Transactions which are found to be
209+
-- invalid with respect to the current ledger state, will be dropped
210+
-- from the mempool, whereas valid transactions will remain.
211+
--
212+
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
213+
-- of persistence. Additionally, this makes it possible to trace the
214+
-- removal of invalid transactions.
215+
--
216+
-- n.b. in our current implementation, when one opens a mempool, we
217+
-- spawn a thread which performs this action whenever the 'ChainDB' tip
218+
-- point changes.
219+
, testForkMempoolThread :: forall a. String -> m a -> m (Thread m a)
220+
-- ^ FOR TESTS ONLY
221+
--
222+
-- If we want to run a thread that can perform syncs in the mempool, it needs
223+
-- to be registered in the mempool's internal registry. This function exposes
224+
-- such functionality.
225+
--
226+
-- The 'String' passed will be used as the thread label, and the @m a@ will be
227+
-- the action forked in the thread.
215228
}
216229

217230
{-------------------------------------------------------------------------------
@@ -353,4 +366,5 @@ data MempoolSnapshot blk = MempoolSnapshot
353366
, snapshotStateHash :: ChainHash (TickedLedgerState blk)
354367
-- ^ The resulting state currently in the mempool after applying the
355368
-- transactions
369+
, snapshotPoint :: Point blk
356370
}

0 commit comments

Comments
 (0)