Skip to content

Commit b5e867f

Browse files
Cmdvkderme
authored andcommitted
add LRU cache to stake address select & inserts
1 parent 47421f1 commit b5e867f

File tree

18 files changed

+286
-203
lines changed

18 files changed

+286
-203
lines changed

cardano-db-sync/src/Cardano/DbSync/Api.hs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ import Ouroboros.Consensus.Protocol.Abstract (ConsensusProtocol)
9797
import Ouroboros.Network.Block (BlockNo (..), Point (..))
9898
import Ouroboros.Network.Magic (NetworkMagic (..))
9999
import qualified Ouroboros.Network.Point as Point
100+
import Cardano.DbSync.Cache.LRU (LRUCacheCapacity(..))
100101

101102
setConsistentLevel :: SyncEnv -> ConsistentLevel -> IO ()
102103
setConsistentLevel env cst = do
@@ -382,7 +383,15 @@ mkSyncEnv ::
382383
IO SyncEnv
383384
mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemStart syncNodeConfigFromFile syncNP ranMigrations runMigrationFnc = do
384385
dbCNamesVar <- newTVarIO =<< dbConstraintNamesExists backend
385-
cache <- if soptCache syncOptions then newEmptyCache 250000 50000 else pure useNoCache
386+
cache <-
387+
if soptCache syncOptions
388+
then newEmptyCache
389+
LRUCacheCapacity
390+
{ lirCapacityStakeHashRaw = 300000,
391+
lruCapacityDatum = 250000,
392+
lruCapacityMultiAsset = 50000
393+
}
394+
else pure cacheStatusNoCache
386395
consistentLevelVar <- newTVarIO Unchecked
387396
fixDataVar <- newTVarIO $ if ranMigrations then DataFixRan else NoneFixRan
388397
indexesVar <- newTVarIO $ enpForceIndexes syncNP

cardano-db-sync/src/Cardano/DbSync/Cache.hs

Lines changed: 80 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -83,106 +83,139 @@ getCacheStatistics cs =
8383

8484
queryOrInsertRewardAccount ::
8585
(MonadBaseControl IO m, MonadIO m) =>
86+
Trace IO Text ->
8687
CacheStatus ->
8788
CacheUpdateAction ->
8889
Ledger.RewardAccount StandardCrypto ->
8990
ReaderT SqlBackend m DB.StakeAddressId
90-
queryOrInsertRewardAccount cacheStatus cacheUA rewardAddr = do
91-
eiAddrId <- queryRewardAccountWithCacheRetBs cacheStatus cacheUA rewardAddr
91+
queryOrInsertRewardAccount trce cacheStatus cacheUA rewardAddr = do
92+
eiAddrId <- queryRewardAccountWithCacheRetBs trce cacheStatus rewardAddr
9293
case eiAddrId of
93-
Left (_err, bs) -> insertStakeAddress rewardAddr (Just bs)
94+
Left (_err, bs) -> insertStakeAddress cacheStatus rewardAddr (Just bs)
9495
Right addrId -> pure addrId
9596

9697
queryOrInsertStakeAddress ::
9798
(MonadBaseControl IO m, MonadIO m) =>
99+
Trace IO Text ->
98100
CacheStatus ->
99101
CacheUpdateAction ->
100102
Network ->
101103
StakeCred ->
102104
ReaderT SqlBackend m DB.StakeAddressId
103-
queryOrInsertStakeAddress cacheStatus cacheUA nw cred =
104-
queryOrInsertRewardAccount cacheStatus cacheUA $ Ledger.RewardAccount nw cred
105+
queryOrInsertStakeAddress trce cacheStatus cacheUA nw cred =
106+
queryOrInsertRewardAccount trce cacheStatus cacheUA $ Ledger.RewardAccount nw cred
105107

106108
-- If the address already exists in the table, it will not be inserted again (due to
107109
-- the uniqueness constraint) but the function will return the 'StakeAddressId'.
108110
insertStakeAddress ::
109111
(MonadBaseControl IO m, MonadIO m) =>
112+
CacheStatus ->
110113
Ledger.RewardAccount StandardCrypto ->
111114
Maybe ByteString ->
112115
ReaderT SqlBackend m DB.StakeAddressId
113-
insertStakeAddress rewardAddr stakeCredBs =
114-
DB.insertStakeAddress $
115-
DB.StakeAddress
116-
{ DB.stakeAddressHashRaw = addrBs
117-
, DB.stakeAddressView = Generic.renderRewardAccount rewardAddr
118-
, DB.stakeAddressScriptHash = Generic.getCredentialScriptHash $ Ledger.raCredential rewardAddr
119-
}
116+
insertStakeAddress cacheStatus rewardAddr stakeCredBs = do
117+
addrId <- DB.insertStakeAddress $
118+
DB.StakeAddress
119+
{ DB.stakeAddressHashRaw = addrBs
120+
, DB.stakeAddressView = Generic.renderRewardAccount rewardAddr
121+
, DB.stakeAddressScriptHash = Generic.getCredentialScriptHash $ Ledger.raCredential rewardAddr
122+
}
123+
case cacheStatus of
124+
NoCache -> pure addrId
125+
CacheActive ci -> do
126+
liftIO $ atomically $ modifyTVar (cStakeRawHashes ci) $
127+
LRU.insert addrBs addrId
128+
pure addrId
120129
where
121130
addrBs = fromMaybe (Ledger.serialiseRewardAccount rewardAddr) stakeCredBs
122131

123132
queryRewardAccountWithCacheRetBs ::
124133
forall m.
125134
MonadIO m =>
135+
Trace IO Text ->
126136
CacheStatus ->
127137
CacheUpdateAction ->
128138
Ledger.RewardAccount StandardCrypto ->
129139
ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId)
130-
queryRewardAccountWithCacheRetBs cacheStatus cacheUA rwdAcc =
131-
queryStakeAddrWithCacheRetBs cacheStatus cacheUA (Ledger.raNetwork rwdAcc) (Ledger.raCredential rwdAcc)
140+
queryRewardAccountWithCacheRetBs trce cacheStatus cacheUA rwdAcc =
141+
queryStakeAddrWithCacheRetBs trce cacheStatus cacheUA (Ledger.raNetwork rwdAcc) (Ledger.raCredential rwdAcc)
132142

133143
queryStakeAddrWithCache ::
134144
forall m.
135145
MonadIO m =>
146+
Trace IO Text ->
136147
CacheStatus ->
137148
CacheUpdateAction ->
138149
Network ->
139150
StakeCred ->
140151
ReaderT SqlBackend m (Either DB.LookupFail DB.StakeAddressId)
141-
queryStakeAddrWithCache cacheStatus cacheUA nw cred =
142-
mapLeft fst <$> queryStakeAddrWithCacheRetBs cacheStatus cacheUA nw cred
152+
queryStakeAddrWithCache trce cacheStatus cacheUA nw cred =
153+
mapLeft fst <$> queryStakeAddrWithCacheRetBs trce cacheStatus cacheUA nw cred
143154

144155
queryStakeAddrWithCacheRetBs ::
145156
forall m.
146157
MonadIO m =>
158+
Trace IO Text ->
147159
CacheStatus ->
148160
CacheUpdateAction ->
149161
Network ->
150162
StakeCred ->
151163
ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId)
152-
queryStakeAddrWithCacheRetBs cacheStatus cacheUA nw cred = do
164+
queryStakeAddrWithCacheRetBs trce cacheStatus cacheUA nw cred = do
165+
let !bs = Ledger.serialiseRewardAccount (Ledger.RewardAccount nw cred)
153166
case cacheStatus of
154167
NoCache -> do
155-
let !bs = Ledger.serialiseRewardAccount (Ledger.RewardAccount nw cred)
156168
mapLeft (,bs) <$> queryStakeAddress bs
157-
ActiveCache ci -> do
158-
mp <- liftIO $ readTVarIO (cStakeCreds ci)
159-
(mAddrId, mp') <- queryStakeAddrAux cacheUA mp (cStats ci) nw cred
160-
liftIO $ atomically $ writeTVar (cStakeCreds ci) mp'
161-
pure mAddrId
169+
CacheActive ci -> do
170+
currentCache <- liftIO $ readTVarIO (cStakeRawHashes ci)
171+
let cacheSize = LRU.getSize currentCache
172+
newCache <-
173+
if cacheSize < 1
174+
then do
175+
liftIO $ logInfo trce "----------------- Cache is empty. Querying all addresses. ---------"
176+
queryRes <- DB.queryLatestAddresses cacheSize
177+
pure $ LRU.fromList queryRes currentCache
178+
-- convert the results into the cache
179+
else pure currentCache
180+
case LRU.lookup bs newCache of
181+
Just (addrId, mp') -> do
182+
liftIO $ hitCreds (cStats ci)
183+
liftIO $ atomically $ writeTVar (cStakeRawHashes ci) mp'
184+
pure $ Right addrId
185+
Nothing -> do
186+
liftIO $ missCreds (cStats ci)
187+
liftIO $ atomically $ writeTVar (cStakeRawHashes ci) newCache
188+
queryRes <- mapLeft (,bs) <$> queryStakeAddress bs
189+
case queryRes of
190+
Left _ -> pure queryRes
191+
Right stakeAddrsId -> do
192+
liftIO $ atomically $ modifyTVar (cStakeRawHashes ci) $
193+
LRU.insert bs stakeAddrsId
194+
pure $ Right stakeAddrsId
162195

163-
queryStakeAddrAux ::
164-
MonadIO m =>
165-
CacheUpdateAction ->
166-
StakeAddrCache ->
167-
StrictTVar IO CacheStatistics ->
168-
Network ->
169-
StakeCred ->
170-
ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId, StakeAddrCache)
171-
queryStakeAddrAux cacheUA mp sts nw cred =
172-
case Map.lookup cred mp of
173-
Just addrId -> do
174-
liftIO $ hitCreds sts
175-
case cacheUA of
176-
EvictAndUpdateCache -> pure (Right addrId, Map.delete cred mp)
177-
_ -> pure (Right addrId, mp)
178-
Nothing -> do
179-
liftIO $ missCreds sts
180-
let !bs = Ledger.serialiseRewardAccount (Ledger.RewardAccount nw cred)
181-
mAddrId <- mapLeft (,bs) <$> queryStakeAddress bs
182-
case (mAddrId, cacheUA) of
183-
(Right addrId, UpdateCache) -> pure (Right addrId, Map.insert cred addrId mp)
184-
(Right addrId, _) -> pure (Right addrId, mp)
185-
(err, _) -> pure (err, mp)
196+
-- queryStakeAddrAux ::
197+
-- MonadIO m =>
198+
-- CacheNew ->
199+
-- StakeAddrCache ->
200+
-- StrictTVar IO CacheStatistics ->
201+
-- Network ->
202+
-- StakeCred ->
203+
-- ReaderT SqlBackend m (Either (DB.LookupFail, ByteString) DB.StakeAddressId, StakeAddrCache)
204+
-- queryStakeAddrAux cacheNew mp sts nw cred =
205+
-- case Map.lookup cred mp of
206+
-- Just addrId -> do
207+
-- liftIO $ hitCreds sts
208+
-- case cacheNew of
209+
-- EvictAndReturn -> pure (Right addrId, Map.delete cred mp)
210+
-- _ -> pure (Right addrId, mp)
211+
-- Nothing -> do
212+
-- liftIO $ missCreds sts
213+
-- let !bs = Ledger.serialiseRewardAccount (Ledger.RewardAccount nw cred)
214+
-- mAddrId <- mapLeft (,bs) <$> queryStakeAddress bs
215+
-- case (mAddrId, cacheNew) of
216+
-- (Right addrId, CacheNew) -> pure (Right addrId, Map.insert cred addrId mp)
217+
-- (Right addrId, _) -> pure (Right addrId, mp)
218+
-- (err, _) -> pure (err, mp)
186219

187220
queryPoolKeyWithCache ::
188221
MonadIO m =>
@@ -372,7 +405,7 @@ queryDatum ::
372405
queryDatum cacheStatus hsh = do
373406
case cacheStatus of
374407
NoCache -> DB.queryDatum $ Generic.dataHashToBytes hsh
375-
ActiveCache ci -> do
408+
CacheActive ci -> do
376409
mp <- liftIO $ readTVarIO (cDatum ci)
377410
case LRU.lookup hsh mp of
378411
Just (datumId, mp') -> do

cardano-db-sync/src/Cardano/DbSync/Cache/Epoch.hs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,27 @@ import Database.Persist.Postgresql (SqlBackend)
2626
-- Epoch Cache
2727
-------------------------------------------------------------------------------------
2828
readCacheEpoch :: MonadIO m => CacheStatus -> m (Maybe CacheEpoch)
29-
readCacheEpoch cache =
30-
case cache of
29+
readCacheEpoch cacheStatus =
30+
case cacheStatus of
3131
NoCache -> pure Nothing
3232
ActiveCache ci -> do
3333
cacheEpoch <- liftIO $ readTVarIO (cEpoch ci)
3434
pure $ Just cacheEpoch
3535

3636
readEpochBlockDiffFromCache :: MonadIO m => CacheStatus -> m (Maybe EpochBlockDiff)
37-
readEpochBlockDiffFromCache cache =
38-
case cache of
37+
38+
readEpochBlockDiffFromCache cacheStatus =
39+
case cacheStatus of
3940
NoCache -> pure Nothing
4041
ActiveCache ci -> do
4142
cE <- liftIO $ readTVarIO (cEpoch ci)
4243
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
4344
(_, epochInternal) -> pure epochInternal
4445

4546
readLastMapEpochFromCache :: CacheStatus -> IO (Maybe DB.Epoch)
46-
readLastMapEpochFromCache cache =
47-
case cache of
47+
48+
readLastMapEpochFromCache cacheStatus =
49+
case cacheStatus of
4850
NoCache -> pure Nothing
4951
ActiveCache ci -> do
5052
cE <- readTVarIO (cEpoch ci)
@@ -57,22 +59,23 @@ readLastMapEpochFromCache cache =
5759
Just (_, ep) -> pure $ Just ep
5860

5961
rollbackMapEpochInCache :: MonadIO m => CacheInternal -> DB.BlockId -> m (Either SyncNodeError ())
60-
rollbackMapEpochInCache cache blockId = do
61-
cE <- liftIO $ readTVarIO (cEpoch cache)
62+
rollbackMapEpochInCache cacheInternal blockId = do
63+
cE <- liftIO $ readTVarIO (cEpoch cacheInternal)
6264
-- split the map and delete anything after blockId including it self as new blockId might be
6365
-- given when inserting the block again when doing rollbacks.
6466
let (newMapEpoch, _) = split blockId (ceMapEpoch cE)
65-
writeToCache cache (CacheEpoch newMapEpoch (ceEpochBlockDiff cE))
67+
writeToCache cacheInternal (CacheEpoch newMapEpoch (ceEpochBlockDiff cE))
6668

6769
writeEpochBlockDiffToCache ::
6870
MonadIO m =>
6971
CacheStatus ->
7072
EpochBlockDiff ->
7173
ReaderT SqlBackend m (Either SyncNodeError ())
72-
writeEpochBlockDiffToCache cache epCurrent =
73-
case cache of
74-
NoCache -> pure $ Left $ SNErrDefault "writeEpochBlockDiffToCache: CacheStatus is NoCache"
75-
ActiveCache ci -> do
74+
75+
writeEpochBlockDiffToCache cacheStatus epCurrent =
76+
case cacheStatus of
77+
NoCache -> pure $ Left $ SNErrDefault "writeEpochBlockDiffToCache: Cache is NoCache"
78+
CacheActive ci -> do
7679
cE <- liftIO $ readTVarIO (cEpoch ci)
7780
case (ceMapEpoch cE, ceEpochBlockDiff cE) of
7881
(epochLatest, _) -> writeToCache ci (CacheEpoch epochLatest (Just epCurrent))
@@ -86,17 +89,17 @@ writeToMapEpochCache ::
8689
CacheStatus ->
8790
DB.Epoch ->
8891
ReaderT SqlBackend m (Either SyncNodeError ())
89-
writeToMapEpochCache syncEnv cache latestEpoch = do
92+
writeToMapEpochCache syncEnv cacheStatus latestEpoch = do
9093
-- this can also be tought of as max rollback number
9194
let securityParam =
9295
case envLedgerEnv syncEnv of
9396
HasLedger hle -> getSecurityParameter $ leProtocolInfo hle
9497
NoLedger nle -> getSecurityParameter $ nleProtocolInfo nle
95-
case cache of
96-
NoCache -> pure $ Left $ SNErrDefault "writeToMapEpochCache: CacheStatus is NoCache"
98+
case cacheStatus of
99+
NoCache -> pure $ Left $ SNErrDefault "writeToMapEpochCache: Cache is NoCache"
97100
ActiveCache ci -> do
98101
-- get EpochBlockDiff so we can use the BlockId we stored when inserting blocks
99-
epochInternalCE <- readEpochBlockDiffFromCache cache
102+
epochInternalCE <- readEpochBlockDiffFromCache cacheStatus
100103
case epochInternalCE of
101104
Nothing -> pure $ Left $ SNErrDefault "writeToMapEpochCache: No epochInternalEpochCache"
102105
Just ei -> do

0 commit comments

Comments
 (0)