From 5dd590721c6255c113e2e2f1964f2608fb34074b Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Tue, 17 Jun 2025 16:40:43 +0200 Subject: [PATCH 1/2] Carry a forker in the mempool --- .../Cardano/Tools/DBAnalyser/Analysis.hs | 52 ++--- .../Test/ThreadNet/Network.hs | 29 +-- ...250620_132627_jasataco_mempool_carry_vh.md | 33 ++++ ouroboros-consensus/ouroboros-consensus.cabal | 1 + .../Ouroboros/Consensus/Mempool/API.hs | 58 +++--- .../Consensus/Mempool/Impl/Common.hs | 125 +++++++----- .../Ouroboros/Consensus/Mempool/Init.hs | 36 ++-- .../Ouroboros/Consensus/Mempool/Update.hs | 183 +++++++++--------- .../Consensus/Storage/ChainDB/API.hs | 8 - .../Consensus/Storage/ChainDB/Impl.hs | 1 - .../Consensus/Storage/ChainDB/Impl/Query.hs | 13 +- .../Consensus/Storage/LedgerDB/API.hs | 14 -- .../Consensus/Storage/LedgerDB/Forker.hs | 35 +++- .../Test/Consensus/Mempool/Mocked.hs | 34 +++- .../consensus-test/Test/Consensus/Mempool.hs | 104 +++++----- .../Test/Consensus/Mempool/Fairness.hs | 79 ++++---- .../Test/Consensus/Mempool/StateMachine.hs | 58 +++--- 17 files changed, 496 insertions(+), 367 deletions(-) create mode 100644 ouroboros-consensus/changelog.d/20250620_132627_jasataco_mempool_carry_vh.md diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs index 3ab4e39886..36cc409f89 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/DBAnalyser/Analysis.hs @@ -77,6 +77,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol ) import Ouroboros.Consensus.Ledger.Tables.Utils import qualified Ouroboros.Consensus.Mempool as Mempool +import Ouroboros.Consensus.Mempool.Impl.Common import Ouroboros.Consensus.Protocol.Abstract (LedgerView) import Ouroboros.Consensus.Storage.Common (BlockComponent (..)) import Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB) @@ -862,31 +863,30 @@ reproMempoolForge numBlks env = do <> "1 or 2 blocks at a time, not " <> show numBlks - mempool <- - Mempool.openMempoolWithoutSyncThread - Mempool.LedgerInterface - { Mempool.getCurrentLedgerState = ledgerState <$> LedgerDB.getVolatileTip ledgerDB - , Mempool.getLedgerTablesAtFor = \pt keys -> do - frk <- LedgerDB.getForkerAtTarget ledgerDB registry (SpecificPoint pt) - case frk of - Left _ -> pure Nothing - Right fr -> do - tbs <- - Just . castLedgerTables - <$> LedgerDB.forkerReadTables fr (castLedgerTables keys) - LedgerDB.forkerClose fr - pure tbs - } - lCfg - -- one mebibyte should generously accomodate two blocks' worth of txs - ( Mempool.MempoolCapacityBytesOverride $ - LedgerSupportsMempool.ByteSize32 $ - 1024 * 1024 - ) - nullTracer - - void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool) - pure Nothing + withRegistry $ \reg -> do + mempool <- + Mempool.openMempoolWithoutSyncThread + reg + Mempool.LedgerInterface + { Mempool.getCurrentLedgerState = \reg' -> do + st <- LedgerDB.getVolatileTip ledgerDB + pure $ + MempoolLedgerDBView + (ledgerState st) + ( fmap (LedgerDB.ledgerStateReadOnlyForker . LedgerDB.readOnlyForker) + <$> LedgerDB.getForkerAtTarget ledgerDB reg' (SpecificPoint (castPoint $ getTip st)) + ) + } + lCfg + -- one mebibyte should generously accomodate two blocks' worth of txs + ( Mempool.MempoolCapacityBytesOverride $ + LedgerSupportsMempool.ByteSize32 $ + 1024 * 1024 + ) + nullTracer + + void $ processAll db registry GetBlock startFrom limit Nothing (process howManyBlocks mempool) + pure Nothing where AnalysisEnv { cfg @@ -991,7 +991,7 @@ reproMempoolForge numBlks env = do LedgerDB.tryFlush ledgerDB -- this flushes blk from the mempool, since every tx in it is now on the chain - void $ Mempool.syncWithLedger mempool + void $ Mempool.testSyncWithLedger mempool {------------------------------------------------------------------------------- Auxiliary: processing all blocks in the DB diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 5b252b2c29..88cb87e9ff 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -628,16 +628,15 @@ runThreadNetwork HasCallStack => OracularClock m -> SlotNo -> - ResourceRegistry m -> (SlotNo -> STM m ()) -> STM m (Point blk) -> (ResourceRegistry m -> m (ReadOnlyForker' m blk)) -> Mempool m blk -> [GenTx blk] -> -- \^ valid transactions the node should immediately propagate - m () - forkCrucialTxs clock s0 registry unblockForge getTipPoint mforker mempool txs0 = do - void $ forkLinkedThread registry "crucialTxs" $ withRegistry $ \reg -> do + m (Thread m Void) + forkCrucialTxs clock s0 unblockForge getTipPoint mforker mempool txs0 = do + testForkMempoolThread mempool "crucialTxs" $ withRegistry $ \reg -> do let loop (slot, mempFp) = do forker <- mforker reg extLedger <- atomically $ roforkerGetLedgerState forker @@ -679,7 +678,7 @@ runThreadNetwork -- avoid the race in which we wake up before the mempool's -- background thread wakes up by mimicking it before we do -- anything else - void $ syncWithLedger mempool + void $ testSyncWithLedger mempool loop fps' loop (s0, []) @@ -1150,15 +1149,17 @@ runThreadNetwork -- -- TODO Is there a risk that this will block because the 'forkTxProducer' -- fills up the mempool too quickly? - forkCrucialTxs - clock - joinSlot - registry - unblockForge - (ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB) - getForker - mempool - txs0 + threadCrucialTxs <- + forkCrucialTxs + clock + joinSlot + unblockForge + (ledgerTipPoint . ledgerState <$> ChainDB.getCurrentLedger chainDB) + getForker + mempool + txs0 + + void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread forkTxProducer coreNodeId diff --git a/ouroboros-consensus/changelog.d/20250620_132627_jasataco_mempool_carry_vh.md b/ouroboros-consensus/changelog.d/20250620_132627_jasataco_mempool_carry_vh.md new file mode 100644 index 0000000000..2d6d63d60b --- /dev/null +++ b/ouroboros-consensus/changelog.d/20250620_132627_jasataco_mempool_carry_vh.md @@ -0,0 +1,33 @@ + + +### Patch + +- The mempool will now carry its own forker instead of acquiring one on each + revalidation. This particularly implies that the mempool will no longer + re-sync under the hood while trying to add a transaction, and only the + background thread will perform such a re-sync. + +- The mempool now has its own registry in which it allocates forkers. The + background thread was moved to this inner registry such that it can access the + mempool internal registry, but an action to cancel it will still live in the + outer registry, to ensure the thread is closed before we attempt to close the + mempool internal registry. Otherwise we would run into a race condition if the + background thread would attempt a resync while the internal registry was being + closed. + + + +### Breaking + +- Removed `getLedgerTablesAtFor` from the ChainDB API. Clients now have to + actually open a forker and manage it. diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 0cada6966c..e7486dd37c 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -543,6 +543,7 @@ library unstable-mempool-test-utils contra-tracer, deepseq, ouroboros-consensus, + resource-registry, strict-stm, library unstable-tutorials diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs index 511109c2bd..d9db885e21 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/API.hs @@ -1,4 +1,5 @@ {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE UndecidableInstances #-} @@ -34,8 +35,9 @@ module Ouroboros.Consensus.Mempool.API , zeroTicketNo ) where +import Control.ResourceRegistry import qualified Data.List.NonEmpty as NE -import Ouroboros.Consensus.Block (ChainHash, SlotNo) +import Ouroboros.Consensus.Block (ChainHash, Point, SlotNo) import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.SupportsMempool import qualified Ouroboros.Consensus.Mempool.Capacity as Cap @@ -109,11 +111,11 @@ data Mempool m blk = Mempool -- -- Note that transactions that are invalid will /never/ be added to the -- mempool. However, it is possible that, at a given point in time, - -- transactions which were valid in an older ledger state but are invalid - -- in the current ledger state, could exist within the mempool until they - -- are revalidated and dropped from the mempool via a call to - -- 'syncWithLedger' or by the background thread that watches the ledger - -- for changes. + -- transactions which were valid in an older ledger state but are invalid in + -- the current ledger state, could exist within the mempool until they are + -- revalidated and dropped from the mempool via a call to by the background + -- thread that watches the ledger for changes or by 'testSyncWithLedger' in + -- testing scenarios. -- -- This action returns one of two results. -- @@ -161,22 +163,6 @@ data Mempool m blk = Mempool -- persistence. , removeTxsEvenIfValid :: NE.NonEmpty (GenTxId blk) -> m () -- ^ Manually remove the given transactions from the mempool. - , syncWithLedger :: m (MempoolSnapshot blk) - -- ^ Sync the transactions in the mempool with the current ledger state - -- of the 'ChainDB'. - -- - -- The transactions that exist within the mempool will be revalidated - -- against the current ledger state. Transactions which are found to be - -- invalid with respect to the current ledger state, will be dropped - -- from the mempool, whereas valid transactions will remain. - -- - -- We keep this in @m@ instead of @STM m@ to leave open the possibility - -- of persistence. Additionally, this makes it possible to trace the - -- removal of invalid transactions. - -- - -- n.b. in our current implementation, when one opens a mempool, we - -- spawn a thread which performs this action whenever the 'ChainDB' tip - -- point changes. , getSnapshot :: STM m (MempoolSnapshot blk) -- ^ Get a snapshot of the current mempool state. This allows for -- further pure queries on the snapshot. @@ -212,6 +198,33 @@ data Mempool m blk = Mempool -- Instead, we treat it the same way as a Mempool which is /at/ -- capacity, i.e., we won't admit new transactions until some have been -- removed because they have become invalid. + , testSyncWithLedger :: m (MempoolSnapshot blk) + -- ^ ONLY FOR TESTS + -- + -- Sync the transactions in the mempool with the current ledger state + -- of the 'ChainDB'. + -- + -- The transactions that exist within the mempool will be revalidated + -- against the current ledger state. Transactions which are found to be + -- invalid with respect to the current ledger state, will be dropped + -- from the mempool, whereas valid transactions will remain. + -- + -- We keep this in @m@ instead of @STM m@ to leave open the possibility + -- of persistence. Additionally, this makes it possible to trace the + -- removal of invalid transactions. + -- + -- n.b. in our current implementation, when one opens a mempool, we + -- spawn a thread which performs this action whenever the 'ChainDB' tip + -- point changes. + , testForkMempoolThread :: forall a. String -> m a -> m (Thread m a) + -- ^ FOR TESTS ONLY + -- + -- If we want to run a thread that can perform syncs in the mempool, it needs + -- to be registered in the mempool's internal registry. This function exposes + -- such functionality. + -- + -- The 'String' passed will be used as the thread label, and the @m a@ will be + -- the action forked in the thread. } {------------------------------------------------------------------------------- @@ -353,4 +366,5 @@ data MempoolSnapshot blk = MempoolSnapshot , snapshotStateHash :: ChainHash (TickedLedgerState blk) -- ^ The resulting state currently in the mempool after applying the -- transactions + , snapshotPoint :: Point blk } diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Impl/Common.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Impl/Common.hs index d29c321924..79432d1a86 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Impl/Common.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Impl/Common.hs @@ -3,6 +3,7 @@ {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeApplications #-} @@ -21,6 +22,7 @@ module Ouroboros.Consensus.Mempool.Impl.Common -- * Ledger interface , LedgerInterface (..) + , MempoolLedgerDBView (..) , chainDBLedgerInterface -- * Validation @@ -39,9 +41,9 @@ module Ouroboros.Consensus.Mempool.Impl.Common , tickLedgerState ) where -import Control.Concurrent.Class.MonadMVar (MVar, newMVar) import Control.Concurrent.Class.MonadSTM.Strict.TMVar (newTMVarIO) import Control.Monad.Trans.Except (runExcept) +import Control.ResourceRegistry import Control.Tracer import qualified Data.Foldable as Foldable import qualified Data.List.NonEmpty as NE @@ -61,8 +63,11 @@ import Ouroboros.Consensus.Mempool.TxSeq (TxSeq (..), TxTicket (..)) import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq import Ouroboros.Consensus.Storage.ChainDB (ChainDB) import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB +import Ouroboros.Consensus.Storage.LedgerDB.Forker import Ouroboros.Consensus.Util.Enclose (EnclosingTimed) import Ouroboros.Consensus.Util.IOLike hiding (newMVar) +import Ouroboros.Consensus.Util.NormalForm.StrictMVar +import Ouroboros.Network.Protocol.LocalStateQuery.Type {------------------------------------------------------------------------------- Internal State @@ -187,27 +192,41 @@ initInternalState capacityOverride lastTicketNo cfg slot st = -------------------------------------------------------------------------------} -- | Abstract interface needed to run a Mempool. -data LedgerInterface m blk = LedgerInterface - { getCurrentLedgerState :: STM m (LedgerState blk EmptyMK) - -- ^ Get the current tip of the LedgerDB. - , getLedgerTablesAtFor :: - Point blk -> - LedgerTables (LedgerState blk) KeysMK -> - m (Maybe (LedgerTables (LedgerState blk) ValuesMK)) - -- ^ Get values at the given point on the chain. Returns Nothing if the - -- anchor moved or if the state is not found on the ledger db. +newtype LedgerInterface m blk = LedgerInterface + { getCurrentLedgerState :: ResourceRegistry m -> STM m (MempoolLedgerDBView m blk) + -- ^ The resource registry should be the one of the Mempool + -- ('mpEnvRegistry'). It will be used to allocate the forker. } +data MempoolLedgerDBView m blk = MempoolLedgerDBView + { mldViewState :: LedgerState blk EmptyMK + -- ^ The ledger state currently at the tip of the LedgerDB + , mldViewGetForker :: m (Either GetForkerError (ReadOnlyForker m (LedgerState blk) blk)) + -- ^ An action to get a forker at 'mldViewState' or an error in the unlikely + -- case that such state is now gone from the LedgerDB. + } + +instance + (StandardHash blk, UpdateLedger blk) => + Eq (MempoolLedgerDBView m blk) + where + MempoolLedgerDBView a _ == MempoolLedgerDBView b _ = + ledgerTipPoint a == ledgerTipPoint b + -- | Create a 'LedgerInterface' from a 'ChainDB'. chainDBLedgerInterface :: - IOLike m => - ChainDB m blk -> LedgerInterface m blk + (IOLike m, IsLedger (LedgerState blk)) => + ChainDB m blk -> + LedgerInterface m blk chainDBLedgerInterface chainDB = LedgerInterface - { getCurrentLedgerState = - ledgerState <$> ChainDB.getCurrentLedger chainDB - , getLedgerTablesAtFor = \pt keys -> - fmap castLedgerTables <$> ChainDB.getLedgerTablesAtFor chainDB pt (castLedgerTables keys) + { getCurrentLedgerState = \reg -> do + st <- ChainDB.getCurrentLedger chainDB + pure + $ MempoolLedgerDBView + (ledgerState st) + $ fmap (fmap ledgerStateReadOnlyForker) + $ ChainDB.getReadOnlyForkerAtPoint chainDB reg (SpecificPoint (castPoint $ getTip st)) } {------------------------------------------------------------------------------- @@ -219,10 +238,12 @@ chainDBLedgerInterface chainDB = -- different operations. data MempoolEnv m blk = MempoolEnv { mpEnvLedger :: LedgerInterface m blk + , mpEnvForker :: StrictMVar m (ReadOnlyForker m (LedgerState blk) blk) , mpEnvLedgerCfg :: LedgerConfig blk + , mpEnvRegistry :: ResourceRegistry m , mpEnvStateVar :: StrictTMVar m (InternalState blk) - , mpEnvAddTxsRemoteFifo :: MVar m () - , mpEnvAddTxsAllFifo :: MVar m () + , mpEnvAddTxsRemoteFifo :: StrictMVar m () + , mpEnvAddTxsAllFifo :: StrictMVar m () , mpEnvTracer :: Tracer m (TraceEventMempool blk) , mpEnvCapacityOverride :: MempoolCapacityBytesOverride } @@ -236,25 +257,39 @@ initMempoolEnv :: LedgerConfig blk -> MempoolCapacityBytesOverride -> Tracer m (TraceEventMempool blk) -> + ResourceRegistry m -> m (MempoolEnv m blk) -initMempoolEnv ledgerInterface cfg capacityOverride tracer = do - st <- atomically $ getCurrentLedgerState ledgerInterface - let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st) - isVar <- - newTMVarIO $ - initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st' - addTxRemoteFifo <- newMVar () - addTxAllFifo <- newMVar () - return - MempoolEnv - { mpEnvLedger = ledgerInterface - , mpEnvLedgerCfg = cfg - , mpEnvStateVar = isVar - , mpEnvAddTxsRemoteFifo = addTxRemoteFifo - , mpEnvAddTxsAllFifo = addTxAllFifo - , mpEnvTracer = tracer - , mpEnvCapacityOverride = capacityOverride - } +initMempoolEnv ledgerInterface cfg capacityOverride tracer topLevelRegistry = do + (_, mpEnvRegistry) <- allocate topLevelRegistry (\_ -> unsafeNewRegistry) closeRegistry + initMempoolEnv' mpEnvRegistry + where + initMempoolEnv' reg = do + MempoolLedgerDBView st meFrk <- atomically $ getCurrentLedgerState ledgerInterface reg + eFrk <- meFrk + case eFrk of + -- This should happen very rarely, if between getting the state and getting + -- the forker, the ledgerdb has changed. We just retry here. + Left{} -> initMempoolEnv' reg + Right frk -> do + frkMVar <- newMVar frk + let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st) + isVar <- + newTMVarIO $ + initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st' + addTxRemoteFifo <- newMVar () + addTxAllFifo <- newMVar () + return + MempoolEnv + { mpEnvLedger = ledgerInterface + , mpEnvLedgerCfg = cfg + , mpEnvForker = frkMVar + , mpEnvRegistry = reg + , mpEnvStateVar = isVar + , mpEnvAddTxsRemoteFifo = addTxRemoteFifo + , mpEnvAddTxsAllFifo = addTxAllFifo + , mpEnvTracer = tracer + , mpEnvCapacityOverride = capacityOverride + } {------------------------------------------------------------------------------- Ticking the ledger state @@ -447,6 +482,7 @@ snapshotFromIS is = , snapshotSlotNo = isSlotNo is , snapshotStateHash = getTipHash $ isLedgerState is , snapshotTake = implSnapshotTake is + , snapshotPoint = castPoint $ getTip $ isLedgerState is } where implSnapshotGetTxs :: @@ -532,18 +568,13 @@ data TraceEventMempool blk | -- | A sync is not needed, as the point at the tip of the LedgerDB and the -- point at the mempool are the same. TraceMempoolSyncNotNeeded (Point blk) - | -- | We will try to add a transaction. Adding a transaction might need to - -- trigger a re-sync. + | -- | We will try to add a transaction. TraceMempoolAttemptingAdd (GenTx blk) - | -- | When adding a transaction, the ledger state in the mempool was found - -- in the LedgerDB, and therefore we can read values, even if it is not the - -- tip of the LedgerDB. An async re-sync will be performed eventually in - -- that case. - TraceMempoolLedgerFound (Point blk) - | -- | When adding a transaction, the ledger state in the mempool is gone - -- from the LedgerDB, so we cannot read values for the new - -- transaction. This forces an in-place re-sync. - TraceMempoolLedgerNotFound (Point blk) + | -- | When performing a re-sync we will read the LedgerDB tip twice. This + -- trace will be emitted if in between those two steps the LedgerDB moved to + -- an alternative fork. It is completely innocuous but we would like to + -- double check that it happens very rarely or almost never. + TraceMempoolTipMovedBetweenSTMBlocks deriving Generic deriving instance diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs index a0c79c4cb0..06708c4317 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs @@ -10,7 +10,6 @@ module Ouroboros.Consensus.Mempool.Init import Control.Monad (void) import Control.ResourceRegistry import Control.Tracer -import Ouroboros.Consensus.Block import Ouroboros.Consensus.HeaderValidation import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.SupportsMempool @@ -40,9 +39,9 @@ openMempool :: MempoolCapacityBytesOverride -> Tracer m (TraceEventMempool blk) -> m (Mempool m blk) -openMempool registry ledger cfg capacityOverride tracer = do - env <- initMempoolEnv ledger cfg capacityOverride tracer - forkSyncStateOnTipPointChange registry env +openMempool topLevelRegistry ledger cfg capacityOverride tracer = do + env <- initMempoolEnv ledger cfg capacityOverride tracer topLevelRegistry + forkSyncStateOnTipPointChange topLevelRegistry env return $ mkMempool env -- | Spawn a thread which syncs the 'Mempool' state whenever the 'LedgerState' @@ -57,10 +56,10 @@ forkSyncStateOnTipPointChange :: ResourceRegistry m -> MempoolEnv m blk -> m () -forkSyncStateOnTipPointChange registry menv = - void $ +forkSyncStateOnTipPointChange topLevelRegistry menv = do + w <- forkLinkedWatcher - registry + (mpEnvRegistry menv) "Mempool.syncStateOnTipPointChange" Watcher { wFingerprint = id @@ -68,16 +67,21 @@ forkSyncStateOnTipPointChange registry menv = , wNotify = action , wReader = getCurrentTip } + + -- With this allocation on the top level registry, we make sure that we first + -- stop the watcher thread before closing the mempool registry, as otherwise + -- we would run into a race condition (the thread might try to re-sync and + -- allocate a forker on the mempool registry which would be closing down). + void $ allocate topLevelRegistry (\_ -> pure w) cancelThread where - action :: Point blk -> m () - action _tipPoint = + action :: MempoolLedgerDBView m blk -> m () + action _a = void $ implSyncWithLedger menv -- Using the tip ('Point') allows for quicker equality checks - getCurrentTip :: STM m (Point blk) + getCurrentTip :: STM m (MempoolLedgerDBView m blk) getCurrentTip = - ledgerTipPoint - <$> getCurrentLedgerState (mpEnvLedger menv) + getCurrentLedgerState (mpEnvLedger menv) (mpEnvRegistry menv) -- | Unlike 'openMempool', this function does not fork a background thread -- that synchronises with the ledger state whenever the later changes. @@ -89,13 +93,14 @@ openMempoolWithoutSyncThread :: , HasTxId (GenTx blk) , ValidateEnvelope blk ) => + ResourceRegistry m -> LedgerInterface m blk -> LedgerConfig blk -> MempoolCapacityBytesOverride -> Tracer m (TraceEventMempool blk) -> m (Mempool m blk) -openMempoolWithoutSyncThread ledger cfg capacityOverride tracer = - mkMempool <$> initMempoolEnv ledger cfg capacityOverride tracer +openMempoolWithoutSyncThread registry ledger cfg capacityOverride tracer = + mkMempool <$> initMempoolEnv ledger cfg capacityOverride tracer registry mkMempool :: ( IOLike m @@ -108,10 +113,11 @@ mkMempool mpEnv = Mempool { addTx = implAddTx mpEnv , removeTxsEvenIfValid = implRemoveTxsEvenIfValid mpEnv - , syncWithLedger = implSyncWithLedger mpEnv , getSnapshot = snapshotFromIS <$> readTMVar istate , getSnapshotFor = implGetSnapshotFor mpEnv , getCapacity = isCapacity <$> readTMVar istate + , testSyncWithLedger = implSyncWithLedger mpEnv + , testForkMempoolThread = forkLinkedThread (mpEnvRegistry mpEnv) } where MempoolEnv diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs index fc8b070639..3953750861 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs @@ -10,8 +10,6 @@ module Ouroboros.Consensus.Mempool.Update ) where import Cardano.Slotting.Slot -import Control.Concurrent.Class.MonadMVar (withMVar) -import Control.Monad (void) import Control.Monad.Except (runExcept) import Control.Tracer import qualified Data.Foldable as Foldable @@ -20,18 +18,20 @@ import qualified Data.List.NonEmpty as NE import Data.Maybe (fromMaybe) import qualified Data.Measure as Measure import qualified Data.Set as Set -import Data.Void import Ouroboros.Consensus.HeaderValidation import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.SupportsMempool +import Ouroboros.Consensus.Ledger.Tables.Utils (emptyLedgerTables) import Ouroboros.Consensus.Mempool.API import Ouroboros.Consensus.Mempool.Capacity import Ouroboros.Consensus.Mempool.Impl.Common import Ouroboros.Consensus.Mempool.TxSeq (TxTicket (..)) import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq +import Ouroboros.Consensus.Storage.LedgerDB.Forker hiding (trace) import Ouroboros.Consensus.Util (whenJust) import Ouroboros.Consensus.Util.Enclose import Ouroboros.Consensus.Util.IOLike hiding (withMVar) +import Ouroboros.Consensus.Util.NormalForm.StrictMVar import Ouroboros.Consensus.Util.STM import Ouroboros.Network.Block @@ -154,7 +154,7 @@ doAddTx mpEnv wti tx = doAddTx' Nothing where MempoolEnv - { mpEnvLedger = ldgrInterface + { mpEnvForker = forker , mpEnvLedgerCfg = cfg , mpEnvStateVar = istate , mpEnvTracer = trcr @@ -172,31 +172,16 @@ doAddTx mpEnv wti tx = res <- withTMVarAnd istate additionalCheck $ \is () -> do - mTbs <- getLedgerTablesAtFor ldgrInterface (isTip is) (getTransactionKeySets tx) - case mTbs of - Just tbs -> do - traceWith trcr $ TraceMempoolLedgerFound (isTip is) - case pureTryAddTx cfg wti tx is tbs of - NotEnoughSpaceLeft -> do - pure (Retry (isMempoolSize is), is) - Processed outcome@(TransactionProcessingResult is' _ _) -> do - pure (OK outcome, fromMaybe is is') - Nothing -> do - traceWith trcr $ TraceMempoolLedgerNotFound (isTip is) - -- We couldn't retrieve the values because the state is no longer on - -- the db. We need to resync. - pure (Resync, is) - case res of - Retry s' -> doAddTx' (Just s') - OK outcome -> pure outcome - Resync -> do - void $ implSyncWithLedger mpEnv - doAddTx' mbPrevSize - -data WithTMVarOutcome retry ok - = Retry !retry - | OK ok - | Resync + frkr <- readMVar forker + tbs <- + castLedgerTables + <$> roforkerReadTables frkr (castLedgerTables $ getTransactionKeySets tx) + case pureTryAddTx cfg wti tx is tbs of + NotEnoughSpaceLeft -> do + pure (Left (isMempoolSize is), is) + Processed outcome@(TransactionProcessingResult is' _ _) -> do + pure (Right outcome, fromMaybe is is') + either (doAddTx' . Just) pure res pureTryAddTx :: ( LedgerSupportsMempool blk @@ -212,7 +197,11 @@ pureTryAddTx :: LedgerTables (LedgerState blk) ValuesMK -> TriedToAddTx blk pureTryAddTx cfg wti tx is values = - let st = applyMempoolDiffs values (getTransactionKeySets tx) (isLedgerState is) + let st = + applyMempoolDiffs + values + (getTransactionKeySets tx) + (isLedgerState is) in case runExcept $ txMeasure cfg st tx of Left err -> -- The transaction does not have a valid measure (eg its ExUnits is @@ -324,9 +313,9 @@ implRemoveTxsEvenIfValid :: MempoolEnv m blk -> NE.NonEmpty (GenTxId blk) -> m () -implRemoveTxsEvenIfValid mpEnv toRemove = do - (out :: WithTMVarOutcome Void ()) <- withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface) $ - \is ls -> do +implRemoveTxsEvenIfValid mpEnv toRemove = + withTMVar istate $ + \is -> do let toKeep = filter ( (`notElem` Set.fromList (NE.toList toRemove)) @@ -335,33 +324,25 @@ implRemoveTxsEvenIfValid mpEnv toRemove = do . txTicketTx ) (TxSeq.toList $ isTxs is) - (slot, ticked) = tickLedgerState cfg (ForgeInUnknownSlot ls) toKeep' = Foldable.foldMap' (getTransactionKeySets . txForgetValidated . TxSeq.txTicketTx) toKeep - mTbs <- getLedgerTablesAtFor ldgrInterface (castPoint (getTip ls)) toKeep' - case mTbs of - Nothing -> pure (Resync, is) - Just tbs -> do - let (is', t) = - pureRemoveTxs - capacityOverride - cfg - slot - ticked - tbs - (isLastTicketNo is) - toKeep - toRemove - traceWith trcr t - pure (OK (), is') - case out of - Resync -> do - void $ implSyncWithLedger mpEnv - implRemoveTxsEvenIfValid mpEnv toRemove - OK () -> pure () + frkr <- readMVar forker + tbs <- castLedgerTables <$> roforkerReadTables frkr (castLedgerTables toKeep') + let (is', t) = + pureRemoveTxs + capacityOverride + cfg + (isSlotNo is) + (isLedgerState is `withLedgerTables` emptyLedgerTables) + tbs + (isLastTicketNo is) + toKeep + toRemove + traceWith trcr t + pure ((), is') where MempoolEnv { mpEnvStateVar = istate - , mpEnvLedger = ldgrInterface + , mpEnvForker = forker , mpEnvTracer = trcr , mpEnvLedgerCfg = cfg , mpEnvCapacityOverride = capacityOverride @@ -414,42 +395,68 @@ implSyncWithLedger :: ) => MempoolEnv m blk -> m (MempoolSnapshot blk) -implSyncWithLedger mpEnv = encloseTimedWith (TraceMempoolSynced >$< mpEnvTracer mpEnv) $ do - (res :: WithTMVarOutcome Void (MempoolSnapshot blk)) <- - withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface) $ - \is ls -> do - let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls - if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot - then do - -- The tip didn't change, put the same state. - traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) - pure (OK (snapshotFromIS is), is) - else do - -- We need to revalidate - let pt = castPoint (getTip ls) - mTbs <- getLedgerTablesAtFor ldgrInterface pt (isTxKeys is) - case mTbs of - Just tbs -> do - let (is', mTrace) = - pureSyncWithLedger - capacityOverride - cfg - slot - ls' - tbs - is - whenJust mTrace (traceWith trcr) - pure (OK (snapshotFromIS is'), is') - Nothing -> do - -- If the point is gone, resync - pure (Resync, is) - case res of - OK v -> pure v - Resync -> implSyncWithLedger mpEnv +implSyncWithLedger mpEnv = + encloseTimedWith (TraceMempoolSynced >$< mpEnvTracer mpEnv) $ do + res <- + -- There could possibly be a race condition if we used there the state + -- that triggered the re-syncing in the background watcher, if a different + -- action acquired the state before the revalidation started. + -- + -- For that reason, we read the state again here in the same STM + -- transaction in which we acquire the internal state of the mempool. + -- + -- This implies that the watcher might be triggered again with the same + -- state from the point of view of the mempool, if after the watcher saw a + -- new state and this read for re-syncing, the state has changed. The + -- watcher will see it once again and trigger re-validation again. Just + -- for performance reasons, we will avoid re-validating the mempool if the + -- state didn't change. + withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface registry) $ + \is (MempoolLedgerDBView ls meFrk) -> do + eFrk <- meFrk + case eFrk of + -- This case should happen only if the tip has moved again, this time + -- to a separate fork, since the background thread saw a change in the + -- tip, which should happen very rarely + Left{} -> do + traceWith trcr TraceMempoolTipMovedBetweenSTMBlocks + pure (Nothing, is) + Right frk -> do + let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls + if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot + then do + -- The tip didn't change, put the same state. + traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) + pure (Just (snapshotFromIS is), is) + else do + -- The tip changed, we have to revalidate + modifyMVar_ + forkerMVar + ( \oldFrk -> do + roforkerClose oldFrk + pure frk + ) + tbs <- castLedgerTables <$> roforkerReadTables frk (castLedgerTables $ isTxKeys is) + let (is', mTrace) = + pureSyncWithLedger + capacityOverride + cfg + slot + ls' + tbs + is + whenJust mTrace (traceWith trcr) + pure (Just (snapshotFromIS is'), is') + maybe + (implSyncWithLedger mpEnv) + pure + res where MempoolEnv { mpEnvStateVar = istate + , mpEnvForker = forkerMVar , mpEnvLedger = ldgrInterface + , mpEnvRegistry = registry , mpEnvTracer = trcr , mpEnvLedgerCfg = cfg , mpEnvCapacityOverride = capacityOverride diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs index 7eeb5716f7..303fbcf78e 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs @@ -383,14 +383,6 @@ data ChainDB m blk = ChainDB , getChainSelStarvation :: STM m ChainSelStarvation -- ^ Whether ChainSel is currently starved, or when was last time it -- stopped being starved. - , getLedgerTablesAtFor :: - Point blk -> - LedgerTables (ExtLedgerState blk) KeysMK -> - m (Maybe (LedgerTables (ExtLedgerState blk) ValuesMK)) - -- ^ Read ledger tables at a given point on the chain, if it exists. - -- - -- This is intended to be used by the mempool to hydrate a ledger state at - -- a specific point. , getStatistics :: m (Maybe Statistics) -- ^ Get statistics from the LedgerDB, in particular the number of entries -- in the tables. diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index a36a36a0d2..a33055b067 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -271,7 +271,6 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do , getPastLedger = getEnvSTM1 h Query.getPastLedger , getHeaderStateHistory = getEnvSTM h Query.getHeaderStateHistory , getReadOnlyForkerAtPoint = getEnv2 h Query.getReadOnlyForkerAtPoint - , getLedgerTablesAtFor = getEnv2 h Query.getLedgerTablesAtFor , getStatistics = getEnv h Query.getStatistics } addBlockTestFuse <- newFuse "test chain selection" diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs index fcc148b6b1..821586f745 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs @@ -16,7 +16,6 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query , getIsFetched , getIsInvalidBlock , getIsValid - , getLedgerTablesAtFor , getMaxSlotNo , getPastLedger , getReadOnlyForkerAtPoint @@ -42,7 +41,7 @@ import Ouroboros.Consensus.HeaderStateHistory ( HeaderStateHistory (..) ) import Ouroboros.Consensus.HeaderValidation (HeaderWithTime) -import Ouroboros.Consensus.Ledger.Abstract (EmptyMK, KeysMK, ValuesMK) +import Ouroboros.Consensus.Ledger.Abstract (EmptyMK) import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Protocol.Abstract import Ouroboros.Consensus.Storage.ChainDB.API @@ -260,16 +259,6 @@ getReadOnlyForkerAtPoint :: m (Either LedgerDB.GetForkerError (LedgerDB.ReadOnlyForker' m blk)) getReadOnlyForkerAtPoint CDB{..} = LedgerDB.getReadOnlyForker cdbLedgerDB -getLedgerTablesAtFor :: - IOLike m => - ChainDbEnv m blk -> - Point blk -> - LedgerTables (ExtLedgerState blk) KeysMK -> - m (Maybe (LedgerTables (ExtLedgerState blk) ValuesMK)) -getLedgerTablesAtFor = - (\ldb pt ks -> eitherToMaybe <$> LedgerDB.readLedgerTablesAtFor ldb pt ks) - . cdbLedgerDB - getStatistics :: IOLike m => ChainDbEnv m blk -> m (Maybe LedgerDB.Statistics) getStatistics CDB{..} = LedgerDB.getTipStatistics cdbLedgerDB diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs index 24dcc84414..3491f343da 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs @@ -146,7 +146,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.API -- * Forker , getReadOnlyForker , getTipStatistics - , readLedgerTablesAtFor , withPrivateTipForker , withTipForker @@ -408,19 +407,6 @@ getReadOnlyForker :: m (Either GetForkerError (ReadOnlyForker m l blk)) getReadOnlyForker ldb rr pt = fmap readOnlyForker <$> getForkerAtTarget ldb rr pt --- | Read a table of values at the requested point via a 'ReadOnlyForker' -readLedgerTablesAtFor :: - IOLike m => - LedgerDB m l blk -> - Point blk -> - LedgerTables l KeysMK -> - m (Either GetForkerError (LedgerTables l ValuesMK)) -readLedgerTablesAtFor ldb p ks = - bracketWithPrivateRegistry - (\rr -> fmap readOnlyForker <$> getForkerAtTarget ldb rr (SpecificPoint p)) - (mapM_ roforkerClose) - $ \foEith -> Monad.forM foEith (`roforkerReadTables` ks) - {------------------------------------------------------------------------------- Snapshots -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs index d25355882b..cea2fc630d 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/Forker.hs @@ -1,7 +1,7 @@ {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE DerivingVia #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FunctionalDependencies #-} @@ -28,6 +28,8 @@ module Ouroboros.Consensus.Storage.LedgerDB.Forker , RangeQueryPrevious (..) , Statistics (..) , forkerCurrentPoint + , castRangeQueryPrevious + , ledgerStateReadOnlyForker -- ** Read only , ReadOnlyForker (..) @@ -126,6 +128,8 @@ data Forker m l blk = Forker -- ^ Commit the fork, which was constructed using 'forkerPush', as the -- current version of the LedgerDB. } + deriving Generic + deriving NoThunks via OnlyCheckWhnf (Forker m l blk) -- | An identifier for a 'Forker'. See 'ldbForkers'. newtype ForkerKey = ForkerKey Word16 @@ -144,6 +148,11 @@ instance data RangeQueryPrevious l = NoPreviousQuery | PreviousQueryWasFinal | PreviousQueryWasUpTo (TxIn l) +castRangeQueryPrevious :: TxIn l ~ TxIn l' => RangeQueryPrevious l -> RangeQueryPrevious l' +castRangeQueryPrevious NoPreviousQuery = NoPreviousQuery +castRangeQueryPrevious PreviousQueryWasFinal = PreviousQueryWasFinal +castRangeQueryPrevious (PreviousQueryWasUpTo txin) = PreviousQueryWasUpTo txin + data RangeQuery l = RangeQuery { rqPrev :: !(RangeQueryPrevious l) , rqCount :: !Int @@ -191,6 +200,25 @@ forkerCurrentPoint forker = . getTip <$> forkerGetLedgerState forker +ledgerStateReadOnlyForker :: + IOLike m => ReadOnlyForker m (ExtLedgerState blk) blk -> ReadOnlyForker m (LedgerState blk) blk +ledgerStateReadOnlyForker frk = + ReadOnlyForker + { roforkerClose = roforkerClose + , roforkerReadTables = fmap castLedgerTables . roforkerReadTables . castLedgerTables + , roforkerRangeReadTables = fmap castLedgerTables . roforkerRangeReadTables . castRangeQueryPrevious + , roforkerGetLedgerState = ledgerState <$> roforkerGetLedgerState + , roforkerReadStatistics = roforkerReadStatistics + } + where + ReadOnlyForker + { roforkerClose + , roforkerReadTables + , roforkerRangeReadTables + , roforkerGetLedgerState + , roforkerReadStatistics + } = frk + {------------------------------------------------------------------------------- Read-only forkers -------------------------------------------------------------------------------} @@ -218,6 +246,11 @@ data ReadOnlyForker m l blk = ReadOnlyForker , roforkerReadStatistics :: !(m (Maybe Statistics)) -- ^ See 'forkerReadStatistics' } + deriving Generic + +instance NoThunks (ReadOnlyForker m l blk) where + wNoThunks _ _ = pure Nothing + showTypeOf _ = "ReadOnlyForker" type instance HeaderHash (ReadOnlyForker m l blk) = HeaderHash l diff --git a/ouroboros-consensus/src/unstable-mempool-test-utils/Test/Consensus/Mempool/Mocked.hs b/ouroboros-consensus/src/unstable-mempool-test-utils/Test/Consensus/Mempool/Mocked.hs index 1bda3971ba..f0e09f43a9 100644 --- a/ouroboros-consensus/src/unstable-mempool-test-utils/Test/Consensus/Mempool/Mocked.hs +++ b/ouroboros-consensus/src/unstable-mempool-test-utils/Test/Consensus/Mempool/Mocked.hs @@ -22,19 +22,19 @@ import Control.Concurrent.Class.MonadSTM.Strict , atomically , newTVarIO , readTVar - , readTVarIO , writeTVar ) import Control.DeepSeq (NFData (rnf)) +import Control.ResourceRegistry import Control.Tracer (Tracer) import qualified Data.List.NonEmpty as NE -import Ouroboros.Consensus.Block (castPoint) import Ouroboros.Consensus.HeaderValidation as Header import Ouroboros.Consensus.Ledger.Basics import qualified Ouroboros.Consensus.Ledger.Basics as Ledger import qualified Ouroboros.Consensus.Ledger.SupportsMempool as Ledger import Ouroboros.Consensus.Ledger.Tables.Utils - ( forgetLedgerTables + ( emptyLedgerTables + , forgetLedgerTables , restrictValues' ) import Ouroboros.Consensus.Mempool (Mempool) @@ -43,6 +43,8 @@ import Ouroboros.Consensus.Mempool.API ( AddTxOnBehalfOf , MempoolAddTxResult ) +import Ouroboros.Consensus.Mempool.Impl.Common (MempoolLedgerDBView (MempoolLedgerDBView)) +import Ouroboros.Consensus.Storage.LedgerDB.Forker data MockedMempool m blk = MockedMempool { getLedgerInterface :: !(Mempool.LedgerInterface m blk) @@ -62,7 +64,7 @@ instance NFData (MockedMempool m blk) where rnf MockedMempool{} = () data InitialMempoolAndModelParams blk = MempoolAndModelParams - { immpInitialState :: !(Ledger.LedgerState blk ValuesMK) + { immpInitialState :: !(LedgerState blk ValuesMK) -- ^ Initial ledger state for the mocked Ledger DB interface. , immpLedgerConfig :: !(Ledger.LedgerConfig blk) -- ^ Ledger configuration, which is needed to open the mempool. @@ -79,17 +81,29 @@ openMockedMempool :: IO (MockedMempool IO blk) openMockedMempool capacityOverride tracer initialParams = do currentLedgerStateTVar <- newTVarIO (immpInitialState initialParams) + reg <- unsafeNewRegistry let ledgerItf = Mempool.LedgerInterface - { Mempool.getCurrentLedgerState = forgetLedgerTables <$> readTVar currentLedgerStateTVar - , Mempool.getLedgerTablesAtFor = \pt keys -> do - st <- readTVarIO currentLedgerStateTVar - if castPoint (getTip st) == pt - then pure $ Just $ restrictValues' st keys - else pure Nothing + { Mempool.getCurrentLedgerState = \_reg -> do + st <- readTVar currentLedgerStateTVar + pure $ + MempoolLedgerDBView + (forgetLedgerTables st) + ( pure $ + Right $ + ReadOnlyForker + { roforkerClose = pure () + , roforkerGetLedgerState = pure (forgetLedgerTables st) + , roforkerReadTables = \keys -> + pure $ projectLedgerTables st `restrictValues'` keys + , roforkerReadStatistics = pure Nothing + , roforkerRangeReadTables = \_ -> pure emptyLedgerTables + } + ) } mempool <- Mempool.openMempoolWithoutSyncThread + reg ledgerItf (immpLedgerConfig initialParams) capacityOverride diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool.hs index da792e2599..434bcfa9c6 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool.hs @@ -36,6 +36,7 @@ import Control.Monad (foldM, forM, forM_, void) import Control.Monad.Except (runExcept) import Control.Monad.IOSim (runSimOrThrow) import Control.Monad.State (State, evalState, get, modify) +import Control.ResourceRegistry import Control.Tracer (Tracer (..)) import Data.Bifunctor (first, second) import Data.Either (isRight) @@ -47,13 +48,14 @@ import Data.Maybe (mapMaybe) import Data.Semigroup (stimes) import qualified Data.Set as Set import Data.Word -import Ouroboros.Consensus.Block import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.SupportsMempool import Ouroboros.Consensus.Ledger.Tables.Utils import Ouroboros.Consensus.Mempool +import Ouroboros.Consensus.Mempool.Impl.Common (MempoolLedgerDBView (..)) import Ouroboros.Consensus.Mempool.TxSeq as TxSeq import Ouroboros.Consensus.Mock.Ledger hiding (TxId) +import Ouroboros.Consensus.Storage.LedgerDB.Forker import Ouroboros.Consensus.Util (repeatedly, repeatedlyM) import Ouroboros.Consensus.Util.Condense (condense) import Ouroboros.Consensus.Util.IOLike @@ -295,7 +297,7 @@ prop_Mempool_TraceRemovedTxs setup = -- Sync the mempool with the ledger. Now some of the transactions in the -- mempool should have been removed. - void $ syncWithLedger mempool + void $ testSyncWithLedger mempool -- Predict which transactions should have been removed curLedger <- atomically getCurrentLedger @@ -703,12 +705,22 @@ withTestMempool setup@TestSetup{..} prop = varCurrentLedgerState <- uncheckedNewTVarM testLedgerState let ledgerInterface = LedgerInterface - { getCurrentLedgerState = forgetLedgerTables <$> readTVar varCurrentLedgerState - , getLedgerTablesAtFor = \pt keys -> do - st <- atomically $ readTVar varCurrentLedgerState - if castPoint (getTip st) == pt - then pure $ Just $ restrictValues' st keys - else pure Nothing + { getCurrentLedgerState = \_reg -> do + st <- readTVar varCurrentLedgerState + pure $ + MempoolLedgerDBView + (forgetLedgerTables st) + ( pure $ + Right $ + ReadOnlyForker + { roforkerClose = pure () + , roforkerReadTables = + pure . (projectLedgerTables st `restrictValues'`) + , roforkerRangeReadTables = const $ pure emptyLedgerTables + , roforkerGetLedgerState = pure $ forgetLedgerTables st + , roforkerReadStatistics = pure Nothing + } + ) } -- Set up the Tracer @@ -716,42 +728,44 @@ withTestMempool setup@TestSetup{..} prop = -- TODO use IOSim's dynamicTracer let tracer = Tracer $ \ev -> atomically $ modifyTVar varEvents (ev :) - -- Open the mempool and add the initial transactions - mempool <- - openMempoolWithoutSyncThread - ledgerInterface - testLedgerCfg - testMempoolCapOverride - tracer - result <- addTxs mempool testInitialTxs - - -- the invalid transactions are reported in the same order they were - -- added, so the first error is not the result of a cascade - sequence_ - [ error $ "Invalid initial transaction: " <> condense invalidTx <> " because of error " <> show err - | MempoolTxRejected invalidTx err <- result - ] + withRegistry $ \reg -> do + -- Open the mempool and add the initial transactions + mempool <- + openMempoolWithoutSyncThread + reg + ledgerInterface + testLedgerCfg + testMempoolCapOverride + tracer + result <- addTxs mempool testInitialTxs + + -- the invalid transactions are reported in the same order they were + -- added, so the first error is not the result of a cascade + sequence_ + [ error $ "Invalid initial transaction: " <> condense invalidTx <> " because of error " <> show err + | MempoolTxRejected invalidTx err <- result + ] - -- Clear the trace - atomically $ writeTVar varEvents [] - - -- Apply the property to the 'TestMempool' record - res <- - property - <$> prop - TestMempool - { mempool - , getTraceEvents = atomically $ reverse <$> readTVar varEvents - , eraseTraceEvents = atomically $ writeTVar varEvents [] - , addTxsToLedger = addTxsToLedger varCurrentLedgerState - , getCurrentLedger = readTVar varCurrentLedgerState - } - validContents <- - atomically $ - checkMempoolValidity - <$> readTVar varCurrentLedgerState - <*> getSnapshot mempool - return $ res .&&. validContents + -- Clear the trace + atomically $ writeTVar varEvents [] + + -- Apply the property to the 'TestMempool' record + res <- + property + <$> prop + TestMempool + { mempool + , getTraceEvents = atomically $ reverse <$> readTVar varEvents + , eraseTraceEvents = atomically $ writeTVar varEvents [] + , addTxsToLedger = addTxsToLedger varCurrentLedgerState + , getCurrentLedger = readTVar varCurrentLedgerState + } + validContents <- + atomically $ + checkMempoolValidity + <$> readTVar varCurrentLedgerState + <*> getSnapshot mempool + return $ res .&&. validContents addTxToLedger :: forall m. @@ -1154,8 +1168,8 @@ executeAction testMempool action = case action of currentTicketAssignment :: IOLike m => Mempool m TestBlock -> m TicketAssignment -currentTicketAssignment Mempool{syncWithLedger} = do - MempoolSnapshot{snapshotTxs} <- syncWithLedger +currentTicketAssignment Mempool{testSyncWithLedger} = do + MempoolSnapshot{snapshotTxs} <- testSyncWithLedger return $ Map.fromList [ (ticketNo, txId (txForgetValidated tx)) diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/Fairness.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/Fairness.hs index 5b2cff3845..8ddb62e312 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/Fairness.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/Fairness.hs @@ -19,6 +19,7 @@ import Control.Concurrent (threadDelay) import qualified Control.Concurrent.Async as Async import Control.Exception (assert) import Control.Monad (forever, void) +import Control.ResourceRegistry import qualified Control.Tracer as Tracer import Data.Foldable (asum) import qualified Data.List as List @@ -32,6 +33,8 @@ import Ouroboros.Consensus.Ledger.Tables.Utils import Ouroboros.Consensus.Mempool (Mempool) import qualified Ouroboros.Consensus.Mempool as Mempool import qualified Ouroboros.Consensus.Mempool.Capacity as Mempool +import Ouroboros.Consensus.Mempool.Impl.Common (MempoolLedgerDBView (..)) +import Ouroboros.Consensus.Storage.LedgerDB.Forker import Ouroboros.Consensus.Util.IOLike (STM, atomically, retry) import System.Random (randomIO) import Test.Consensus.Mempool.Fairness.TestBlock @@ -95,47 +98,57 @@ testTxSizeFairness TestParams{mempoolMaxCapacity, smallTxSize, largeTxSize, nrOf ledgerItf :: Mempool.LedgerInterface IO TestBlock ledgerItf = Mempool.LedgerInterface - { Mempool.getCurrentLedgerState = + { Mempool.getCurrentLedgerState = \_reg -> pure $ - testInitLedgerWithState NoPayLoadDependentState - , Mempool.getLedgerTablesAtFor = \_ _ -> - pure $ - Just emptyLedgerTables + MempoolLedgerDBView + (testInitLedgerWithState NoPayLoadDependentState) + ( pure $ + Right $ + ReadOnlyForker + { roforkerClose = pure () + , roforkerReadTables = const $ pure emptyLedgerTables + , roforkerRangeReadTables = const $ pure emptyLedgerTables + , roforkerGetLedgerState = pure $ testInitLedgerWithState NoPayLoadDependentState + , roforkerReadStatistics = pure Nothing + } + ) } eraParams = HardFork.defaultEraParams (Consensus.SecurityParam $ knownNonZeroBounded @10) (Time.slotLengthFromSec 2) - mempool <- - Mempool.openMempoolWithoutSyncThread - ledgerItf - (testBlockLedgerConfigFrom eraParams) - (Mempool.mkCapacityBytesOverride mempoolMaxCapacity) - Tracer.nullTracer - ---------------------------------------------------------------------------- - -- Add and collect transactions - ---------------------------------------------------------------------------- - let waitForSmallAddersToFillMempool = threadDelay 1_000 - txs <- - runConcurrently - [ adders mempool smallTxSize - , waitForSmallAddersToFillMempool >> adders mempool largeTxSize - , waitForSmallAddersToFillMempool >> remover mempool nrOftxsToCollect - ] + withRegistry $ \reg -> do + mempool <- + Mempool.openMempoolWithoutSyncThread + reg + ledgerItf + (testBlockLedgerConfigFrom eraParams) + (Mempool.mkCapacityBytesOverride mempoolMaxCapacity) + Tracer.nullTracer + ---------------------------------------------------------------------------- + -- Add and collect transactions + ---------------------------------------------------------------------------- + let waitForSmallAddersToFillMempool = threadDelay 1_000 + txs <- + runConcurrently + [ adders mempool smallTxSize + , waitForSmallAddersToFillMempool >> adders mempool largeTxSize + , waitForSmallAddersToFillMempool >> remover mempool nrOftxsToCollect + ] - ---------------------------------------------------------------------------- - -- Count the small and large transactions - ---------------------------------------------------------------------------- - let - nrSmall :: Double - nrLarge :: Double - (nrSmall, nrLarge) = - (fromIntegral . length *** fromIntegral . length) $ - List.partition (<= smallTxSize) $ - fmap txSize txs - length txs @?= nrOftxsToCollect - theRatioOfTheDifferenceBetween nrSmall nrLarge `isBelow` toleranceThreshold + ---------------------------------------------------------------------------- + -- Count the small and large transactions + ---------------------------------------------------------------------------- + let + nrSmall :: Double + nrLarge :: Double + (nrSmall, nrLarge) = + (fromIntegral . length *** fromIntegral . length) $ + List.partition (<= smallTxSize) $ + fmap txSize txs + length txs @?= nrOftxsToCollect + theRatioOfTheDifferenceBetween nrSmall nrLarge `isBelow` toleranceThreshold where theRatioOfTheDifferenceBetween x y = (abs (x - y) / (x + y), x, y) diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs index 33dfd05954..5d2f8803a2 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs @@ -30,6 +30,7 @@ import Control.Arrow (second) import Control.Concurrent.Class.MonadSTM.Strict.TChan import Control.Monad (void) import Control.Monad.Except (runExcept) +import Control.ResourceRegistry import qualified Control.Tracer as CT (Tracer (..), traceWith) import qualified Data.Foldable as Foldable import Data.Function (on) @@ -58,6 +59,7 @@ import Ouroboros.Consensus.Mock.Ledger.Block import Ouroboros.Consensus.Mock.Ledger.State import Ouroboros.Consensus.Mock.Ledger.UTxO (Expiry, Tx, TxIn, TxOut) import qualified Ouroboros.Consensus.Mock.Ledger.UTxO as Mock +import Ouroboros.Consensus.Storage.LedgerDB.Forker import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.Condense (condense) import Ouroboros.Consensus.Util.IOLike hiding (bracket) @@ -555,33 +557,23 @@ newLedgerInterface initialLedger = do t <- newTVarIO $ MockedLedgerDB initialLedger Set.empty Set.empty pure ( LedgerInterface - { getCurrentLedgerState = forgetLedgerTables . ldbTip <$> readTVar t - , getLedgerTablesAtFor = \pt keys -> do - MockedLedgerDB ti oldReachableTips _ <- atomically $ readTVar t - if pt == castPoint (getTip ti) -- if asking for tables at the tip of the - -- ledger db - then - let tbs = ltliftA2 f keys $ projectLedgerTables ti - in pure $ Just tbs - else case Foldable.find ((castPoint pt ==) . getTip) oldReachableTips of - Nothing -> pure Nothing - Just mtip -> - if pt == castPoint (getTip mtip) - -- if asking for tables at some still reachable state - then - let tbs = ltliftA2 f keys $ projectLedgerTables mtip - in pure $ Just tbs - else - -- if asking for tables at other point or at the mempool tip but - -- it is not reachable - pure Nothing + { getCurrentLedgerState = \_reg -> do + st <- ldbTip <$> readTVar t + pure + ( forgetLedgerTables st + , pure $ + Right $ + ReadOnlyForker + { roforkerClose = pure () + , roforkerReadStatistics = pure Nothing + , roforkerReadTables = pure . (projectLedgerTables st `restrictValues'`) + , roforkerRangeReadTables = const $ pure emptyLedgerTables + , roforkerGetLedgerState = pure $ forgetLedgerTables st + } + ) } , t ) - where - f :: Ord k => KeysMK k v -> ValuesMK k v -> ValuesMK k v - f (KeysMK s) (ValuesMK v) = - ValuesMK (Map.restrictKeys v s) -- | Make a SUT mkSUT :: @@ -594,20 +586,22 @@ mkSUT :: ) => LedgerConfig blk -> LedgerState blk ValuesMK -> - m (SUT m blk, CT.Tracer m String) + m (SUT m blk, CT.Tracer m String, ResourceRegistry m) mkSUT cfg initialLedger = do (lif, t) <- newLedgerInterface initialLedger trcrChan <- atomically newTChan :: m (StrictTChan m (Either String (TraceEventMempool blk))) let trcr = CT.Tracer $ -- Dbg.traceShowM @(Either String (TraceEventMempool blk)) atomically . writeTChan trcrChan + reg <- unsafeNewRegistry mempool <- openMempoolWithoutSyncThread + reg lif cfg (MempoolCapacityBytesOverride $ unIgnoringOverflow txMaxBytes') (CT.Tracer $ CT.traceWith trcr . Right) - pure (SUT mempool t, CT.Tracer $ atomically . writeTChan trcrChan . Left) + pure (SUT mempool t, CT.Tracer $ atomically . writeTChan trcrChan . Left, reg) semantics :: ( MonadSTM m @@ -709,10 +703,12 @@ sm :: StateMachine (Model blk) (Command blk) m (Response blk) -> CT.Tracer m String -> StrictTVar m (SUT m blk) -> + ResourceRegistry m -> StateMachine (Model blk) (Command blk) m (Response blk) -sm sm0 trcr ior = +sm sm0 trcr ior reg = sm0 { QC.semantics = \c -> semantics trcr c ior + , QC.cleanup = \_ -> closeRegistry reg } smUnused :: @@ -767,9 +763,9 @@ prop_mempoolSequential cfg capacity initialState gTxs = forAllCommands sm0 Nothi \cmds -> monadicIO ( do - (sut, trcr) <- run $ mkSUT cfg initialState + (sut, trcr, reg) <- run $ mkSUT cfg initialState ior <- run $ newTVarIO sut - let sm' = sm sm0 trcr ior + let sm' = sm sm0 trcr ior reg (hist, model, res) <- runCommands sm' cmds prettyCommands sm0 hist $ checkCommandNames cmds @@ -809,9 +805,9 @@ prop_mempoolParallel :: Property prop_mempoolParallel cfg capacity initialState ma gTxs = forAllParallelCommandsNTimes sm0 Nothing 100 $ \cmds -> monadicIO $ do - (sut, trcr) <- run $ mkSUT cfg initialState + (sut, trcr, reg) <- run $ mkSUT cfg initialState ior <- run $ newTVarIO sut - let sm' = sm sm0 trcr ior + let sm' = sm sm0 trcr ior reg res <- runParallelCommands sm' cmds prettyParallelCommandsWithOpts cmds From 194067161932d3010cd6d27d5117e49c7876ae43 Mon Sep 17 00:00:00 2001 From: Javier Sagredo Date: Tue, 24 Jun 2025 13:34:50 +0200 Subject: [PATCH 2/2] Re-enable Mempool's parallel QSM test --- .../Test/Consensus/Mempool/StateMachine.hs | 231 ++++++++---------- 1 file changed, 108 insertions(+), 123 deletions(-) diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs index 5d2f8803a2..8f8d14bf70 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/Mempool/StateMachine.hs @@ -28,7 +28,6 @@ module Test.Consensus.Mempool.StateMachine (tests) where import Cardano.Slotting.Slot import Control.Arrow (second) import Control.Concurrent.Class.MonadSTM.Strict.TChan -import Control.Monad (void) import Control.Monad.Except (runExcept) import Control.ResourceRegistry import qualified Control.Tracer as CT (Tracer (..), traceWith) @@ -52,7 +51,7 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol ) import Ouroboros.Consensus.Ledger.Tables.Utils import Ouroboros.Consensus.Mempool -import Ouroboros.Consensus.Mempool.Impl.Common (tickLedgerState) +import Ouroboros.Consensus.Mempool.Impl.Common (MempoolLedgerDBView (..), tickLedgerState) import Ouroboros.Consensus.Mempool.TxSeq import Ouroboros.Consensus.Mock.Ledger.Address import Ouroboros.Consensus.Mock.Ledger.Block @@ -63,6 +62,7 @@ import Ouroboros.Consensus.Storage.LedgerDB.Forker import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.Condense (condense) import Ouroboros.Consensus.Util.IOLike hiding (bracket) +import Ouroboros.Network.Block (genesisPoint) import Test.Cardano.Ledger.TreeDiff () import Test.Consensus.Mempool.Util ( TestBlock @@ -88,21 +88,12 @@ import Test.Util.ToExpr () Datatypes -------------------------------------------------------------------------------} --- | Whether the LedgerDB should be wiped out -data ModifyDB = KeepDB | ClearDB deriving (Generic, ToExpr, NoThunks) - -instance Arbitrary ModifyDB where - arbitrary = elements [KeepDB, ClearDB] - -keepsDB :: ModifyDB -> Bool -keepsDB KeepDB = True -keepsDB ClearDB = False - -- | The model data Model blk r = Model { modelMempoolIntermediateState :: !(TickedLedgerState blk ValuesMK) -- ^ The current tip on the mempool , modelTxs :: ![(GenTx blk, TicketNo)] + , modelAllValidTxs :: ![(GenTx blk, TicketNo)] -- ^ The current list of transactions , modelCurrentSize :: !(TxMeasure blk) -- ^ The current size of the mempool @@ -116,12 +107,9 @@ data Model blk r = Model modelLedgerDBTip :: !(LedgerState blk ValuesMK) -- ^ The current tip on the ledgerdb - , modelReachableStates :: !(Set (LedgerState blk ValuesMK)) - -- ^ The old states which are still on the LedgerDB. These should - -- technically be ancestors of the tip, but for the mempool we don't care. - , modelOtherStates :: !(Set (LedgerState blk ValuesMK)) - -- ^ States which were previously on the LedgerDB. We keep these so that - -- 'ChangeLedger' does not generate a different state with the same hash. + , modelLedgerDBOtherStates :: !(Set (LedgerState blk ValuesMK)) + -- ^ The old states which are still on the LedgerDB. + , modelIsSyncing :: !Bool } -- | The commands used by QSM @@ -152,7 +140,6 @@ data Action blk r data Event blk r = ChangeLedger !(LedgerState blk ValuesMK) - !ModifyDB deriving Generic1 deriving (Rank2.Functor, Rank2.Foldable, Rank2.Traversable, CommandNames) @@ -240,18 +227,15 @@ generator ma gTxs model = ( getTip modelLedgerDBTip `Set.insert` Set.map getTip - ( modelOtherStates - `Set.union` modelReachableStates - ) + modelLedgerDBOtherStates ) . getTip ) ] - ++ (if Set.null modelReachableStates then [] else [elements (Set.toList modelReachableStates)]) - ++ (if Set.null modelOtherStates then [] else [elements (Set.toList modelOtherStates)]) + ++ (if Set.null modelLedgerDBOtherStates then [] else [elements (Set.toList modelLedgerDBOtherStates)]) ) `suchThat` (not . (== (getTip modelLedgerDBTip)) . getTip) - Event . ChangeLedger ls <$> arbitrary + pure $ Event $ ChangeLedger ls ) , (10, pure $ Action GetSnapshot) ] @@ -259,8 +243,7 @@ generator ma gTxs model = Model { modelMempoolIntermediateState , modelLedgerDBTip - , modelReachableStates - , modelOtherStates + , modelLedgerDBOtherStates } = model data Response blk r @@ -268,6 +251,8 @@ data Response blk r Void | -- | Return the contents of a snapshot GotSnapshot ![(GenTx blk, TicketNo)] + | AddResult ![MempoolAddTxResult blk] + | Synced !(Point blk, [(GenTx blk, TicketNo)]) deriving Generic1 deriving (Rank2.Functor, Rank2.Foldable, Rank2.Traversable) @@ -286,14 +271,15 @@ initModel :: initModel cfg capacity initialState = Model { modelMempoolIntermediateState = ticked - , modelReachableStates = Set.empty + , modelLedgerDBOtherStates = Set.empty , modelLedgerDBTip = initialState , modelTxs = [] , modelCurrentSize = Measure.zero + , modelAllValidTxs = [] , modelLastSeenTicketNo = zeroTicketNo , modelCapacity = capacity , modelConfig = cfg - , modelOtherStates = Set.empty + , modelIsSyncing = False } where ticked = tick cfg initialState @@ -303,10 +289,10 @@ mock :: Command blk Symbolic -> GenSym (Response blk Symbolic) mock model = \case - Action (TryAddTxs _) -> pure Void - Action SyncLedger -> pure Void + Action (TryAddTxs _) -> pure $ AddResult [] + Action SyncLedger -> pure $ Synced (genesisPoint, []) Action GetSnapshot -> pure $ GotSnapshot $ modelTxs model - Event (ChangeLedger _ _) -> pure Void + Event (ChangeLedger _) -> pure Void {------------------------------------------------------------------------------- Transitions @@ -347,32 +333,22 @@ doChangeLedger :: (StandardHash blk, GetTip (LedgerState blk)) => Model blk r -> LedgerState blk ValuesMK -> - ModifyDB -> Model blk r -doChangeLedger model l' b' = +doChangeLedger model l' = model { modelLedgerDBTip = l' - , modelReachableStates = - if keepsDB b' - then l' `Set.delete` Set.insert modelLedgerDBTip modelReachableStates - else Set.empty - , modelOtherStates = - if keepsDB b' - then modelOtherStates - else modelLedgerDBTip `Set.insert` (modelOtherStates `Set.union` modelReachableStates) + , modelLedgerDBOtherStates = + Set.insert modelLedgerDBTip modelLedgerDBOtherStates } where Model { modelLedgerDBTip - , modelReachableStates - , modelOtherStates + , modelLedgerDBOtherStates } = model doTryAddTxs :: ( LedgerSupportsMempool blk , ValidateEnvelope blk - , Eq (TickedLedgerState blk ValuesMK) - , Eq (GenTx blk) ) => Model blk r -> [GenTx blk] -> @@ -381,8 +357,8 @@ doTryAddTxs model [] = model doTryAddTxs model txs = case Foldable.find ((castPoint (getTip st) ==) . getTip) - (Set.insert modelLedgerDBTip modelReachableStates) of - Nothing -> doTryAddTxs (doSync model) txs + (Set.insert modelLedgerDBTip modelLedgerDBOtherStates) of + Nothing -> error "Impossible!" Just _ -> let nextTicket = succ $ modelLastSeenTicketNo model (validTxs, tk, newSize, st'') = @@ -391,6 +367,7 @@ doTryAddTxs model txs = in model { modelMempoolIntermediateState = st'' , modelTxs = modelTxs' + , modelAllValidTxs = modelAllValidTxs ++ validTxs , modelLastSeenTicketNo = pred tk , modelCurrentSize = newSize } @@ -398,16 +375,16 @@ doTryAddTxs model txs = Model { modelMempoolIntermediateState = st , modelTxs + , modelAllValidTxs , modelCurrentSize - , modelReachableStates + , modelLedgerDBOtherStates , modelLedgerDBTip , modelConfig = cfg , modelCapacity } = model transition :: - ( Eq (GenTx blk) - , Eq (TickedLedgerState blk ValuesMK) + ( Eq (TickedLedgerState blk ValuesMK) , LedgerSupportsMempool blk , ToExpr (GenTx blk) , ValidateEnvelope blk @@ -418,10 +395,10 @@ transition :: Response blk r -> Model blk r transition model cmd resp = case (cmd, resp) of - (Action (TryAddTxs txs), Void) -> doTryAddTxs model txs - (Event (ChangeLedger l b), Void) -> doChangeLedger model l b - (Action GetSnapshot, GotSnapshot{}) -> model - (Action SyncLedger, Void) -> doSync model + (Action (TryAddTxs txs), AddResult _res) -> (doTryAddTxs model txs){modelIsSyncing = False} + (Event (ChangeLedger l), Void) -> (doChangeLedger model l){modelIsSyncing = False} + (Action GetSnapshot, GotSnapshot{}) -> model{modelIsSyncing = False} + (Action SyncLedger, Synced{}) -> (doSync model){modelIsSyncing = True} _ -> error $ "mismatched command " @@ -539,8 +516,6 @@ data MockedLedgerDB blk = MockedLedgerDB -- ^ The current LedgerDB tip , reachableTips :: !(Set (LedgerState blk ValuesMK)) -- ^ States which are still reachable in the LedgerDB - , otherStates :: !(Set (LedgerState blk ValuesMK)) - -- ^ States which are no longer reachable in the LedgerDB } deriving Generic @@ -554,23 +529,24 @@ newLedgerInterface :: LedgerState blk ValuesMK -> m (LedgerInterface m blk, StrictTVar m (MockedLedgerDB blk)) newLedgerInterface initialLedger = do - t <- newTVarIO $ MockedLedgerDB initialLedger Set.empty Set.empty + t <- newTVarIO $ MockedLedgerDB initialLedger Set.empty pure ( LedgerInterface { getCurrentLedgerState = \_reg -> do st <- ldbTip <$> readTVar t - pure - ( forgetLedgerTables st - , pure $ - Right $ - ReadOnlyForker - { roforkerClose = pure () - , roforkerReadStatistics = pure Nothing - , roforkerReadTables = pure . (projectLedgerTables st `restrictValues'`) - , roforkerRangeReadTables = const $ pure emptyLedgerTables - , roforkerGetLedgerState = pure $ forgetLedgerTables st - } - ) + pure $ + MempoolLedgerDBView + (forgetLedgerTables st) + ( pure $ + Right $ + ReadOnlyForker + { roforkerClose = pure () + , roforkerReadStatistics = pure Nothing + , roforkerReadTables = pure . (projectLedgerTables st `restrictValues'`) + , roforkerRangeReadTables = const $ pure emptyLedgerTables + , roforkerGetLedgerState = pure $ forgetLedgerTables st + } + ) } , t ) @@ -616,41 +592,22 @@ semantics trcr cmd r = do SUT m t <- atomically $ readTVar r case cmd of Action (TryAddTxs txs) -> do - mapM_ (addTx m AddTxForRemotePeer) txs - pure Void + AddResult <$> mapM (addTx m AddTxForRemotePeer) txs Action SyncLedger -> do - void $ syncWithLedger m - pure Void + snap <- testSyncWithLedger m + pure (Synced (snapshotPoint snap, [(txForgetValidated tt, tk) | (tt, tk, _) <- snapshotTxs snap])) Action GetSnapshot -> do txs <- snapshotTxs <$> atomically (getSnapshot m) pure $ GotSnapshot [(txForgetValidated vtx, tk) | (vtx, tk, _) <- txs] - Event (ChangeLedger l' newReachable) -> do + Event (ChangeLedger l') -> do CT.traceWith trcr $ "ChangingLedger to " <> show (getTip l') atomically $ do - MockedLedgerDB ledgerTip oldReachableTips oldUnreachableTips <- readTVar t + MockedLedgerDB ledgerTip oldReachableTips <- readTVar t if getTip l' == getTip ledgerTip then - if keepsDB newReachable - then pure () - else - let (newReachableTips, newUnreachableTips) = - ( Set.empty - , Set.insert ledgerTip $ - Set.union oldUnreachableTips oldReachableTips - ) - in writeTVar t (MockedLedgerDB l' newReachableTips newUnreachableTips) + pure () else - let - (newReachableTips, newUnreachableTips) = - if keepsDB newReachable - then (Set.insert ledgerTip oldReachableTips, oldUnreachableTips) - else - ( Set.empty - , Set.insert ledgerTip $ - Set.union oldUnreachableTips oldReachableTips - ) - in - writeTVar t (MockedLedgerDB l' newReachableTips newUnreachableTips) + writeTVar t (MockedLedgerDB l' (Set.insert ledgerTip oldReachableTips)) pure Void {------------------------------------------------------------------------------- @@ -660,20 +617,41 @@ semantics trcr cmd r = do precondition :: Model blk Symbolic -> Command blk Symbolic -> Logic -- precondition cfg Model {modelCurrentSize} (Action (TryAddTxs txs)) = -- Boolean $ not (null txs) && modelCurrentSize > 0 && sum (map tSize rights $ init txs) < modelCurrentSize +precondition m (Action SyncLedger) = Boolean $ not (modelIsSyncing m) precondition _ _ = Top postcondition :: ( LedgerSupportsMempool blk , Eq (GenTx blk) - -- , Show (TickedLedgerState blk ValuesMK) + , -- , Show (TickedLedgerState blk ValuesMK) + UnTick blk + , ValidateEnvelope blk + , ToExpr (Command blk Concrete) + , ToExpr (GenTx blk) ) => Model blk Concrete -> Command blk Concrete -> Response blk Concrete -> Logic postcondition model (Action GetSnapshot) (GotSnapshot txs) = - -- Annotate (show $ modelMempoolIntermediateState model) $ - modelTxs model .== txs + Annotate "Mismatch getting snapshot" $ + Annotate (show $ modelAllValidTxs model) $ + modelTxs model .== txs +postcondition model c@(Action (TryAddTxs txs)) r@(AddResult res) = + let model' = transition model c r + in Annotate "Mismatch result adding transaction" $ + Annotate (show (modelTxs model', zip txs res)) $ + Boolean $ + and + [ tx `elem` map fst (modelTxs model') + | (tx, res') <- zip txs res + , case res' of MempoolTxAdded{} -> True; _ -> False + ] +postcondition model c@(Action SyncLedger) r@(Synced (_, txs)) = + let model' = transition model c r + in Annotate "Mismatch revalidating transactions in Sync" $ + Annotate (show (modelTxs model', txs)) $ + modelTxs model' .== txs postcondition _ _ _ = Top noPostcondition :: @@ -827,14 +805,11 @@ tests = \i -> fmap (fmap fst . fst) . genTxs i , testGroup "parallel" - [ -- See ouroboros-consensus#1549 for why this test is disabled. - - -- testProperty "atomic" $ - -- withMaxSuccess 1000 $ - -- prop_mempoolParallel testLedgerConfigNoSizeLimits txMaxBytes' testInitLedger Atomic $ - -- \i -> fmap (fmap fst . fst) . genTxs i - -- , - testProperty "non atomic" $ + [ testProperty "atomic" $ + withMaxSuccess 10000 $ + prop_mempoolParallel testLedgerConfigNoSizeLimits txMaxBytes' testInitLedger Atomic $ + \i -> fmap (fmap fst . fst) . genTxs i + , testProperty "non atomic" $ withMaxSuccess 10 $ prop_mempoolParallel testLedgerConfigNoSizeLimits txMaxBytes' testInitLedger NonAtomic $ \i -> fmap (fmap fst . fst) . genTxs i @@ -913,9 +888,9 @@ instance ToExpr (Action TestBlock r) where [ App ( take 8 (tail $ init $ show txid) <> " " - <> show [(take 8 (tail $ init $ show a), b) | (a, b) <- Set.toList txins] + <> filter (/= '"') (show [(take 8 (tail $ init $ show a), b) | (a, b) <- Set.toList txins]) <> " ->> " - <> show [(condense a, b) | (_, (a, b)) <- Map.toList txouts] + <> filter (/= '"') (show [(condense a, b) | (_, (a, b)) <- Map.toList txouts]) <> "" ) [] @@ -927,12 +902,8 @@ instance ToExpr (Action TestBlock r) where toExpr GetSnapshot = App "GetSnapshot" [] instance ToExpr (LedgerState blk ValuesMK) => ToExpr (Event blk r) where - toExpr (ChangeLedger ls b) = - Rec "ChangeLedger" $ - TD.fromList - [ ("tip", toExpr ls) - , ("newFork", toExpr b) - ] + toExpr (ChangeLedger ls) = + App "ChangeLedger" [toExpr ls] instance ToExpr (Command TestBlock r) where toExpr (Action act) = toExpr act @@ -951,8 +922,21 @@ instance where toExpr Void = App "Void" [] toExpr (GotSnapshot s) = - Rec "GotSnapshot" $ - TD.fromList [("txs", toExpr s)] + App + "GotSnapshot" + [Lst [toExpr s]] + toExpr (AddResult res) = + App "AddResult" $ + [ Lst $ + map + ( (flip App []) . \case + MempoolTxAdded{} -> "OK" + MempoolTxRejected{} -> "NO" + ) + res + ] + toExpr (Synced res) = + App "Synced" [App (show res) []] instance ( ToExpr (GenTx blk) @@ -979,11 +963,12 @@ instance ToExpr (TickedLedgerState TestBlock ValuesMK) where instance ToExpr (LedgerState TestBlock ValuesMK) where toExpr (SimpleLedgerState st tbs) = - Rec "LedgerState" $ - TD.fromList - [ ("state", toExpr $ mockTip st) - , ("tables", toExpr tbs) - ] + App "LedgerState" $ + [ Lst + [ toExpr (pointSlot $ mockTip st, pointHash $ mockTip st) + , toExpr tbs + ] + ] instance ToExpr Addr where toExpr a = App (show a) [] @@ -993,7 +978,7 @@ deriving instance ToExpr Tx deriving instance ToExpr Expiry instance ToExpr (LedgerTables (LedgerState TestBlock) ValuesMK) where - toExpr = genericToExpr + toExpr (LedgerTables (ValuesMK v)) = Lst [toExpr (condense txin, condense txout) | (txin, txout) <- Map.toList v] instance ToExpr (ValuesMK TxIn TxOut) where toExpr (ValuesMK m) = App "Values" [toExpr m]