Skip to content

Commit 9b0ea2a

Browse files
committed
limit cache when close to tip
1 parent ec27bb5 commit 9b0ea2a

File tree

9 files changed

+179
-113
lines changed

9 files changed

+179
-113
lines changed

cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,11 @@ migrateBootstrapUTxO syncEnv = do
7979
where
8080
trce = getTrace syncEnv
8181

82-
storeUTxOFromLedger :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> ExtLedgerState CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend m) ()
82+
storeUTxOFromLedger ::
83+
(MonadBaseControl IO m, MonadIO m) =>
84+
SyncEnv ->
85+
ExtLedgerState CardanoBlock ->
86+
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
8387
storeUTxOFromLedger env st = case ledgerState st of
8488
LedgerStateBabbage bts -> storeUTxO env (getUTxO bts)
8589
LedgerStateConway stc -> storeUTxO env (getUTxO stc)

cardano-db-sync/src/Cardano/DbSync/Cache.hs

Lines changed: 134 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module Cardano.DbSync.Cache (
2222
queryStakeAddrWithCache,
2323
queryTxIdWithCache,
2424
rollbackCache,
25+
optimiseCaches,
2526
tryUpdateCacheTx,
2627

2728
-- * CacheStatistics
@@ -73,18 +74,35 @@ import Ouroboros.Consensus.Cardano.Block (StandardCrypto)
7374
-- NOTE: Other tables are not cleaned up since they are not rollbacked.
7475
rollbackCache :: MonadIO m => CacheStatus -> DB.BlockId -> ReaderT SqlBackend m ()
7576
rollbackCache NoCache _ = pure ()
76-
rollbackCache (ActiveCache cache) blockId = do
77+
rollbackCache (ActiveCache _ cache) blockId = do
7778
liftIO $ do
7879
atomically $ writeTVar (cPrevBlock cache) Nothing
7980
atomically $ modifyTVar (cDatum cache) LRU.cleanup
8081
atomically $ modifyTVar (cTxIds cache) FIFO.cleanupCache
8182
void $ rollbackMapEpochInCache cache blockId
8283

84+
-- When syncing and we're close to the tip, we can optimise the caches.
85+
optimiseCaches :: MonadIO m => CacheStatus -> ReaderT SqlBackend m CacheStatus
86+
optimiseCaches c@(ActiveCache isCacheOptomised cache) = do
87+
if isCacheOptomised
88+
then liftIO $ do
89+
-- empty caches not to be used anymore
90+
atomically $ modifyTVar (cTxIds cache) FIFO.cleanupCache
91+
atomically $ writeTVar (cStake cache) (StakeCache Map.empty (LRU.empty 0))
92+
atomically $ modifyTVar (cDatum cache) (LRU.optimise 0)
93+
-- empty then limit the capacity of the cache
94+
atomically $ writeTVar (cMultiAssets cache) (LRU.empty 50000)
95+
-- leaving the following caches as they are:
96+
-- cPools, cPrevBlock, Cstats, cEpoch
97+
pure c
98+
else pure c
99+
optimiseCaches c = pure c
100+
83101
getCacheStatistics :: CacheStatus -> IO CacheStatistics
84102
getCacheStatistics cs =
85103
case cs of
86104
NoCache -> pure initCacheStatistics
87-
ActiveCache ci -> readTVarIO (cStats ci)
105+
ActiveCache _ ci -> readTVarIO (cStats ci)
88106

89107
queryOrInsertRewardAccount ::
90108
(MonadBaseControl IO m, MonadIO m) =>
@@ -150,34 +168,36 @@ queryStakeAddrWithCacheRetBs ::
150168
queryStakeAddrWithCacheRetBs _trce cache cacheUA ra@(Ledger.RewardAccount _ cred) = do
151169
let bs = Ledger.serialiseRewardAccount ra
152170
case cache of
153-
NoCache -> do
154-
mapLeft (,bs) <$> resolveStakeAddress bs
155-
ActiveCache ci -> do
156-
stakeCache <- liftIO $ readTVarIO (cStake ci)
157-
case queryStakeCache cred stakeCache of
158-
Just (addrId, stakeCache') -> do
159-
liftIO $ hitCreds (cStats ci)
160-
case cacheUA of
161-
EvictAndUpdateCache -> do
162-
liftIO $ atomically $ writeTVar (cStake ci) $ deleteStakeCache cred stakeCache'
163-
pure $ Right addrId
164-
_other -> do
165-
liftIO $ atomically $ writeTVar (cStake ci) stakeCache'
166-
pure $ Right addrId
167-
Nothing -> do
168-
queryRes <- mapLeft (,bs) <$> resolveStakeAddress bs
169-
liftIO $ missCreds (cStats ci)
170-
case queryRes of
171-
Left _ -> pure queryRes
172-
Right stakeAddrsId -> do
173-
let !stakeCache' = case cacheUA of
174-
UpdateCache -> stakeCache {scLruCache = LRU.insert cred stakeAddrsId (scLruCache stakeCache)}
175-
UpdateCacheStrong -> stakeCache {scStableCache = Map.insert cred stakeAddrsId (scStableCache stakeCache)}
176-
_ -> stakeCache
177-
liftIO $
178-
atomically $
179-
writeTVar (cStake ci) stakeCache'
180-
pure $ Right stakeAddrsId
171+
NoCache -> mapLeft (,bs) <$> resolveStakeAddress bs
172+
ActiveCache shouldOptomiseCache ci -> do
173+
if shouldOptomiseCache
174+
then mapLeft (,bs) <$> resolveStakeAddress bs
175+
else do
176+
stakeCache <- liftIO $ readTVarIO (cStake ci)
177+
case queryStakeCache cred stakeCache of
178+
Just (addrId, stakeCache') -> do
179+
liftIO $ hitCreds (cStats ci)
180+
case cacheUA of
181+
EvictAndUpdateCache -> do
182+
liftIO $ atomically $ writeTVar (cStake ci) $ deleteStakeCache cred stakeCache'
183+
pure $ Right addrId
184+
_other -> do
185+
liftIO $ atomically $ writeTVar (cStake ci) stakeCache'
186+
pure $ Right addrId
187+
Nothing -> do
188+
queryRes <- mapLeft (,bs) <$> resolveStakeAddress bs
189+
liftIO $ missCreds (cStats ci)
190+
case queryRes of
191+
Left _ -> pure queryRes
192+
Right stakeAddrsId -> do
193+
let !stakeCache' = case cacheUA of
194+
UpdateCache -> stakeCache {scLruCache = LRU.insert cred stakeAddrsId (scLruCache stakeCache)}
195+
UpdateCacheStrong -> stakeCache {scStableCache = Map.insert cred stakeAddrsId (scStableCache stakeCache)}
196+
_ -> stakeCache
197+
liftIO $
198+
atomically $
199+
writeTVar (cStake ci) stakeCache'
200+
pure $ Right stakeAddrsId
181201

182202
-- | True if it was found in LRU
183203
queryStakeCache :: StakeCred -> StakeCache -> Maybe (DB.StakeAddressId, StakeCache)
@@ -204,7 +224,7 @@ queryPoolKeyWithCache cache cacheUA hsh =
204224
case mPhId of
205225
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
206226
Just phId -> pure $ Right phId
207-
ActiveCache ci -> do
227+
ActiveCache _ ci -> do
208228
mp <- liftIO $ readTVarIO (cPools ci)
209229
case Map.lookup hsh mp of
210230
Just phId -> do
@@ -244,7 +264,7 @@ insertPoolKeyWithCache cache cacheUA pHash =
244264
{ DB.poolHashHashRaw = Generic.unKeyHashRaw pHash
245265
, DB.poolHashView = Generic.unKeyHashView pHash
246266
}
247-
ActiveCache ci -> do
267+
ActiveCache _ ci -> do
248268
mp <- liftIO $ readTVarIO (cPools ci)
249269
case Map.lookup pHash mp of
250270
Just phId -> do
@@ -306,26 +326,31 @@ queryMAWithCache ::
306326
ReaderT SqlBackend m (Either (ByteString, ByteString) DB.MultiAssetId)
307327
queryMAWithCache cache policyId asset =
308328
case cache of
309-
NoCache -> do
329+
NoCache -> queryDb
330+
ActiveCache isCacheOptomised ci -> do
331+
if isCacheOptomised
332+
then queryDb
333+
else do
334+
mp <- liftIO $ readTVarIO (cMultiAssets ci)
335+
case LRU.lookup (policyId, asset) mp of
336+
Just (maId, mp') -> do
337+
liftIO $ hitMAssets (cStats ci)
338+
liftIO $ atomically $ writeTVar (cMultiAssets ci) mp'
339+
pure $ Right maId
340+
Nothing -> do
341+
liftIO $ missMAssets (cStats ci)
342+
-- miss. The lookup doesn't change the cache on a miss.
343+
let !policyBs = Generic.unScriptHash $ policyID policyId
344+
let !assetNameBs = Generic.unAssetName asset
345+
maId <- maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs
346+
whenRight maId $
347+
liftIO . atomically . modifyTVar (cMultiAssets ci) . LRU.insert (policyId, asset)
348+
pure maId
349+
where
350+
queryDb = do
310351
let !policyBs = Generic.unScriptHash $ policyID policyId
311352
let !assetNameBs = Generic.unAssetName asset
312353
maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs
313-
ActiveCache ci -> do
314-
mp <- liftIO $ readTVarIO (cMultiAssets ci)
315-
case LRU.lookup (policyId, asset) mp of
316-
Just (maId, mp') -> do
317-
liftIO $ hitMAssets (cStats ci)
318-
liftIO $ atomically $ writeTVar (cMultiAssets ci) mp'
319-
pure $ Right maId
320-
Nothing -> do
321-
liftIO $ missMAssets (cStats ci)
322-
-- miss. The lookup doesn't change the cache on a miss.
323-
let !policyBs = Generic.unScriptHash $ policyID policyId
324-
let !assetNameBs = Generic.unAssetName asset
325-
maId <- maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs
326-
whenRight maId $
327-
liftIO . atomically . modifyTVar (cMultiAssets ci) . LRU.insert (policyId, asset)
328-
pure maId
329354

330355
queryPrevBlockWithCache ::
331356
MonadIO m =>
@@ -336,7 +361,7 @@ queryPrevBlockWithCache ::
336361
queryPrevBlockWithCache msg cache hsh =
337362
case cache of
338363
NoCache -> liftLookupFail msg $ DB.queryBlockId hsh
339-
ActiveCache ci -> do
364+
ActiveCache _ ci -> do
340365
mCachedPrev <- liftIO $ readTVarIO (cPrevBlock ci)
341366
case mCachedPrev of
342367
-- if the cached block matches the requested hash, we return its db id.
@@ -365,27 +390,30 @@ queryTxIdWithCache cache txIdLedger = do
365390
case cache of
366391
-- Direct database query if no cache.
367392
NoCache -> DB.queryTxId txHash
368-
ActiveCache cacheInternal -> do
369-
-- Read current cache state.
370-
cacheTx <- liftIO $ readTVarIO (cTxIds cacheInternal)
371-
372-
case FIFO.lookup txIdLedger cacheTx of
373-
-- Cache hit, return the transaction ID.
374-
Just txId -> do
375-
liftIO $ hitTxIds (cStats cacheInternal)
376-
pure $ Right txId
377-
-- Cache miss.
378-
Nothing -> do
379-
eTxId <- DB.queryTxId txHash
380-
liftIO $ missTxIds (cStats cacheInternal)
381-
case eTxId of
382-
Right txId -> do
383-
-- Update cache.
384-
liftIO $ atomically $ modifyTVar (cTxIds cacheInternal) $ FIFO.insert txIdLedger txId
385-
-- Return ID after updating cache.
393+
ActiveCache isCacheOptomised cacheInternal -> do
394+
if isCacheOptomised
395+
then DB.queryTxId txHash
396+
else do
397+
-- Read current cache state.
398+
cacheTx <- liftIO $ readTVarIO (cTxIds cacheInternal)
399+
400+
case FIFO.lookup txIdLedger cacheTx of
401+
-- Cache hit, return the transaction ID.
402+
Just txId -> do
403+
liftIO $ hitTxIds (cStats cacheInternal)
386404
pure $ Right txId
387-
-- Return lookup failure.
388-
Left _ -> pure $ Left $ DB.DbLookupTxHash txHash
405+
-- Cache miss.
406+
Nothing -> do
407+
eTxId <- DB.queryTxId txHash
408+
liftIO $ missTxIds (cStats cacheInternal)
409+
case eTxId of
410+
Right txId -> do
411+
-- Update cache.
412+
liftIO $ atomically $ modifyTVar (cTxIds cacheInternal) $ FIFO.insert txIdLedger txId
413+
-- Return ID after updating cache.
414+
pure $ Right txId
415+
-- Return lookup failure.
416+
Left _ -> pure $ Left $ DB.DbLookupTxHash txHash
389417
where
390418
txHash = Generic.unTxHash txIdLedger
391419

@@ -398,8 +426,10 @@ tryUpdateCacheTx ::
398426
tryUpdateCacheTx cache ledgerTxId txId = do
399427
case cache of
400428
NoCache -> pure ()
401-
ActiveCache ci -> do
402-
liftIO $ atomically $ modifyTVar (cTxIds ci) $ FIFO.insert ledgerTxId txId
429+
ActiveCache isCacheOptomised ci -> do
430+
if isCacheOptomised
431+
then pure ()
432+
else liftIO $ atomically $ modifyTVar (cTxIds ci) $ FIFO.insert ledgerTxId txId
403433

404434
insertBlockAndCache ::
405435
(MonadIO m, MonadBaseControl IO m) =>
@@ -409,12 +439,15 @@ insertBlockAndCache ::
409439
insertBlockAndCache cache block =
410440
case cache of
411441
NoCache -> DB.insertBlock block
412-
ActiveCache ci -> do
413-
bid <- DB.insertBlock block
414-
liftIO $ do
415-
missPrevBlock (cStats ci)
416-
atomically $ writeTVar (cPrevBlock ci) $ Just (bid, DB.blockHash block)
417-
pure bid
442+
ActiveCache isCacheOptomised ci ->
443+
if isCacheOptomised
444+
then DB.insertBlock block
445+
else do
446+
bid <- DB.insertBlock block
447+
liftIO $ do
448+
missPrevBlock (cStats ci)
449+
atomically $ writeTVar (cPrevBlock ci) $ Just (bid, DB.blockHash block)
450+
pure bid
418451

419452
queryDatum ::
420453
MonadIO m =>
@@ -424,17 +457,20 @@ queryDatum ::
424457
queryDatum cache hsh = do
425458
case cache of
426459
NoCache -> DB.queryDatum $ Generic.dataHashToBytes hsh
427-
ActiveCache ci -> do
428-
mp <- liftIO $ readTVarIO (cDatum ci)
429-
case LRU.lookup hsh mp of
430-
Just (datumId, mp') -> do
431-
liftIO $ hitDatum (cStats ci)
432-
liftIO $ atomically $ writeTVar (cDatum ci) mp'
433-
pure $ Just datumId
434-
Nothing -> do
435-
liftIO $ missDatum (cStats ci)
436-
-- miss. The lookup doesn't change the cache on a miss.
437-
DB.queryDatum $ Generic.dataHashToBytes hsh
460+
ActiveCache isCacheOptomised ci -> do
461+
if isCacheOptomised
462+
then DB.queryDatum $ Generic.dataHashToBytes hsh
463+
else do
464+
mp <- liftIO $ readTVarIO (cDatum ci)
465+
case LRU.lookup hsh mp of
466+
Just (datumId, mp') -> do
467+
liftIO $ hitDatum (cStats ci)
468+
liftIO $ atomically $ writeTVar (cDatum ci) mp'
469+
pure $ Just datumId
470+
Nothing -> do
471+
liftIO $ missDatum (cStats ci)
472+
-- miss. The lookup doesn't change the cache on a miss.
473+
DB.queryDatum $ Generic.dataHashToBytes hsh
438474

439475
-- This assumes the entry is not cached.
440476
insertDatumAndCache ::
@@ -447,12 +483,15 @@ insertDatumAndCache cache hsh dt = do
447483
datumId <- DB.insertDatum dt
448484
case cache of
449485
NoCache -> pure datumId
450-
ActiveCache ci -> do
451-
liftIO $
452-
atomically $
453-
modifyTVar (cDatum ci) $
454-
LRU.insert hsh datumId
455-
pure datumId
486+
ActiveCache isCacheOptomised ci -> do
487+
if isCacheOptomised
488+
then pure datumId
489+
else do
490+
liftIO $
491+
atomically $
492+
modifyTVar (cDatum ci) $
493+
LRU.insert hsh datumId
494+
pure datumId
456495

457496
-- Stakes
458497
hitCreds :: StrictTVar IO CacheStatistics -> IO ()

cardano-db-sync/src/Cardano/DbSync/Cache/Epoch.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ readCacheEpoch :: MonadIO m => CacheStatus -> m (Maybe CacheEpoch)
2929
readCacheEpoch cache =
3030
case cache of
3131
NoCache -> pure Nothing
32-
ActiveCache ci -> do
32+
ActiveCache _ ci -> do
3333
cacheEpoch <- liftIO $ readTVarIO (cEpoch ci)
3434
pure $ Just cacheEpoch
3535

3636
readEpochBlockDiffFromCache :: MonadIO m => CacheStatus -> m (Maybe EpochBlockDiff)
3737
readEpochBlockDiffFromCache cache =
3838
case cache of
3939
NoCache -> pure Nothing
40-
ActiveCache ci -> do
40+
ActiveCache _ ci -> do
4141
cE <- liftIO $ readTVarIO (cEpoch ci)
4242
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
4343
(_, epochInternal) -> pure epochInternal
@@ -46,7 +46,7 @@ readLastMapEpochFromCache :: CacheStatus -> IO (Maybe DB.Epoch)
4646
readLastMapEpochFromCache cache =
4747
case cache of
4848
NoCache -> pure Nothing
49-
ActiveCache ci -> do
49+
ActiveCache _ ci -> do
5050
cE <- readTVarIO (cEpoch ci)
5151
let mapEpoch = ceMapEpoch cE
5252
-- making sure db sync wasn't restarted on the last block in epoch
@@ -72,7 +72,7 @@ writeEpochBlockDiffToCache ::
7272
writeEpochBlockDiffToCache cache epCurrent =
7373
case cache of
7474
NoCache -> pure $ Left $ SNErrDefault "writeEpochBlockDiffToCache: Cache is NoCache"
75-
ActiveCache ci -> do
75+
ActiveCache _ ci -> do
7676
cE <- liftIO $ readTVarIO (cEpoch ci)
7777
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
7878
(epochLatest, _) -> writeToCache ci (CacheEpoch epochLatest (Just epCurrent))
@@ -94,7 +94,7 @@ writeToMapEpochCache syncEnv cache latestEpoch = do
9494
NoLedger nle -> getSecurityParameter $ nleProtocolInfo nle
9595
case cache of
9696
NoCache -> pure $ Left $ SNErrDefault "writeToMapEpochCache: Cache is NoCache"
97-
ActiveCache ci -> do
97+
ActiveCache _ ci -> do
9898
-- get EpochBlockDiff so we can use the BlockId we stored when inserting blocks
9999
epochInternalCE <- readEpochBlockDiffFromCache cache
100100
case epochInternalCE of

0 commit comments

Comments
 (0)