Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ migrateBootstrapUTxO syncEnv = do
where
trce = getTrace syncEnv

storeUTxOFromLedger :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> ExtLedgerState CardanoBlock -> ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storeUTxOFromLedger ::
(MonadBaseControl IO m, MonadIO m) =>
SyncEnv ->
ExtLedgerState CardanoBlock ->
ExceptT SyncNodeError (ReaderT SqlBackend m) ()
storeUTxOFromLedger env st = case ledgerState st of
LedgerStateBabbage bts -> storeUTxO env (getUTxO bts)
LedgerStateConway stc -> storeUTxO env (getUTxO stc)
Expand Down
86 changes: 58 additions & 28 deletions cardano-db-sync/src/Cardano/DbSync/Cache.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Cardano.DbSync.Cache (
queryStakeAddrWithCache,
queryTxIdWithCache,
rollbackCache,
optimiseCaches,
tryUpdateCacheTx,

-- * CacheStatistics
Expand Down Expand Up @@ -73,18 +74,37 @@ import Ouroboros.Consensus.Cardano.Block (StandardCrypto)
-- NOTE: Other tables are not cleaned up since they are not rollbacked.
rollbackCache :: MonadIO m => CacheStatus -> DB.BlockId -> ReaderT SqlBackend m ()
rollbackCache NoCache _ = pure ()
rollbackCache (ActiveCache cache) blockId = do
rollbackCache (ActiveCache _ cache) blockId = do
liftIO $ do
atomically $ writeTVar (cPrevBlock cache) Nothing
atomically $ modifyTVar (cDatum cache) LRU.cleanup
atomically $ modifyTVar (cTxIds cache) FIFO.cleanupCache
void $ rollbackMapEpochInCache cache blockId

-- | Once syncing comes within two minutes of the chain tip, shrink or drop the
-- caches that mainly pay off during bulk sync, and flag the cache as optimised
-- so this work is performed at most once.
optimiseCaches :: MonadIO m => CacheStatus -> ReaderT SqlBackend m CacheStatus
optimiseCaches NoCache = pure NoCache
optimiseCaches st@(ActiveCache True _) = pure st
optimiseCaches (ActiveCache False ci) = do
  liftIO $ do
    -- These caches stop being useful near the tip; drop their contents.
    atomically $ modifyTVar (cTxIds ci) FIFO.cleanupCache
    atomically $ writeTVar (cStake ci) (StakeCache Map.empty (LRU.empty 0))
    atomically $ modifyTVar (cDatum ci) (LRU.optimise 0)
    -- Empty the multi-asset cache, then keep it around at a reduced capacity.
    atomically $ writeTVar (cMultiAssets ci) (LRU.empty 50000)
    -- cPools, cPrevBlock, cStats and cEpoch are intentionally left untouched.
  pure $ ActiveCache True ci

getCacheStatistics :: CacheStatus -> IO CacheStatistics
getCacheStatistics cs =
case cs of
NoCache -> pure initCacheStatistics
ActiveCache ci -> readTVarIO (cStats ci)
ActiveCache _ ci -> readTVarIO (cStats ci)

queryOrInsertRewardAccount ::
(MonadBaseControl IO m, MonadIO m) =>
Expand Down Expand Up @@ -150,9 +170,9 @@ queryStakeAddrWithCacheRetBs ::
queryStakeAddrWithCacheRetBs _trce cache cacheUA ra@(Ledger.RewardAccount _ cred) = do
let bs = Ledger.serialiseRewardAccount ra
case cache of
NoCache -> do
mapLeft (,bs) <$> resolveStakeAddress bs
ActiveCache ci -> do
NoCache -> mapLeft (,bs) <$> resolveStakeAddress bs
ActiveCache True _ -> mapLeft (,bs) <$> resolveStakeAddress bs
ActiveCache False ci -> do
stakeCache <- liftIO $ readTVarIO (cStake ci)
case queryStakeCache cred stakeCache of
Just (addrId, stakeCache') -> do
Expand Down Expand Up @@ -204,7 +224,7 @@ queryPoolKeyWithCache cache cacheUA hsh =
case mPhId of
Nothing -> pure $ Left (DB.DbLookupMessage "PoolKeyHash")
Just phId -> pure $ Right phId
ActiveCache ci -> do
ActiveCache _ ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
case Map.lookup hsh mp of
Just phId -> do
Expand Down Expand Up @@ -244,7 +264,7 @@ insertPoolKeyWithCache cache cacheUA pHash =
{ DB.poolHashHashRaw = Generic.unKeyHashRaw pHash
, DB.poolHashView = Generic.unKeyHashView pHash
}
ActiveCache ci -> do
ActiveCache _ ci -> do
mp <- liftIO $ readTVarIO (cPools ci)
case Map.lookup pHash mp of
Just phId -> do
Expand Down Expand Up @@ -306,11 +326,9 @@ queryMAWithCache ::
ReaderT SqlBackend m (Either (ByteString, ByteString) DB.MultiAssetId)
queryMAWithCache cache policyId asset =
case cache of
NoCache -> do
let !policyBs = Generic.unScriptHash $ policyID policyId
let !assetNameBs = Generic.unAssetName asset
maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs
ActiveCache ci -> do
NoCache -> queryDb
ActiveCache True _ -> queryDb
ActiveCache False ci -> do
mp <- liftIO $ readTVarIO (cMultiAssets ci)
case LRU.lookup (policyId, asset) mp of
Just (maId, mp') -> do
Expand All @@ -326,6 +344,11 @@ queryMAWithCache cache policyId asset =
whenRight maId $
liftIO . atomically . modifyTVar (cMultiAssets ci) . LRU.insert (policyId, asset)
pure maId
where
queryDb = do
let !policyBs = Generic.unScriptHash $ policyID policyId
let !assetNameBs = Generic.unAssetName asset
maybe (Left (policyBs, assetNameBs)) Right <$> DB.queryMultiAssetId policyBs assetNameBs

queryPrevBlockWithCache ::
MonadIO m =>
Expand All @@ -336,7 +359,7 @@ queryPrevBlockWithCache ::
queryPrevBlockWithCache msg cache hsh =
case cache of
NoCache -> liftLookupFail msg $ DB.queryBlockId hsh
ActiveCache ci -> do
ActiveCache _ ci -> do
mCachedPrev <- liftIO $ readTVarIO (cPrevBlock ci)
case mCachedPrev of
-- if the cached block matches the requested hash, we return its db id.
Expand Down Expand Up @@ -364,8 +387,9 @@ queryTxIdWithCache ::
queryTxIdWithCache cache txIdLedger = do
case cache of
-- Direct database query if no cache.
NoCache -> DB.queryTxId txHash
ActiveCache cacheInternal -> do
NoCache -> qTxHash
ActiveCache True _ -> qTxHash
ActiveCache False cacheInternal -> do
-- Read current cache state.
cacheTx <- liftIO $ readTVarIO (cTxIds cacheInternal)

Expand All @@ -376,7 +400,7 @@ queryTxIdWithCache cache txIdLedger = do
pure $ Right txId
-- Cache miss.
Nothing -> do
eTxId <- DB.queryTxId txHash
eTxId <- qTxHash
liftIO $ missTxIds (cStats cacheInternal)
case eTxId of
Right txId -> do
Expand All @@ -388,18 +412,17 @@ queryTxIdWithCache cache txIdLedger = do
Left _ -> pure $ Left $ DB.DbLookupTxHash txHash
where
txHash = Generic.unTxHash txIdLedger
qTxHash = DB.queryTxId txHash

tryUpdateCacheTx ::
MonadIO m =>
CacheStatus ->
Ledger.TxId StandardCrypto ->
DB.TxId ->
m ()
tryUpdateCacheTx cache ledgerTxId txId = do
case cache of
NoCache -> pure ()
ActiveCache ci -> do
liftIO $ atomically $ modifyTVar (cTxIds ci) $ FIFO.insert ledgerTxId txId
tryUpdateCacheTx (ActiveCache False ci) ledgerTxId txId =
liftIO $ atomically $ modifyTVar (cTxIds ci) $ FIFO.insert ledgerTxId txId
tryUpdateCacheTx _ _ _ = pure ()

insertBlockAndCache ::
(MonadIO m, MonadBaseControl IO m) =>
Expand All @@ -408,13 +431,16 @@ insertBlockAndCache ::
ReaderT SqlBackend m DB.BlockId
insertBlockAndCache cache block =
case cache of
NoCache -> DB.insertBlock block
ActiveCache ci -> do
bid <- DB.insertBlock block
NoCache -> insBlck
ActiveCache True _ -> insBlck
ActiveCache False ci -> do
bid <- insBlck
liftIO $ do
missPrevBlock (cStats ci)
atomically $ writeTVar (cPrevBlock ci) $ Just (bid, DB.blockHash block)
pure bid
where
insBlck = DB.insertBlock block

queryDatum ::
MonadIO m =>
Expand All @@ -423,8 +449,9 @@ queryDatum ::
ReaderT SqlBackend m (Maybe DB.DatumId)
queryDatum cache hsh = do
case cache of
NoCache -> DB.queryDatum $ Generic.dataHashToBytes hsh
ActiveCache ci -> do
NoCache -> queryDtm
ActiveCache True _ -> queryDtm
ActiveCache False ci -> do
mp <- liftIO $ readTVarIO (cDatum ci)
case LRU.lookup hsh mp of
Just (datumId, mp') -> do
Expand All @@ -434,7 +461,9 @@ queryDatum cache hsh = do
Nothing -> do
liftIO $ missDatum (cStats ci)
-- miss. The lookup doesn't change the cache on a miss.
DB.queryDatum $ Generic.dataHashToBytes hsh
queryDtm
where
queryDtm = DB.queryDatum $ Generic.dataHashToBytes hsh

-- This assumes the entry is not cached.
insertDatumAndCache ::
Expand All @@ -447,7 +476,8 @@ insertDatumAndCache cache hsh dt = do
datumId <- DB.insertDatum dt
case cache of
NoCache -> pure datumId
ActiveCache ci -> do
ActiveCache True _ -> pure datumId
ActiveCache False ci -> do
liftIO $
atomically $
modifyTVar (cDatum ci) $
Expand Down
10 changes: 5 additions & 5 deletions cardano-db-sync/src/Cardano/DbSync/Cache/Epoch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ readCacheEpoch :: MonadIO m => CacheStatus -> m (Maybe CacheEpoch)
readCacheEpoch cache =
case cache of
NoCache -> pure Nothing
ActiveCache ci -> do
ActiveCache _ ci -> do
cacheEpoch <- liftIO $ readTVarIO (cEpoch ci)
pure $ Just cacheEpoch

readEpochBlockDiffFromCache :: MonadIO m => CacheStatus -> m (Maybe EpochBlockDiff)
readEpochBlockDiffFromCache cache =
case cache of
NoCache -> pure Nothing
ActiveCache ci -> do
ActiveCache _ ci -> do
cE <- liftIO $ readTVarIO (cEpoch ci)
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
(_, epochInternal) -> pure epochInternal
Expand All @@ -46,7 +46,7 @@ readLastMapEpochFromCache :: CacheStatus -> IO (Maybe DB.Epoch)
readLastMapEpochFromCache cache =
case cache of
NoCache -> pure Nothing
ActiveCache ci -> do
ActiveCache _ ci -> do
cE <- readTVarIO (cEpoch ci)
let mapEpoch = ceMapEpoch cE
-- making sure db sync wasn't restarted on the last block in epoch
Expand All @@ -72,7 +72,7 @@ writeEpochBlockDiffToCache ::
writeEpochBlockDiffToCache cache epCurrent =
case cache of
NoCache -> pure $ Left $ SNErrDefault "writeEpochBlockDiffToCache: Cache is NoCache"
ActiveCache ci -> do
ActiveCache _ ci -> do
cE <- liftIO $ readTVarIO (cEpoch ci)
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
(epochLatest, _) -> writeToCache ci (CacheEpoch epochLatest (Just epCurrent))
Expand All @@ -94,7 +94,7 @@ writeToMapEpochCache syncEnv cache latestEpoch = do
NoLedger nle -> getSecurityParameter $ nleProtocolInfo nle
case cache of
NoCache -> pure $ Left $ SNErrDefault "writeToMapEpochCache: Cache is NoCache"
ActiveCache ci -> do
ActiveCache _ ci -> do
-- get EpochBlockDiff so we can use the BlockId we stored when inserting blocks
epochInternalCE <- readEpochBlockDiffFromCache cache
case epochInternalCE of
Expand Down
9 changes: 9 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Cache/LRU.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Cardano.DbSync.Cache.LRU (
LRUCache (..),
empty,
cleanup,
optimise,
trim,
insert,
fromList,
Expand Down Expand Up @@ -46,6 +47,14 @@ cleanup cache =
, cQueue = OrdPSQ.empty
}

-- | Re-purpose an existing cache with a new capacity: every queued entry is
-- discarded and the logical clock is reset, as in 'cleanup', but the capacity
-- is also replaced (e.g. @optimise 0@ effectively disables the cache).
optimise :: Word64 -> LRUCache k v -> LRUCache k v
optimise newCapacity oldCache =
  oldCache
    { cQueue = OrdPSQ.empty
    , cTick = 0
    , cCapacity = newCapacity
    }

-- trim ensures the cache size does not exceed its capacity.
-- It removes the least recently used item if the cache is over capacity.
trim :: Ord k => LRUCache k v -> LRUCache k v
Expand Down
12 changes: 8 additions & 4 deletions cardano-db-sync/src/Cardano/DbSync/Cache/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ data StakeCache = StakeCache
, scLruCache :: !(LRUCache StakeCred DB.StakeAddressId)
}

-- 'CacheStatus' enables functions in this module to be called even if the cache has not been initialized.
-- | 'CacheStatus' enables functions in this module to be called even if the cache has not been initialized.
-- This is used during genesis insertions, where the cache is not yet initiated, and when the user has disabled the cache functionality.
data CacheStatus
= NoCache
| ActiveCache !CacheInternal
| ActiveCache
!Bool
-- ^ True once syncing has come close to the tip of the chain and the caches have been optimised.
!CacheInternal

data CacheAction
= UpdateCache
Expand Down Expand Up @@ -128,7 +131,7 @@ data CacheEpoch = CacheEpoch

textShowStats :: CacheStatus -> IO Text
textShowStats NoCache = pure "NoCache"
textShowStats (ActiveCache ic) = do
textShowStats (ActiveCache isCacheOptimised ic) = do
stats <- readTVarIO $ cStats ic
stakeHashRaws <- readTVarIO (cStake ic)
pools <- readTVarIO (cPools ic)
Expand All @@ -138,6 +141,7 @@ textShowStats (ActiveCache ic) = do
pure $
mconcat
[ "\nCache Statistics:"
, "\n Caches Optimised: " <> textShow isCacheOptimised
, "\n Stake Addresses: "
, "cache sizes: "
, textShow (Map.size $ scStableCache stakeHashRaws)
Expand Down Expand Up @@ -220,7 +224,7 @@ newEmptyCache CacheCapacity {..} = liftIO $ do
cEpoch <- newTVarIO initCacheEpoch
cTxIds <- newTVarIO (FIFO.empty cacheCapacityTx)

pure . ActiveCache $
pure . ActiveCache False $
CacheInternal
{ cStake = cStake
, cPools = cPools
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ deleteReward trce nw cache epochNo (cred, rwd) = do
where_ (rwdDb ^. Db.RewardType ==. val (Generic.rewardSource rwd))
where_ (rwdDb ^. Db.RewardSpendableEpoch ==. val (unEpochNo epochNo))
where_ (rwdDb ^. Db.RewardPoolId ==. val poolId)
_ -> pure ()
_otherwise -> pure ()

deleteOrphanedRewards :: MonadIO m => EpochNo -> [Db.StakeAddressId] -> ReaderT SqlBackend m ()
deleteOrphanedRewards (EpochNo epochNo) xs =
Expand Down
11 changes: 7 additions & 4 deletions cardano-db-sync/src/Cardano/DbSync/Era/Universal/Block.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Cardano.DbSync.Api
import Cardano.DbSync.Api.Types (InsertOptions (..), SyncEnv (..), SyncOptions (..))
import Cardano.DbSync.Cache (
insertBlockAndCache,
optimiseCaches,
queryPoolKeyWithCache,
queryPrevBlockWithCache,
)
Expand Down Expand Up @@ -64,16 +65,18 @@ insertBlockUniversal ::
ApplyResult ->
ReaderT SqlBackend m (Either SyncNodeError ())
insertBlockUniversal syncEnv shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do
-- if we're syncing within 2 mins of the tip, we optimise the caches.
newCache <- if isSyncedWithintwoMinutes details then optimiseCaches cache else pure cache
runExceptT $ do
pbid <- case Generic.blkPreviousHash blk of
Nothing -> liftLookupFail (renderErrorMessage (Generic.blkEra blk)) DB.queryGenesis -- this is for networks that fork from Byron on epoch 0.
Just pHash -> queryPrevBlockWithCache (renderErrorMessage (Generic.blkEra blk)) cache pHash
mPhid <- lift $ queryPoolKeyWithCache cache UpdateCache $ coerceKeyRole $ Generic.blkSlotLeader blk
Just pHash -> queryPrevBlockWithCache (renderErrorMessage (Generic.blkEra blk)) newCache pHash
mPhid <- lift $ queryPoolKeyWithCache newCache UpdateCache $ coerceKeyRole $ Generic.blkSlotLeader blk
let epochNo = sdEpochNo details

slid <- lift . DB.insertSlotLeader $ Generic.mkSlotLeader (ioShelley iopts) (Generic.unKeyHashRaw $ Generic.blkSlotLeader blk) (eitherToMaybe mPhid)
blkId <-
lift . insertBlockAndCache cache $
lift . insertBlockAndCache newCache $
DB.Block
{ DB.blockHash = Generic.blkHash blk
, DB.blockEpochNo = Just $ unEpochNo epochNo
Expand Down Expand Up @@ -104,7 +107,7 @@ insertBlockUniversal syncEnv shouldLog withinTwoMins withinHalfHour blk details
when (soptEpochAndCacheEnabled $ envOptions syncEnv)
. newExceptT
$ writeEpochBlockDiffToCache
cache
newCache
EpochBlockDiff
{ ebdBlockId = blkId
, ebdTime = sdSlotTime details
Expand Down
6 changes: 5 additions & 1 deletion cardano-db-sync/src/Cardano/DbSync/Fix/EpochStake.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import qualified Data.Map.Strict as Map
import qualified Data.Strict.Maybe as Strict
import Database.Persist.Sql (SqlBackend)

migrateStakeDistr :: (MonadIO m, MonadBaseControl IO m) => SyncEnv -> Strict.Maybe CardanoLedgerState -> ExceptT SyncNodeError (ReaderT SqlBackend m) Bool
migrateStakeDistr ::
(MonadIO m, MonadBaseControl IO m) =>
SyncEnv ->
Strict.Maybe CardanoLedgerState ->
ExceptT SyncNodeError (ReaderT SqlBackend m) Bool
migrateStakeDistr env mcls =
case (envLedgerEnv env, mcls) of
(HasLedger lenv, Strict.Just cls) -> do
Expand Down
Loading