Skip to content

Commit 506d793

Browse files
committed
Carry a forker in the mempool
1 parent 11bad3a commit 506d793

File tree

9 files changed

+113
-151
lines changed

9 files changed

+113
-151
lines changed

ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ initInternalState
453453
mempool <-
454454
openMempool
455455
registry
456-
(chainDBLedgerInterface chainDB)
456+
(chainDBLedgerInterface chainDB registry)
457457
(configLedger cfg)
458458
mempoolCapacityOverride
459459
(mempoolTracer tracers)

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Impl/Common.hs

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ module Ouroboros.Consensus.Mempool.Impl.Common
3939
, tickLedgerState
4040
) where
4141

42-
import Control.Concurrent.Class.MonadMVar (MVar, newMVar)
4342
import Control.Concurrent.Class.MonadSTM.Strict.TMVar (newTMVarIO)
4443
import Control.Monad.Trans.Except (runExcept)
44+
import Control.ResourceRegistry
4545
import Control.Tracer
4646
import qualified Data.Foldable as Foldable
4747
import qualified Data.List.NonEmpty as NE
@@ -61,8 +61,11 @@ import Ouroboros.Consensus.Mempool.TxSeq (TxSeq (..), TxTicket (..))
6161
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
6262
import Ouroboros.Consensus.Storage.ChainDB (ChainDB)
6363
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
64+
import Ouroboros.Consensus.Storage.LedgerDB.Forker
6465
import Ouroboros.Consensus.Util.Enclose (EnclosingTimed)
6566
import Ouroboros.Consensus.Util.IOLike hiding (newMVar)
67+
import Ouroboros.Consensus.Util.NormalForm.StrictMVar
68+
import Ouroboros.Network.Protocol.LocalStateQuery.Type
6669

6770
{-------------------------------------------------------------------------------
6871
Internal State
@@ -188,27 +191,27 @@ initInternalState capacityOverride lastTicketNo cfg slot st =
188191

189192
-- | Abstract interface needed to run a Mempool.
190193
data LedgerInterface m blk = LedgerInterface
191-
{ getCurrentLedgerState :: STM m (LedgerState blk EmptyMK)
192-
-- ^ Get the current tip of the LedgerDB.
193-
, getLedgerTablesAtFor ::
194-
Point blk ->
195-
LedgerTables (LedgerState blk) KeysMK ->
196-
m (Maybe (LedgerTables (LedgerState blk) ValuesMK))
197-
-- ^ Get values at the given point on the chain. Returns Nothing if the
198-
-- anchor moved or if the state is not found on the ledger db.
194+
{ getCurrentLedgerState ::
195+
STM m (LedgerState blk EmptyMK, m (Either GetForkerError (ReadOnlyForker' m blk)))
196+
-- ^ Get the current tip of the LedgerDB and an action to get a forker there.
199197
}
200198

201199
-- | Create a 'LedgerInterface' from a 'ChainDB'.
202200
chainDBLedgerInterface ::
203-
IOLike m =>
204-
ChainDB m blk -> LedgerInterface m blk
205-
chainDBLedgerInterface chainDB =
206-
LedgerInterface
207-
{ getCurrentLedgerState =
208-
ledgerState <$> ChainDB.getCurrentLedger chainDB
209-
, getLedgerTablesAtFor = \pt keys ->
210-
fmap castLedgerTables <$> ChainDB.getLedgerTablesAtFor chainDB pt (castLedgerTables keys)
211-
}
201+
(IOLike m, IsLedger (LedgerState blk)) =>
202+
ChainDB m blk ->
203+
ResourceRegistry m ->
204+
LedgerInterface m blk
205+
chainDBLedgerInterface chainDB registry =
206+
let getStateAndForker = do
207+
st <- ChainDB.getCurrentLedger chainDB
208+
pure
209+
( ledgerState st
210+
, ChainDB.getReadOnlyForkerAtPoint chainDB registry (SpecificPoint (castPoint $ getTip st))
211+
)
212+
in LedgerInterface
213+
{ getCurrentLedgerState = getStateAndForker
214+
}
212215

213216
{-------------------------------------------------------------------------------
214217
Mempool environment
@@ -219,10 +222,11 @@ chainDBLedgerInterface chainDB =
219222
-- different operations.
220223
data MempoolEnv m blk = MempoolEnv
221224
{ mpEnvLedger :: LedgerInterface m blk
225+
, mpEnvForker :: StrictMVar m (ReadOnlyForker' m blk)
222226
, mpEnvLedgerCfg :: LedgerConfig blk
223227
, mpEnvStateVar :: StrictTMVar m (InternalState blk)
224-
, mpEnvAddTxsRemoteFifo :: MVar m ()
225-
, mpEnvAddTxsAllFifo :: MVar m ()
228+
, mpEnvAddTxsRemoteFifo :: StrictMVar m ()
229+
, mpEnvAddTxsAllFifo :: StrictMVar m ()
226230
, mpEnvTracer :: Tracer m (TraceEventMempool blk)
227231
, mpEnvCapacityOverride :: MempoolCapacityBytesOverride
228232
}
@@ -238,23 +242,31 @@ initMempoolEnv ::
238242
Tracer m (TraceEventMempool blk) ->
239243
m (MempoolEnv m blk)
240244
initMempoolEnv ledgerInterface cfg capacityOverride tracer = do
241-
st <- atomically $ getCurrentLedgerState ledgerInterface
242-
let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
243-
isVar <-
244-
newTMVarIO $
245-
initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st'
246-
addTxRemoteFifo <- newMVar ()
247-
addTxAllFifo <- newMVar ()
248-
return
249-
MempoolEnv
250-
{ mpEnvLedger = ledgerInterface
251-
, mpEnvLedgerCfg = cfg
252-
, mpEnvStateVar = isVar
253-
, mpEnvAddTxsRemoteFifo = addTxRemoteFifo
254-
, mpEnvAddTxsAllFifo = addTxAllFifo
255-
, mpEnvTracer = tracer
256-
, mpEnvCapacityOverride = capacityOverride
257-
}
245+
(st, meFrk) <- atomically $ getCurrentLedgerState ledgerInterface
246+
eFrk <- meFrk
247+
case eFrk of
248+
-- This should happen very rarely, if between getting the state and getting
249+
-- the forker, the ledgerdb has changed. We just retry here.
250+
Left{} -> initMempoolEnv ledgerInterface cfg capacityOverride tracer
251+
Right frk -> do
252+
frkMVar <- newMVar frk
253+
let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
254+
isVar <-
255+
newTMVarIO $
256+
initInternalState capacityOverride TxSeq.zeroTicketNo cfg slot st'
257+
addTxRemoteFifo <- newMVar ()
258+
addTxAllFifo <- newMVar ()
259+
return
260+
MempoolEnv
261+
{ mpEnvLedger = ledgerInterface
262+
, mpEnvLedgerCfg = cfg
263+
, mpEnvForker = frkMVar
264+
, mpEnvStateVar = isVar
265+
, mpEnvAddTxsRemoteFifo = addTxRemoteFifo
266+
, mpEnvAddTxsAllFifo = addTxAllFifo
267+
, mpEnvTracer = tracer
268+
, mpEnvCapacityOverride = capacityOverride
269+
}
258270

259271
{-------------------------------------------------------------------------------
260272
Ticking the ledger state

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ forkSyncStateOnTipPointChange registry menv =
7676
-- Using the tip ('Point') allows for quicker equality checks
7777
getCurrentTip :: STM m (Point blk)
7878
getCurrentTip =
79-
ledgerTipPoint
79+
ledgerTipPoint . fst
8080
<$> getCurrentLedgerState (mpEnvLedger menv)
8181

8282
-- | Unlike 'openMempool', this function does not fork a background thread

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs

Lines changed: 53 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ module Ouroboros.Consensus.Mempool.Update
1010
) where
1111

1212
import Cardano.Slotting.Slot
13-
import Control.Concurrent.Class.MonadMVar (withMVar)
14-
import Control.Monad (void)
1513
import Control.Monad.Except (runExcept)
1614
import Control.Tracer
1715
import qualified Data.Foldable as Foldable
@@ -20,7 +18,6 @@ import qualified Data.List.NonEmpty as NE
2018
import Data.Maybe (fromMaybe)
2119
import qualified Data.Measure as Measure
2220
import qualified Data.Set as Set
23-
import Data.Void
2421
import Ouroboros.Consensus.HeaderValidation
2522
import Ouroboros.Consensus.Ledger.Abstract
2623
import Ouroboros.Consensus.Ledger.SupportsMempool
@@ -29,9 +26,11 @@ import Ouroboros.Consensus.Mempool.Capacity
2926
import Ouroboros.Consensus.Mempool.Impl.Common
3027
import Ouroboros.Consensus.Mempool.TxSeq (TxTicket (..))
3128
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
29+
import Ouroboros.Consensus.Storage.LedgerDB.Forker hiding (trace)
3230
import Ouroboros.Consensus.Util (whenJust)
3331
import Ouroboros.Consensus.Util.Enclose
3432
import Ouroboros.Consensus.Util.IOLike hiding (withMVar)
33+
import Ouroboros.Consensus.Util.NormalForm.StrictMVar
3534
import Ouroboros.Consensus.Util.STM
3635
import Ouroboros.Network.Block
3736

@@ -154,7 +153,7 @@ doAddTx mpEnv wti tx =
154153
doAddTx' Nothing
155154
where
156155
MempoolEnv
157-
{ mpEnvLedger = ldgrInterface
156+
{ mpEnvForker = forker
158157
, mpEnvLedgerCfg = cfg
159158
, mpEnvStateVar = istate
160159
, mpEnvTracer = trcr
@@ -172,31 +171,14 @@ doAddTx mpEnv wti tx =
172171

173172
res <- withTMVarAnd istate additionalCheck $
174173
\is () -> do
175-
mTbs <- getLedgerTablesAtFor ldgrInterface (isTip is) (getTransactionKeySets tx)
176-
case mTbs of
177-
Just tbs -> do
178-
traceWith trcr $ TraceMempoolLedgerFound (isTip is)
179-
case pureTryAddTx cfg wti tx is tbs of
180-
NotEnoughSpaceLeft -> do
181-
pure (Retry (isMempoolSize is), is)
182-
Processed outcome@(TransactionProcessingResult is' _ _) -> do
183-
pure (OK outcome, fromMaybe is is')
184-
Nothing -> do
185-
traceWith trcr $ TraceMempoolLedgerNotFound (isTip is)
186-
-- We couldn't retrieve the values because the state is no longer on
187-
-- the db. We need to resync.
188-
pure (Resync, is)
189-
case res of
190-
Retry s' -> doAddTx' (Just s')
191-
OK outcome -> pure outcome
192-
Resync -> do
193-
void $ implSyncWithLedger mpEnv
194-
doAddTx' mbPrevSize
195-
196-
data WithTMVarOutcome retry ok
197-
= Retry !retry
198-
| OK ok
199-
| Resync
174+
frkr <- readMVar forker
175+
tbs <- castLedgerTables <$> roforkerReadTables frkr (castLedgerTables $ getTransactionKeySets tx)
176+
case pureTryAddTx cfg wti tx is tbs of
177+
NotEnoughSpaceLeft -> do
178+
pure (Left (isMempoolSize is), is)
179+
Processed outcome@(TransactionProcessingResult is' _ _) -> do
180+
pure (Right outcome, fromMaybe is is')
181+
either (doAddTx' . Just) pure res
200182

201183
pureTryAddTx ::
202184
( LedgerSupportsMempool blk
@@ -324,9 +306,9 @@ implRemoveTxsEvenIfValid ::
324306
MempoolEnv m blk ->
325307
NE.NonEmpty (GenTxId blk) ->
326308
m ()
327-
implRemoveTxsEvenIfValid mpEnv toRemove = do
328-
(out :: WithTMVarOutcome Void ()) <- withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface) $
329-
\is ls -> do
309+
implRemoveTxsEvenIfValid mpEnv toRemove =
310+
withTMVar istate $
311+
\is -> do
330312
let toKeep =
331313
filter
332314
( (`notElem` Set.fromList (NE.toList toRemove))
@@ -335,33 +317,25 @@ implRemoveTxsEvenIfValid mpEnv toRemove = do
335317
. txTicketTx
336318
)
337319
(TxSeq.toList $ isTxs is)
338-
(slot, ticked) = tickLedgerState cfg (ForgeInUnknownSlot ls)
339320
toKeep' = Foldable.foldMap' (getTransactionKeySets . txForgetValidated . TxSeq.txTicketTx) toKeep
340-
mTbs <- getLedgerTablesAtFor ldgrInterface (castPoint (getTip ls)) toKeep'
341-
case mTbs of
342-
Nothing -> pure (Resync, is)
343-
Just tbs -> do
344-
let (is', t) =
345-
pureRemoveTxs
346-
capacityOverride
347-
cfg
348-
slot
349-
ticked
350-
tbs
351-
(isLastTicketNo is)
352-
toKeep
353-
toRemove
354-
traceWith trcr t
355-
pure (OK (), is')
356-
case out of
357-
Resync -> do
358-
void $ implSyncWithLedger mpEnv
359-
implRemoveTxsEvenIfValid mpEnv toRemove
360-
OK () -> pure ()
321+
frkr <- readMVar forker
322+
tbs <- castLedgerTables <$> roforkerReadTables frkr (castLedgerTables toKeep')
323+
let (is', t) =
324+
pureRemoveTxs
325+
capacityOverride
326+
cfg
327+
(isSlotNo is)
328+
(isLedgerState is)
329+
tbs
330+
(isLastTicketNo is)
331+
toKeep
332+
toRemove
333+
traceWith trcr t
334+
pure ((), is')
361335
where
362336
MempoolEnv
363337
{ mpEnvStateVar = istate
364-
, mpEnvLedger = ldgrInterface
338+
, mpEnvForker = forker
365339
, mpEnvTracer = trcr
366340
, mpEnvLedgerCfg = cfg
367341
, mpEnvCapacityOverride = capacityOverride
@@ -415,21 +389,28 @@ implSyncWithLedger ::
415389
MempoolEnv m blk ->
416390
m (MempoolSnapshot blk)
417391
implSyncWithLedger mpEnv = encloseTimedWith (TraceMempoolSynced >$< mpEnvTracer mpEnv) $ do
418-
(res :: WithTMVarOutcome Void (MempoolSnapshot blk)) <-
392+
res <-
419393
withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface) $
420-
\is ls -> do
421-
let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls
422-
if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot
423-
then do
424-
-- The tip didn't change, put the same state.
425-
traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is)
426-
pure (OK (snapshotFromIS is), is)
427-
else do
428-
-- We need to revalidate
429-
let pt = castPoint (getTip ls)
430-
mTbs <- getLedgerTablesAtFor ldgrInterface pt (isTxKeys is)
431-
case mTbs of
432-
Just tbs -> do
394+
\is (ls, meFrk) -> do
395+
eFrk <- meFrk
396+
case eFrk of
397+
Left{} -> pure (Left (), is)
398+
Right frk -> do
399+
let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls
400+
if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot
401+
then do
402+
-- The tip didn't change, put the same state.
403+
traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is)
404+
pure (Right (snapshotFromIS is), is)
405+
else do
406+
-- We need to revalidate
407+
modifyMVar_
408+
forkerMVar
409+
( \oldFrk -> do
410+
roforkerClose oldFrk
411+
pure frk
412+
)
413+
tbs <- castLedgerTables <$> roforkerReadTables frk (castLedgerTables $ isTxKeys is)
433414
let (is', mTrace) =
434415
pureSyncWithLedger
435416
capacityOverride
@@ -439,16 +420,12 @@ implSyncWithLedger mpEnv = encloseTimedWith (TraceMempoolSynced >$< mpEnvTracer
439420
tbs
440421
is
441422
whenJust mTrace (traceWith trcr)
442-
pure (OK (snapshotFromIS is'), is')
443-
Nothing -> do
444-
-- If the point is gone, resync
445-
pure (Resync, is)
446-
case res of
447-
OK v -> pure v
448-
Resync -> implSyncWithLedger mpEnv
423+
pure (Right (snapshotFromIS is'), is')
424+
either (const $ implSyncWithLedger mpEnv) pure res
449425
where
450426
MempoolEnv
451427
{ mpEnvStateVar = istate
428+
, mpEnvForker = forkerMVar
452429
, mpEnvLedger = ldgrInterface
453430
, mpEnvTracer = trcr
454431
, mpEnvLedgerCfg = cfg

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -383,14 +383,6 @@ data ChainDB m blk = ChainDB
383383
, getChainSelStarvation :: STM m ChainSelStarvation
384384
-- ^ Whether ChainSel is currently starved, or when was last time it
385385
-- stopped being starved.
386-
, getLedgerTablesAtFor ::
387-
Point blk ->
388-
LedgerTables (ExtLedgerState blk) KeysMK ->
389-
m (Maybe (LedgerTables (ExtLedgerState blk) ValuesMK))
390-
-- ^ Read ledger tables at a given point on the chain, if it exists.
391-
--
392-
-- This is intended to be used by the mempool to hydrate a ledger state at
393-
-- a specific point.
394386
, getStatistics :: m (Maybe Statistics)
395387
-- ^ Get statistics from the LedgerDB, in particular the number of entries
396388
-- in the tables.

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,6 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
271271
, getPastLedger = getEnvSTM1 h Query.getPastLedger
272272
, getHeaderStateHistory = getEnvSTM h Query.getHeaderStateHistory
273273
, getReadOnlyForkerAtPoint = getEnv2 h Query.getReadOnlyForkerAtPoint
274-
, getLedgerTablesAtFor = getEnv2 h Query.getLedgerTablesAtFor
275274
, getStatistics = getEnv h Query.getStatistics
276275
}
277276
addBlockTestFuse <- newFuse "test chain selection"

0 commit comments

Comments
 (0)