Skip to content

Commit 7fd3638

Browse files
committed
Carry a forker in the mempool
1 parent e2d80d1 commit 7fd3638

File tree

17 files changed

+485
-351
lines changed

17 files changed

+485
-351
lines changed

ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -862,31 +862,29 @@ reproMempoolForge numBlks env = do
862862
<> "1 or 2 blocks at a time, not "
863863
<> show numBlks
864864

865-
mempool <-
866-
Mempool.openMempoolWithoutSyncThread
867-
Mempool.LedgerInterface
868-
{ Mempool.getCurrentLedgerState = ledgerState <$> LedgerDB.getVolatileTip ledgerDB
869-
, Mempool.getLedgerTablesAtFor = \pt keys -> do
870-
frk <- LedgerDB.getForkerAtTarget ledgerDB registry (SpecificPoint pt)
871-
case frk of
872-
Left _ -> pure Nothing
873-
Right fr -> do
874-
tbs <-
875-
Just . castLedgerTables
876-
<$> LedgerDB.forkerReadTables fr (castLedgerTables keys)
877-
LedgerDB.forkerClose fr
878-
pure tbs
879-
}
880-
lCfg
881-
-- one mebibyte should generously accomodate two blocks' worth of txs
882-
( Mempool.MempoolCapacityBytesOverride $
883-
LedgerSupportsMempool.ByteSize32 $
884-
1024 * 1024
885-
)
886-
nullTracer
887-
888-
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
889-
pure Nothing
865+
withRegistry $ \reg -> do
866+
mempool <-
867+
Mempool.openMempoolWithoutSyncThread
868+
reg
869+
Mempool.LedgerInterface
870+
{ Mempool.getCurrentLedgerState = \reg' -> do
871+
st <- LedgerDB.getVolatileTip ledgerDB
872+
pure
873+
( ledgerState st
874+
, fmap (LedgerDB.ledgerStateReadOnlyForker . LedgerDB.readOnlyForker)
875+
<$> LedgerDB.getForkerAtTarget ledgerDB reg' (SpecificPoint (castPoint $ getTip st))
876+
)
877+
}
878+
lCfg
879+
-- one mebibyte should generously accomodate two blocks' worth of txs
880+
( Mempool.MempoolCapacityBytesOverride $
881+
LedgerSupportsMempool.ByteSize32 $
882+
1024 * 1024
883+
)
884+
nullTracer
885+
886+
void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
887+
pure Nothing
890888
where
891889
AnalysisEnv
892890
{ cfg
@@ -991,7 +989,7 @@ reproMempoolForge numBlks env = do
991989
LedgerDB.tryFlush ledgerDB
992990

993991
-- this flushes blk from the mempool, since every tx in it is now on the chain
994-
void $ Mempool.syncWithLedger mempool
992+
void $ Mempool.testSyncWithLedger mempool
995993

996994
{-------------------------------------------------------------------------------
997995
Auxiliary: processing all blocks in the DB

ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -628,16 +628,15 @@ runThreadNetwork
628628
HasCallStack =>
629629
OracularClock m ->
630630
SlotNo ->
631-
ResourceRegistry m ->
632631
(SlotNo -> STM m ()) ->
633632
STM m (Point blk) ->
634633
(ResourceRegistry m -> m (ReadOnlyForker' m blk)) ->
635634
Mempool m blk ->
636635
[GenTx blk] ->
637636
-- \^ valid transactions the node should immediately propagate
638-
m ()
639-
forkCrucialTxs clock s0 registry unblockForge getTipPoint mforker mempool txs0 = do
640-
void $ forkLinkedThread registry "crucialTxs" $ withRegistry $ \reg -> do
637+
m (Thread m Void)
638+
forkCrucialTxs clock s0 unblockForge getTipPoint mforker mempool txs0 = do
639+
testForkMempoolThread mempool "crucialTxs" $ withRegistry $ \reg -> do
641640
let loop (slot, mempFp) = do
642641
forker <- mforker reg
643642
extLedger <- atomically $ roforkerGetLedgerState forker
@@ -679,7 +678,7 @@ runThreadNetwork
679678
-- avoid the race in which we wake up before the mempool's
680679
-- background thread wakes up by mimicking it before we do
681680
-- anything else
682-
void $ syncWithLedger mempool
681+
void $ testSyncWithLedger mempool
683682

684683
loop fps'
685684
loop (s0, [])
@@ -1150,15 +1149,17 @@ runThreadNetwork
11501149
--
11511150
-- TODO Is there a risk that this will block because the 'forkTxProducer'
11521151
-- fills up the mempool too quickly?
1153-
forkCrucialTxs
1154-
clock
1155-
joinSlot
1156-
registry
1157-
unblockForge
1158-
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1159-
getForker
1160-
mempool
1161-
txs0
1152+
threadCrucialTxs <-
1153+
forkCrucialTxs
1154+
clock
1155+
joinSlot
1156+
unblockForge
1157+
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
1158+
getForker
1159+
mempool
1160+
txs0
1161+
1162+
void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread
11621163

11631164
forkTxProducer
11641165
coreNodeId
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<!--
2+
A new scriv changelog fragment.
3+
4+
Uncomment the section that is right (remove the HTML comment wrapper).
5+
For top level release notes, leave all the headers commented out.
6+
-->
7+
8+
### Patch
9+
10+
- The mempool will now carry its own forker instead of acquiring one on each
11+
revalidation. This particularly implies that the mempool will no longer
12+
re-sync under the hood while trying to add a transaction, and only the
13+
background thread will perform such a re-sync.
14+
15+
- The ledger state that triggers a re-sync in the background thread will be the
16+
one used to revalidate the mempool. Previously, another read for the new tip
17+
would be performed when starting the sync, which could lead to confusing
18+
(although innocuous) situations in which:
19+
20+
- the tip changes from A to B
21+
- the background thread sees the tip changed from A, so it records the new tip (B) and triggers the re-sync
22+
- the tip changes again from B to C before the syncing process reads the tip again
23+
- the mempool is re-synced with C
24+
- the background thread would now see that the tip changed from B, so it records the new tip (C) and triggers the re-sync
25+
- the mempool is re-synced **AGAIN** with C
26+
27+
This sequence of actions can't happen again. Instead, we will sync exactly
28+
with the state that triggered the re-syncing.
29+
30+
- The mempool now has its own registry in which it allocates forkers. The
31+
background thread was moved to this inner registry such that it can access the
32+
mempool internal registry, but an action to cancel it will still live in the
33+
outer registry, to ensure the thread is closed before we attempt to close the
34+
mempool internal registry. Otherwise we would run into a race condition if the
35+
background thread would attempt a resync while the internal registry was being
36+
closed.
37+
38+
<!--
39+
### Non-Breaking
40+
41+
- A bullet item for the Non-Breaking category.
42+
43+
-->
44+
45+
### Breaking
46+
47+
- Removed `getLedgerTablesAtFor` from the ChainDB API. Clients now have to
48+
actually open a forker and manage it.

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ library unstable-mempool-test-utils
543543
contra-tracer,
544544
deepseq,
545545
ouroboros-consensus,
546+
resource-registry,
546547
strict-stm,
547548

548549
library unstable-tutorials

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE RankNTypes #-}
23
{-# LANGUAGE ScopedTypeVariables #-}
34
{-# LANGUAGE StandaloneDeriving #-}
45
{-# LANGUAGE UndecidableInstances #-}
@@ -34,6 +35,7 @@ module Ouroboros.Consensus.Mempool.API
3435
, zeroTicketNo
3536
) where
3637

38+
import Control.ResourceRegistry
3739
import qualified Data.List.NonEmpty as NE
3840
import Ouroboros.Consensus.Block (ChainHash, SlotNo)
3941
import Ouroboros.Consensus.Ledger.Abstract
@@ -112,7 +114,7 @@ data Mempool m blk = Mempool
112114
-- transactions which were valid in an older ledger state but are invalid
113115
-- in the current ledger state, could exist within the mempool until they
114116
-- are revalidated and dropped from the mempool via a call to
115-
-- 'syncWithLedger' or by the background thread that watches the ledger
117+
-- 'testSyncWithLedger' or by the background thread that watches the ledger
116118
-- for changes.
117119
--
118120
-- This action returns one of two results.
@@ -161,22 +163,6 @@ data Mempool m blk = Mempool
161163
-- persistence.
162164
, removeTxsEvenIfValid :: NE.NonEmpty (GenTxId blk) -> m ()
163165
-- ^ Manually remove the given transactions from the mempool.
164-
, syncWithLedger :: m (MempoolSnapshot blk)
165-
-- ^ Sync the transactions in the mempool with the current ledger state
166-
-- of the 'ChainDB'.
167-
--
168-
-- The transactions that exist within the mempool will be revalidated
169-
-- against the current ledger state. Transactions which are found to be
170-
-- invalid with respect to the current ledger state, will be dropped
171-
-- from the mempool, whereas valid transactions will remain.
172-
--
173-
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
174-
-- of persistence. Additionally, this makes it possible to trace the
175-
-- removal of invalid transactions.
176-
--
177-
-- n.b. in our current implementation, when one opens a mempool, we
178-
-- spawn a thread which performs this action whenever the 'ChainDB' tip
179-
-- point changes.
180166
, getSnapshot :: STM m (MempoolSnapshot blk)
181167
-- ^ Get a snapshot of the current mempool state. This allows for
182168
-- further pure queries on the snapshot.
@@ -212,6 +198,33 @@ data Mempool m blk = Mempool
212198
-- Instead, we treat it the same way as a Mempool which is /at/
213199
-- capacity, i.e., we won't admit new transactions until some have been
214200
-- removed because they have become invalid.
201+
, testSyncWithLedger :: m (MempoolSnapshot blk)
202+
-- ^ ONLY FOR TESTS
203+
--
204+
-- Sync the transactions in the mempool with the current ledger state
205+
-- of the 'ChainDB'.
206+
--
207+
-- The transactions that exist within the mempool will be revalidated
208+
-- against the current ledger state. Transactions which are found to be
209+
-- invalid with respect to the current ledger state, will be dropped
210+
-- from the mempool, whereas valid transactions will remain.
211+
--
212+
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
213+
-- of persistence. Additionally, this makes it possible to trace the
214+
-- removal of invalid transactions.
215+
--
216+
-- n.b. in our current implementation, when one opens a mempool, we
217+
-- spawn a thread which performs this action whenever the 'ChainDB' tip
218+
-- point changes.
219+
, testForkMempoolThread :: forall a. String -> m a -> m (Thread m a)
220+
-- ^ FOR TESTS ONLY
221+
--
222+
-- If we want to run a thread that can perform syncs in the mempool, it needs
223+
-- to be registered in the mempool's internal registry. This function exposes
224+
-- such functionality.
225+
--
226+
-- The 'String' passed will be used as the thread label, and the @m a@ will be
227+
-- the action forked in the thread.
215228
}
216229

217230
{-------------------------------------------------------------------------------

0 commit comments

Comments
 (0)