diff --git a/cabal.project b/cabal.project index 9c97a6c5ba..b983c81b56 100644 --- a/cabal.project +++ b/cabal.project @@ -14,7 +14,7 @@ repository cardano-haskell-packages -- update either of these. index-state: -- Bump this if you need newer packages from Hackage - , hackage.haskell.org 2025-09-26T20:57:57Z + , hackage.haskell.org 2025-10-23T13:39:53Z -- Bump this if you need newer packages from CHaP , cardano-haskell-packages 2025-10-01T14:54:25Z diff --git a/flake.lock b/flake.lock index acef080678..e64ab24f45 100644 --- a/flake.lock +++ b/flake.lock @@ -3,11 +3,11 @@ "CHaP": { "flake": false, "locked": { - "lastModified": 1759339316, - "narHash": "sha256-SW/K9yfhNLNCDAl2ZC8ol0w8X+AwyLin0XOvnn50468=", + "lastModified": 1761315163, + "narHash": "sha256-h+JPIMflNAOpY3XhZNcS5sUAOyO06499uWATj2j6P5Q=", "owner": "intersectmbo", "repo": "cardano-haskell-packages", - "rev": "aa50d6dffede91c8fdfcef94c71641a00214522a", + "rev": "131bcd51c4869b191e8c3afbb9f3fd326cd6e5e1", "type": "github" }, "original": { @@ -270,11 +270,11 @@ "hackageNix": { "flake": false, "locked": { - "lastModified": 1759314141, - "narHash": "sha256-eioqBX8q8H9lIj6bcG7JhwfN7Kg+OdpV1lkz7GS+/GI=", + "lastModified": 1761265459, + "narHash": "sha256-7tsC/ZcNBJR1pXWdKsRoh/qlVDhCxb1Ukr7PVd2zieE=", "owner": "input-output-hk", "repo": "hackage.nix", - "rev": "a58ee1f20b5db10c09beffc6d18505b6a253b84a", + "rev": "eb8e4d02528b4973cd09450bb62cf34997777226", "type": "github" }, "original": { diff --git a/ouroboros-consensus-diffusion/changelog.d/20251023_152349_javier.sagredo_fix_resource_registry.md b/ouroboros-consensus-diffusion/changelog.d/20251023_152349_javier.sagredo_fix_resource_registry.md new file mode 100644 index 0000000000..e39a3215bf --- /dev/null +++ b/ouroboros-consensus-diffusion/changelog.d/20251023_152349_javier.sagredo_fix_resource_registry.md @@ -0,0 +1,23 @@ + + +### Patch + +- Bump to `resource-registry ^>= 0.2`. + + + diff --git a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal index f1c5b42fcd..917d1145f7 100644 --- a/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal +++ b/ouroboros-consensus-diffusion/ouroboros-consensus-diffusion.cabal @@ -95,7 +95,7 @@ library ouroboros-network-framework ^>=0.19, ouroboros-network-protocols ^>=0.15, random, - resource-registry ^>=0.1, + resource-registry ^>=0.2, safe-wild-cards ^>=1.0, serialise ^>=0.2, text, diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index 6df036c539..8a58fe3b03 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -1158,7 +1158,7 @@ runThreadNetwork mempool txs0 - void $ allocate registry (\_ -> pure threadCrucialTxs) cancelThread + void $ allocateThread registry (\_ -> pure threadCrucialTxs) forkTxProducer coreNodeId diff --git a/ouroboros-consensus/changelog.d/20251023_152234_javier.sagredo_fix_resource_registry.md b/ouroboros-consensus/changelog.d/20251023_152234_javier.sagredo_fix_resource_registry.md new file mode 100644 index 0000000000..b8c382095d --- /dev/null +++ b/ouroboros-consensus/changelog.d/20251023_152234_javier.sagredo_fix_resource_registry.md @@ -0,0 +1,24 @@ + + +### Patch + +- Bump to `resource-registry ^>= 0.2`. + +### Non-Breaking + +- Do not open forkers unnecessarily in the Mempool when re-syncing it. + +- Committing a forker will move the handles to the registry of the LedgerDB. The + discarded fork will be queued to be released by the `garbageCollect` logic. + + diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 5f2cd98720..8774d1c049 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -344,7 +344,7 @@ library psqueues ^>=0.2.3, quiet ^>=0.2, rawlock ^>=0.1.1, - resource-registry ^>=0.1, + resource-registry ^>=0.2, semialign >=1.1, serialise ^>=0.2, singletons, @@ -393,7 +393,7 @@ library ouroboros-consensus-lsm ouroboros-consensus, primitive, random, - resource-registry ^>=0.1, + resource-registry ^>=0.2, serialise ^>=0.2, streaming, text, diff --git a/ouroboros-consensus/src/ouroboros-consensus-lsm/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs b/ouroboros-consensus/src/ouroboros-consensus-lsm/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs index fb32c422bd..659cac9f5a 100644 --- a/ouroboros-consensus/src/ouroboros-consensus-lsm/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs +++ b/ouroboros-consensus/src/ouroboros-consensus-lsm/Ouroboros/Consensus/Storage/LedgerDB/V2/LSM.hs @@ -64,6 +64,7 @@ import Data.Void import Database.LSMTree (Salt, Session, Table) import qualified Database.LSMTree as LSM import GHC.Generics +import GHC.Stack (HasCallStack) import NoThunks.Class import Ouroboros.Consensus.Block import Ouroboros.Consensus.Ledger.Abstract @@ -167,21 +168,22 @@ newLSMLedgerTablesHandle :: , IndexedMemPack (l EmptyMK) (TxOut l) ) => Tracer m LedgerDBV2Trace -> - ResourceRegistry m -> (ResourceKey m, UTxOTable m) -> m (LedgerTablesHandle m l) -newLSMLedgerTablesHandle tracer rr (resKey, t) = do +newLSMLedgerTablesHandle tracer (origResKey, t) = do traceWith tracer TraceLedgerTablesHandleCreate + tv <- newTVarIO origResKey pure LedgerTablesHandle - { close = implClose resKey - , duplicate = implDuplicate rr t tracer + { close = implClose tv + , duplicate = \rr -> implDuplicate rr t tracer , read = implRead t , readRange = implReadRange t , readAll = implReadAll t , pushDiffs = implPushDiffs t , takeHandleSnapshot = implTakeHandleSnapshot t , tablesSize = pure Nothing + , transfer = atomically . writeTVar tv } {-# INLINE implClose #-} @@ -192,8 +194,9 @@ newLSMLedgerTablesHandle tracer rr (resKey, t) = do {-# INLINE implPushDiffs #-} {-# INLINE implTakeHandleSnapshot #-} -implClose :: IOLike m => ResourceKey m -> m () -implClose = Monad.void . release +implClose :: (HasCallStack, IOLike m) => StrictTVar m (ResourceKey m) -> m () +implClose tv = + Monad.void $ release =<< readTVarIO tv implDuplicate :: ( IOLike m @@ -203,9 +206,9 @@ implDuplicate :: ResourceRegistry m -> UTxOTable m -> Tracer m LedgerDBV2Trace -> - m (LedgerTablesHandle m l) + m (ResourceKey m, LedgerTablesHandle m l) implDuplicate rr t tracer = do - table <- + (rk, table) <- allocate rr (\_ -> LSM.duplicate t) @@ -213,7 +216,7 @@ implDuplicate rr t tracer = do traceWith tracer TraceLedgerTablesHandleClose LSM.closeTable t' ) - newLSMLedgerTablesHandle tracer rr table + (rk,) <$> newLSMLedgerTablesHandle tracer (rk, table) implRead :: forall m l. @@ -461,7 +464,7 @@ loadSnapshot tracer rr ccfg fs session ds = case pointToWithOriginRealPoint (castPoint (getTip extLedgerSt)) of Origin -> throwE InitFailureGenesis NotOrigin pt -> do - values <- + (rk, values) <- lift $ allocate rr @@ -481,7 +484,7 @@ loadSnapshot tracer rr ccfg fs session ds = $ InitFailureRead ReadSnapshotDataCorruption (,pt) - <$> lift (empty extLedgerSt values (newLSMLedgerTablesHandle tracer rr)) + <$> lift (empty extLedgerSt (rk, values) (newLSMLedgerTablesHandle tracer)) -- | Create the initial LSM table from values, which should happen only at -- Genesis. @@ -495,18 +498,16 @@ tableFromValuesMK :: LedgerTables l ValuesMK -> m (ResourceKey m, UTxOTable m) tableFromValuesMK tracer rr session st (LedgerTables (ValuesMK values)) = do - res@(_, table) <- + (rk, table) <- allocate rr - ( \_ -> - LSM.newTableWith (LSM.defaultTableConfig{LSM.confFencePointerIndex = LSM.OrdinaryIndex}) session - ) + (\_ -> LSM.newTable session) ( \tb -> do traceWith tracer TraceLedgerTablesHandleClose LSM.closeTable tb ) mapM_ (go table) $ chunks 1000 $ Map.toList values - pure res + pure (rk, table) where go table items = LSM.inserts table $ @@ -600,7 +601,7 @@ instance newHandleFromValues trcr reg res st = do table <- tableFromValuesMK trcr reg (sessionResource res) (forgetLedgerTables st) (ltprj st) - newLSMLedgerTablesHandle trcr reg table + newLSMLedgerTablesHandle trcr table snapshotManager _ res = Ouroboros.Consensus.Storage.LedgerDB.V2.LSM.snapshotManager (sessionResource res) @@ -731,7 +732,7 @@ mkLSMYieldArgs fp snapName mkFS mkGen _ reg = do (LSM.SnapshotLabel $ T.pack "UTxO table") ) LSM.closeTable - YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer reg tb + YieldLSM 1000 <$> newLSMLedgerTablesHandle nullTracer tb -- | Create Sink arguments for LSM mkLSMSinkArgs :: diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs index 3953750861..bf5df9e2b4 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs @@ -413,23 +413,23 @@ implSyncWithLedger mpEnv = -- state didn't change. withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface registry) $ \is (MempoolLedgerDBView ls meFrk) -> do - eFrk <- meFrk - case eFrk of - -- This case should happen only if the tip has moved again, this time - -- to a separate fork, since the background thread saw a change in the - -- tip, which should happen very rarely - Left{} -> do - traceWith trcr TraceMempoolTipMovedBetweenSTMBlocks - pure (Nothing, is) - Right frk -> do - let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls - if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot - then do - -- The tip didn't change, put the same state. - traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) - pure (Just (snapshotFromIS is), is) - else do - -- The tip changed, we have to revalidate + let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls + if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot + then do + -- The tip didn't change, put the same state. + traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) + pure (Just (snapshotFromIS is), is) + else do + -- The tip changed, we have to revalidate + eFrk <- meFrk + case eFrk of + -- This case should happen only if the tip has moved again, this time + -- to a separate fork, since the background thread saw a change in the + -- tip, which should happen very rarely + Left{} -> do + traceWith trcr TraceMempoolTipMovedBetweenSTMBlocks + pure (Nothing, is) + Right frk -> do modifyMVar_ forkerMVar ( \oldFrk -> do diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs index d0e7bb1c9d..5013a4a628 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2.hs @@ -100,6 +100,7 @@ mkInitDb args getBlock snapManager getVolatileSuffix res = do lock <- RAWLock.new () forkers <- newTVarIO Map.empty nextForkerKey <- newTVarIO (ForkerKey 0) + ldbToClose <- newTVarIO [] let env = LedgerDBEnv { ldbSeq = varDB @@ -111,10 +112,12 @@ mkInitDb args getBlock snapManager getVolatileSuffix res = do , ldbCfg = lgrConfig , ldbHasFS = lgrHasFS , ldbResolveBlock = getBlock + , ldbRegistry = lgrRegistry , ldbQueryBatchSize = lgrQueryBatchSize , ldbOpenHandlesLock = lock , ldbGetVolatileSuffix = getVolatileSuffix , ldbResourceKeys = SomeResources res + , ldbToClose } h <- LDBHandle <$> newTVarIO (LedgerDBOpen env) pure $ implMkLedgerDb h snapManager @@ -185,7 +188,7 @@ mkInternals h snapManager = let selectWhereTo = case whereTo of TakeAtImmutableTip -> anchorHandle TakeAtVolatileTip -> currentHandle - withStateRef env (MkSolo . selectWhereTo) $ \(MkSolo (st, _)) -> + withStateRef env (MkSolo . selectWhereTo) $ \(MkSolo (_, st)) -> Monad.void $ takeSnapshot snapManager @@ -330,6 +333,7 @@ implGarbageCollect env slotNo = do atomically $ modifyTVar (ldbPrevApplied env) $ Set.dropWhileAntitone ((< slotNo) . realPointSlot) + mapM_ closeLedgerSeq =<< readTVarIO (ldbToClose env) -- It is safe to close the handles outside of the locked region, which reduces -- contention. See the docs of 'ldbOpenHandlesLock'. Monad.join $ RAWLock.withWriteAccess (ldbOpenHandlesLock env) $ \() -> do @@ -350,7 +354,7 @@ implTryTakeSnapshot :: implTryTakeSnapshot snapManager env mTime nrBlocks = if onDiskShouldTakeSnapshot (ldbSnapshotPolicy env) (uncurry (flip diffTime) <$> mTime) nrBlocks then do - withStateRef env (MkSolo . anchorHandle) $ \(MkSolo (st, _)) -> + withStateRef env (MkSolo . anchorHandle) $ \(MkSolo (_, st)) -> Monad.void $ takeSnapshot snapManager @@ -428,6 +432,13 @@ data LedgerDBEnv m l blk = LedgerDBEnv , ldbHasFS :: !(SomeHasFS m) , ldbResolveBlock :: !(ResolveBlock m blk) , ldbQueryBatchSize :: !QueryBatchSize + , ldbRegistry :: !(ResourceRegistry m) + -- ^ The registry of the LedgerDB, to give it to forkers to transfer committed + -- handles to the LedgerDB. + , ldbToClose :: !(StrictTVar m [LedgerSeq m l]) + -- ^ When committing forkers, the discarded part of the LedgerDB will be put + -- in this TVar such that the 'garbageCollect' function will release such + -- resources. , ldbOpenHandlesLock :: !(RAWLock m ()) -- ^ While holding a read lock (at least), all handles in the 'ldbSeq' are -- guaranteed to be open. During this time, the handle can be duplicated and @@ -562,13 +573,13 @@ getStateRef :: LedgerDBEnv m l blk -> ResourceRegistry m -> (LedgerSeq m l -> t (StateRef m l)) -> - m (t (StateRef m l, ResourceKey m)) + m (t (ResourceKey m, StateRef m l)) getStateRef ldbEnv reg project = RAWLock.withReadAccess (ldbOpenHandlesLock ldbEnv) $ \() -> do tst <- project <$> atomically (getVolatileLedgerSeq ldbEnv) for tst $ \st -> do - (resKey, tables') <- allocate reg (\_ -> duplicate $ tables st) close - pure (st{tables = tables'}, resKey) + (key, tables') <- duplicate (tables st) reg + pure (key, st{tables = tables'}) -- | Like 'StateRef', but takes care of closing the handle when the given action -- returns or errors. @@ -576,7 +587,7 @@ withStateRef :: (IOLike m, Traversable t, GetTip l) => LedgerDBEnv m l blk -> (LedgerSeq m l -> t (StateRef m l)) -> - (t (StateRef m l, ResourceKey m) -> m a) -> + (t (ResourceKey m, StateRef m l) -> m a) -> m a withStateRef ldbEnv project f = withRegistry $ \reg -> getStateRef ldbEnv reg project >>= f @@ -591,7 +602,7 @@ acquireAtTarget :: LedgerDBEnv m l blk -> Either Word64 (Target (Point blk)) -> ResourceRegistry m -> - m (Either GetForkerError (StateRef m l, ResourceKey m)) + m (Either GetForkerError (ResourceKey m, StateRef m l)) acquireAtTarget ldbEnv target reg = getStateRef ldbEnv reg $ \l -> case target of Right VolatileTip -> pure $ currentHandle l @@ -646,12 +657,11 @@ newForkerByRollback h rr n = getEnv h $ \ldbEnv -> closeForkerEnv :: IOLike m => ForkerEnv m l blk -> m () -closeForkerEnv ForkerEnv{foeResourcesToRelease = (lock, key, toRelease)} = - RAWLock.withWriteAccess lock $ - const $ do - Monad.join $ atomically (swapTVar toRelease (pure ())) - _ <- release key - pure ((), ()) +closeForkerEnv ForkerEnv{foeLedgerDbLock, foeCleanup, foeInitialHandleKey} = + RAWLock.withWriteAccess foeLedgerDbLock $ \() -> do + Monad.void $ release foeInitialHandleKey + Monad.join $ readTVarIO foeCleanup + pure ((), ()) getForkerEnv :: forall m l blk r. @@ -739,26 +749,25 @@ newForker :: LedgerDBHandle m l blk -> LedgerDBEnv m l blk -> ResourceRegistry m -> - (StateRef m l, ResourceKey m) -> + (ResourceKey m, StateRef m l) -> m (Forker m l blk) -newForker h ldbEnv rr (st, rk) = do +newForker h ldbEnv rr (rk, st) = do forkerKey <- atomically $ stateTVar (ldbNextForkerKey ldbEnv) $ \r -> (r, r + 1) let tr = LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv traceWith tr ForkerOpen lseqVar <- newTVarIO . LedgerSeq . AS.Empty $ st - -- The closing action that we allocate in the TVar from the start is not - -- strictly necessary if the caller uses a short-lived registry like the ones - -- in Chain selection or the forging loop. Just in case the user passes a - -- long-lived registry, we store such closing action to make sure the handle - -- is closed even under @forkerClose@ if the registry outlives the forker. - (k, toRelease) <- allocate rr (\_ -> newTVarIO (Monad.void (release rk))) (Monad.join . readTVarIO) + foeCleanup <- newTVarIO $ pure () let forkerEnv = ForkerEnv { foeLedgerSeq = lseqVar + , foeLedgerDbRegistry = ldbRegistry ldbEnv + , foeResourceRegistry = rr , foeSwitchVar = ldbSeq ldbEnv , foeTracer = tr - , foeResourcesToRelease = (ldbOpenHandlesLock ldbEnv, k, toRelease) , foeInitialHandleKey = rk + , foeCleanup + , foeLedgerDbLock = ldbOpenHandlesLock ldbEnv + , foeLedgerDbToClose = ldbToClose ldbEnv } atomically $ modifyTVar (ldbForkers ldbEnv) $ Map.insert forkerKey forkerEnv pure $ diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs index b252ae5da2..a1b9e77c49 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/Forker.hs @@ -19,8 +19,7 @@ module Ouroboros.Consensus.Storage.LedgerDB.V2.Forker , module Ouroboros.Consensus.Storage.LedgerDB.Forker ) where -import qualified Control.Monad as Monad -import Control.RAWLock hiding (read) +import Control.RAWLock (RAWLock) import Control.ResourceRegistry import Control.Tracer import Data.Maybe (fromMaybe) @@ -34,6 +33,7 @@ import Ouroboros.Consensus.Storage.LedgerDB.API import Ouroboros.Consensus.Storage.LedgerDB.Args import Ouroboros.Consensus.Storage.LedgerDB.Forker import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq +import Ouroboros.Consensus.Util (whenJust) import Ouroboros.Consensus.Util.CallStack import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.NormalForm.StrictTVar () @@ -49,13 +49,23 @@ data ForkerEnv m l blk = ForkerEnv -- ^ Local version of the LedgerSeq , foeSwitchVar :: !(StrictTVar m (LedgerSeq m l)) -- ^ This TVar is the same as the LedgerDB one + , foeLedgerDbRegistry :: !(ResourceRegistry m) + -- ^ The registry in the LedgerDB to move handles to in case we commit the + -- forker. + , foeLedgerDbToClose :: !(StrictTVar m [LedgerSeq m l]) , foeTracer :: !(Tracer m TraceForkerEvent) -- ^ Config - , foeResourcesToRelease :: !(RAWLock m (), ResourceKey m, StrictTVar m (m ())) - -- ^ Release the resources + , foeResourceRegistry :: !(ResourceRegistry m) + -- ^ The registry local to the forker , foeInitialHandleKey :: !(ResourceKey m) -- ^ Resource key for the initial handle to ensure it is released. See -- comments in 'implForkerCommit'. + , foeCleanup :: !(StrictTVar m (m ())) + -- ^ An action to run on cleanup. If the forker was not committed this will be + -- the trivial action. Otherwise it will move the required handles to the + -- LedgerDB and release the discarded ones. + , foeLedgerDbLock :: !(RAWLock m ()) + -- ^ 'ldbOpenHandlesLock'. } deriving Generic @@ -127,17 +137,15 @@ implForkerPush env newState = do st = forgetLedgerTables newState bracketOnError - (duplicate (tables $ currentHandle lseq)) - close - ( \newtbs -> do + (duplicate (tables $ currentHandle lseq) (foeResourceRegistry env)) + (release . fst) + ( \(_, newtbs) -> do pushDiffs newtbs st0 newState let lseq' = extend (StateRef st newtbs) lseq traceWith (foeTracer env) ForkerPushEnd - atomically $ do - writeTVar (foeLedgerSeq env) lseq' - modifyTVar ((\(_, _, r) -> r) $ foeResourcesToRelease env) (>> close newtbs) + atomically $ writeTVar (foeLedgerSeq env) lseq' ) implForkerCommit :: @@ -148,42 +156,41 @@ implForkerCommit env = do LedgerSeq lseq <- readTVar foeLedgerSeq let intersectionSlot = getTipSlot $ state $ AS.anchor lseq let predicate = (== getTipHash (state (AS.anchor lseq))) . getTipHash . state - closeDiscarded <- do + (transfer, ldbToClose) <- stateTVar foeSwitchVar ( \(LedgerSeq olddb) -> fromMaybe theImpossible $ do -- Split the selection at the intersection point. The snd component will -- have to be closed. - (olddb', toClose) <- AS.splitAfterMeasure intersectionSlot (either predicate predicate) olddb + (toKeepBase, toCloseLdb) <- AS.splitAfterMeasure intersectionSlot (either predicate predicate) olddb + (toCloseForker, toKeepTip) <- + AS.splitAfterMeasure intersectionSlot (either predicate predicate) lseq -- Join the prefix of the selection with the sequence in the forker - newdb <- AS.join (const $ const True) olddb' lseq - let closeDiscarded = do - -- Do /not/ close the anchor of @toClose@, as that is also the - -- tip of @olddb'@ which will be used in @newdb@. - case toClose of - AS.Empty _ -> pure () - _ AS.:< closeOld' -> closeLedgerSeq (LedgerSeq closeOld') - -- Finally, close the anchor of @lseq@ (which is a duplicate of - -- the head of @olddb'@). To close this handle, we have to - -- release the 'foeInitialHandleKey' as that one is registered - -- on the registry used to open the forker. Releasing it will - -- call 'close' on the handle which will call 'release' on the key - -- for the handle. - Monad.void $ release foeInitialHandleKey - pure (closeDiscarded, LedgerSeq newdb) + newdb <- AS.join (const $ const True) toKeepBase toKeepTip + -- Do /not/ close the anchor of @toClose@, as that is also the + -- tip of @olddb'@ which will be used in @newdb@. + let ldbToClose = case toCloseLdb of + AS.Empty _ -> Nothing + _ AS.:< closeOld' -> Just (LedgerSeq closeOld') + transferCommitted = do + closeLedgerSeq (LedgerSeq toCloseForker) + + -- All the other remaining handles are transferred to the LedgerDB registry + keys <- transferRegistry foeResourceRegistry foeLedgerDbRegistry + mapM_ (\(k, v) -> transfer (tables v) k) $ zip keys (AS.toOldestFirst toKeepTip) + + pure ((transferCommitted, ldbToClose), LedgerSeq newdb) ) - - -- We are discarding the previous value in the TVar because we had accumulated - -- actions for closing the states pushed to the forker. As we are committing - -- those we have to close the ones discarded in this function and forget about - -- those releasing actions. - writeTVar ((\(_, _, r) -> r) $ foeResourcesToRelease) closeDiscarded + whenJust ldbToClose (modifyTVar foeLedgerDbToClose . (:)) + writeTVar foeCleanup transfer where ForkerEnv { foeLedgerSeq , foeSwitchVar - , foeResourcesToRelease - , foeInitialHandleKey + , foeResourceRegistry + , foeLedgerDbRegistry + , foeCleanup + , foeLedgerDbToClose } = env theImpossible = diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs index be6ec8a080..ffcfc064fe 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/InMemory.hs @@ -116,6 +116,7 @@ newInMemoryLedgerTablesHandle tracer someFS@(SomeHasFS hasFS) l = do , pushDiffs = implPushDiffs tv , takeHandleSnapshot = implTakeHandleSnapshot tv hasFS , tablesSize = implTablesSize tv + , transfer = const (pure ()) } {-# INLINE implClose #-} @@ -147,10 +148,15 @@ implDuplicate :: Tracer m LedgerDBV2Trace -> StrictTVar m (LedgerTablesHandleState l) -> SomeHasFS m -> - m (LedgerTablesHandle m l) -implDuplicate tracer tv someFS = do + ResourceRegistry m -> + m (ResourceKey m, LedgerTablesHandle m l) +implDuplicate tracer tv someFS rr = do hs <- readTVarIO tv - !x <- guardClosed hs $ newInMemoryLedgerTablesHandle tracer someFS + !x <- guardClosed hs $ \v -> + allocate + rr + (\_ -> newInMemoryLedgerTablesHandle tracer someFS v) + close pure x implRead :: diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs index 774c35b5e6..49ce1df73e 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/V2/LedgerSeq.hs @@ -83,7 +83,10 @@ import Prelude hiding (read) -- | The interface fulfilled by handles on both the InMemory and LSM handles. data LedgerTablesHandle m l = LedgerTablesHandle { close :: !(m ()) - , duplicate :: !(m (LedgerTablesHandle m l)) + , transfer :: !(ResourceKey m -> m ()) + -- ^ Update the closing action in this handle with a new resource key, as the + -- handle has moved to a different registry. + , duplicate :: !(ResourceRegistry m -> m (ResourceKey m, LedgerTablesHandle m l)) -- ^ Create a copy of the handle. -- -- A duplicated handle must provide access to all the data that was there in @@ -249,10 +252,10 @@ reapplyBlock :: ResourceRegistry m -> LedgerSeq m l -> m (StateRef m l) -reapplyBlock evs cfg b _rr db = do +reapplyBlock evs cfg b rr db = do let ks = getBlockKeySets b StateRef st tbs = currentHandle db - newtbs <- duplicate tbs + (_, newtbs) <- duplicate tbs rr vals <- read newtbs st ks let st' = tickThenReapply evs cfg b (st `withLedgerTables` vals) newst = forgetLedgerTables st' diff --git a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs index 1c45dae1be..f1e7d4ac37 100644 --- a/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs +++ b/ouroboros-consensus/test/storage-test/Test/Ouroboros/Storage/LedgerDB/StateMachine.hs @@ -614,6 +614,8 @@ instance RunModel Model (StateT Environment IO) where let args = mkArgs secParam salt -- TODO after a drop and restore we restart the db but the session has been closed below where I wrote blahblahblah openLedgerDB (argFlavorArgs args) chainDb (argLedgerDbCfg args) fs rr + lift $ + garbageCollect ldb . fromWithOrigin 0 . pointSlot . getTip =<< atomically (getImmutableTip ldb) put (Environment ldb testInternals chainDb mkArgs fs getNumOpenHandles cleanup rr) pure $ pure () perform _ WipeLedgerDB _ = do @@ -642,6 +644,7 @@ instance RunModel Model (StateT Environment IO) where (reverse (map blockRealPoint blks) ++) . drop (fromIntegral n) atomically (forkerCommit forker) forkerClose forker + garbageCollect ldb . fromWithOrigin 0 . pointSlot . getTip =<< atomically (getImmutableTip ldb) pure $ pure () ValidateExceededRollBack{} -> pure $ Left ErrorValidateExceededRollback ValidateLedgerError (AnnLedgerError forker _ err) -> forkerClose forker >> error ("Unexpected ledger error" <> show err)