Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
)
import Ouroboros.Consensus.Ledger.Tables.Utils
import qualified Ouroboros.Consensus.Mempool as Mempool
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Protocol.Abstract (LedgerView)
import Ouroboros.Consensus.Storage.Common (BlockComponent (..))
import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB)
Expand Down Expand Up @@ -862,31 +863,30 @@ reproMempoolForge numBlks env = do
<> "1 or 2 blocks at a time, not "
<> show numBlks

mempool <-
Mempool.openMempoolWithoutSyncThread
Mempool.LedgerInterface
{ Mempool.getCurrentLedgerState = ledgerState <$> LedgerDB.getVolatileTip ledgerDB
, Mempool.getLedgerTablesAtFor = \pt keys -> do
frk <- LedgerDB.getForkerAtTarget ledgerDB registry (SpecificPoint pt)
case frk of
Left _ -> pure Nothing
Right fr -> do
tbs <-
Just . castLedgerTables
<$> LedgerDB.forkerReadTables fr (castLedgerTables keys)
LedgerDB.forkerClose fr
pure tbs
}
lCfg
-- one mebibyte should generously accomodate two blocks' worth of txs
( Mempool.MempoolCapacityBytesOverride $
LedgerSupportsMempool.ByteSize32 $
1024 * 1024
)
nullTracer

void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
pure Nothing
withRegistry $ \reg -> do
mempool <-
Mempool.openMempoolWithoutSyncThread
reg
Mempool.LedgerInterface
{ Mempool.getCurrentLedgerState = \reg' -> do
st <- LedgerDB.getVolatileTip ledgerDB
pure $
MempoolLedgerDBView
(ledgerState st)
( fmap (LedgerDB.ledgerStateReadOnlyForker . LedgerDB.readOnlyForker)
<$> LedgerDB.getForkerAtTarget ledgerDB reg' (SpecificPoint (castPoint $ getTip st))
)
}
lCfg
-- one mebibyte should generously accomodate two blocks' worth of txs
( Mempool.MempoolCapacityBytesOverride $
LedgerSupportsMempool.ByteSize32 $
1024 * 1024
)
nullTracer

void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool)
pure Nothing
where
AnalysisEnv
{ cfg
Expand Down Expand Up @@ -991,7 +991,7 @@ reproMempoolForge numBlks env = do
LedgerDB.tryFlush ledgerDB

-- this flushes blk from the mempool, since every tx in it is now on the chain
void $ Mempool.syncWithLedger mempool
void $ Mempool.testSyncWithLedger mempool

{-------------------------------------------------------------------------------
Auxiliary: processing all blocks in the DB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,16 +628,15 @@ runThreadNetwork
HasCallStack =>
OracularClock m ->
SlotNo ->
ResourceRegistry m ->
(SlotNo -> STM m ()) ->
STM m (Point blk) ->
(ResourceRegistry m -> m (ReadOnlyForker' m blk)) ->
Mempool m blk ->
[GenTx blk] ->
-- \^ valid transactions the node should immediately propagate
m ()
forkCrucialTxs clock s0 registry unblockForge getTipPoint mforker mempool txs0 = do
void $ forkLinkedThread registry "crucialTxs" $ withRegistry $ \reg -> do
m (Thread m Void)
forkCrucialTxs clock s0 unblockForge getTipPoint mforker mempool txs0 = do
testForkMempoolThread mempool "crucialTxs" $ withRegistry $ \reg -> do
let loop (slot, mempFp) = do
forker <- mforker reg
extLedger <- atomically $ roforkerGetLedgerState forker
Expand Down Expand Up @@ -679,7 +678,7 @@ runThreadNetwork
-- avoid the race in which we wake up before the mempool's
-- background thread wakes up by mimicking it before we do
-- anything else
void $ syncWithLedger mempool
void $ testSyncWithLedger mempool

loop fps'
loop (s0, [])
Expand Down Expand Up @@ -1150,15 +1149,17 @@ runThreadNetwork
--
-- TODO Is there a risk that this will block because the 'forkTxProducer'
-- fills up the mempool too quickly?
forkCrucialTxs
clock
joinSlot
registry
unblockForge
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
getForker
mempool
txs0
threadCrucialTxs <-
forkCrucialTxs
clock
joinSlot
unblockForge
(ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB)
getForker
mempool
txs0

void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread

forkTxProducer
coreNodeId
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<!--
A new scriv changelog fragment.

Uncomment the section that is right (remove the HTML comment wrapper).
For top level release notes, leave all the headers commented out.
-->

### Patch

- The mempool will now carry its own forker instead of acquiring one on each
revalidation. This particularly implies that the mempool will no longer
re-sync under the hood while trying to add a transaction, and only the
background thread will perform such a re-sync.

- The mempool now has its own registry in which it allocates forkers. The
background thread was moved to this inner registry such that it can access the
mempool internal registry, but an action to cancel it will still live in the
outer registry, to ensure the thread is closed before we attempt to close the
mempool internal registry. Otherwise we would run into a race condition if the
background thread would attempt a resync while the internal registry was being
closed.

<!--
### Non-Breaking

- A bullet item for the Non-Breaking category.

-->

### Breaking

- Removed `getLedgerTablesAtFor` from the ChainDB API. Clients now have to
actually open a forker and manage it.
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ library unstable-mempool-test-utils
contra-tracer,
deepseq,
ouroboros-consensus,
resource-registry,
strict-stm,

library unstable-tutorials
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
Expand Down Expand Up @@ -34,8 +35,9 @@ module Ouroboros.Consensus.Mempool.API
, zeroTicketNo
) where

import Control.ResourceRegistry
import qualified Data.List.NonEmpty as NE
import Ouroboros.Consensus.Block (ChainHash, SlotNo)
import Ouroboros.Consensus.Block (ChainHash, Point, SlotNo)
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
import qualified Ouroboros.Consensus.Mempool.Capacity as Cap
Expand Down Expand Up @@ -109,11 +111,11 @@ data Mempool m blk = Mempool
--
-- Note that transactions that are invalid will /never/ be added to the
-- mempool. However, it is possible that, at a given point in time,
-- transactions which were valid in an older ledger state but are invalid
-- in the current ledger state, could exist within the mempool until they
-- are revalidated and dropped from the mempool via a call to
-- 'syncWithLedger' or by the background thread that watches the ledger
-- for changes.
-- transactions which were valid in an older ledger state but are invalid in
-- the current ledger state, could exist within the mempool until they are
-- revalidated and dropped from the mempool via a call to by the background
-- thread that watches the ledger for changes or by 'testSyncWithLedger' in
-- testing scenarios.
--
-- This action returns one of two results.
--
Expand Down Expand Up @@ -161,22 +163,6 @@ data Mempool m blk = Mempool
-- persistence.
, removeTxsEvenIfValid :: NE.NonEmpty (GenTxId blk) -> m ()
-- ^ Manually remove the given transactions from the mempool.
, syncWithLedger :: m (MempoolSnapshot blk)
-- ^ Sync the transactions in the mempool with the current ledger state
-- of the 'ChainDB'.
--
-- The transactions that exist within the mempool will be revalidated
-- against the current ledger state. Transactions which are found to be
-- invalid with respect to the current ledger state, will be dropped
-- from the mempool, whereas valid transactions will remain.
--
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
-- of persistence. Additionally, this makes it possible to trace the
-- removal of invalid transactions.
--
-- n.b. in our current implementation, when one opens a mempool, we
-- spawn a thread which performs this action whenever the 'ChainDB' tip
-- point changes.
, getSnapshot :: STM m (MempoolSnapshot blk)
-- ^ Get a snapshot of the current mempool state. This allows for
-- further pure queries on the snapshot.
Expand Down Expand Up @@ -212,6 +198,33 @@ data Mempool m blk = Mempool
-- Instead, we treat it the same way as a Mempool which is /at/
-- capacity, i.e., we won't admit new transactions until some have been
-- removed because they have become invalid.
, testSyncWithLedger :: m (MempoolSnapshot blk)
-- ^ ONLY FOR TESTS
--
-- Sync the transactions in the mempool with the current ledger state
-- of the 'ChainDB'.
--
-- The transactions that exist within the mempool will be revalidated
-- against the current ledger state. Transactions which are found to be
-- invalid with respect to the current ledger state, will be dropped
-- from the mempool, whereas valid transactions will remain.
--
-- We keep this in @m@ instead of @STM m@ to leave open the possibility
-- of persistence. Additionally, this makes it possible to trace the
-- removal of invalid transactions.
--
-- n.b. in our current implementation, when one opens a mempool, we
-- spawn a thread which performs this action whenever the 'ChainDB' tip
-- point changes.
, testForkMempoolThread :: forall a. String -> m a -> m (Thread m a)
-- ^ FOR TESTS ONLY
--
-- If we want to run a thread that can perform syncs in the mempool, it needs
-- to be registered in the mempool's internal registry. This function exposes
-- such functionality.
--
-- The 'String' passed will be used as the thread label, and the @m a@ will be
-- the action forked in the thread.
}

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -353,4 +366,5 @@ data MempoolSnapshot blk = MempoolSnapshot
, snapshotStateHash :: ChainHash (TickedLedgerState blk)
-- ^ The resulting state currently in the mempool after applying the
-- transactions
, snapshotPoint :: Point blk
}
Loading
Loading