Skip to content

Commit de74a96

Browse files
committed
add envDbIsolationState for when syncing and following
1 parent 5400ffc commit de74a96

File tree

10 files changed

+50
-19
lines changed

10 files changed

+50
-19
lines changed

cardano-db-sync/src/Cardano/DbSync.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ runDbSync metricsSetters iomgr trce params syncNodeConfigFromFile abortOnPanic =
144144
whenJust (enpMaybeRollback params) $ \slotNo ->
145145
void $ unsafeRollback trce (txOutConfigToTableType txOutConfig) pgConfig slotNo
146146

147-
-- This runMigration is ONLY for delayed migrations during sync (like indexes)
147+
-- These migrations will be ran when near the tip of the chain eg: indexes.
148148
let runNearTipMigration mode = do
149149
msg <- DB.getMaintenancePsqlConf pgConfig
150150
logInfo trce $ "Running NearTip database migrations in mode " <> textShow mode

cardano-db-sync/src/Cardano/DbSync/Api.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ mkSyncEnv metricSetters trce dbEnv syncOptions protoInfo nw nwMagic systemStart
339339
oarq <- newTBQueueIO 1000
340340
epochVar <- newTVarIO initCurrentEpochNo
341341
epochStatistics <- initEpochStatistics
342+
dbIsolationStateVar <- newTVarIO DB.SyncLagging -- For database transaction isolation optimisation
342343
ledgerEnvType <-
343344
case (enpMaybeLedgerStateDir syncNP, hasLedger' syncNodeConfigFromFile) of
344345
(Just dir, True) ->
@@ -370,6 +371,7 @@ mkSyncEnv metricSetters trce dbEnv syncOptions protoInfo nw nwMagic systemStart
370371
, envDbConstraints = dbCNamesVar
371372
, envCurrentEpochNo = epochVar
372373
, envIndexes = indexesVar
374+
, envDbIsolationState = dbIsolationStateVar
373375
, envLedgerEnv = ledgerEnvType
374376
, envNetworkMagic = nwMagic
375377
, envOffChainPoolResultQueue = oprq

cardano-db-sync/src/Cardano/DbSync/Api/Types.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ data SyncEnv = SyncEnv
5050
, envCurrentEpochNo :: !(StrictTVar IO CurrentEpochNo)
5151
, envIndexes :: !(StrictTVar IO Bool)
5252
, envBootstrap :: !(StrictTVar IO Bool)
53+
, envDbIsolationState :: !(StrictTVar IO DB.SyncState)
5354
, envLedgerEnv :: !LedgerEnv
5455
, envNetworkMagic :: !NetworkMagic
5556
, envOffChainPoolResultQueue :: !(StrictTBQueue IO OffChainPoolResult)

cardano-db-sync/src/Cardano/DbSync/DbEvent.hs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
{-# LANGUAGE OverloadedStrings #-}
21
{-# LANGUAGE ScopedTypeVariables #-}
32
{-# LANGUAGE NoImplicitPrelude #-}
43

@@ -63,16 +62,18 @@ data ThreadChannels = ThreadChannels
6362
--
6463
-- This is the primary transaction runner for sequential database operations in db-sync.
6564
-- All operations within the ExceptT stack are executed atomically in one database transaction.
65+
-- Accepts an optional isolation level (Nothing uses RepeatableRead default).
6666
runDbSyncTransaction ::
6767
forall m a.
6868
(MonadUnliftIO m, HasCallStack) =>
6969
Trace IO Text ->
7070
DB.DbEnv ->
71+
Maybe DB.IsolationLevel ->
7172
ExceptT SyncNodeError DB.DbM a ->
7273
m (Either SyncNodeError a)
73-
runDbSyncTransaction tracer dbEnv exceptTAction = do
74+
runDbSyncTransaction tracer dbEnv mIsolationLevel exceptTAction = do
7475
-- Catch database exceptions and convert to Either
75-
eResult <- liftIO $ try $ DB.runDbTransLogged tracer dbEnv (runExceptT exceptTAction)
76+
eResult <- liftIO $ try $ DB.runDbTransLogged tracer dbEnv mIsolationLevel (runExceptT exceptTAction)
7677
case eResult of
7778
Left (dbErr :: DB.DbSessionError) -> do
7879
pure $ Left $ SNErrDbSessionErr mkSyncNodeCallStack dbErr
@@ -88,7 +89,7 @@ runDbSyncTransactionNoLogging ::
8889
m (Either SyncNodeError a)
8990
runDbSyncTransactionNoLogging dbEnv exceptTAction = do
9091
let dbAction = runExceptT exceptTAction
91-
eResult <- liftIO $ try $ DB.runDbTransSilent dbEnv dbAction
92+
eResult <- liftIO $ try $ DB.runDbTransSilent dbEnv Nothing dbAction
9293
case eResult of
9394
Left (dbErr :: DB.DbSessionError) -> do
9495
pure $ Left $ SNErrDbSessionErr mkSyncNodeCallStack dbErr
@@ -135,7 +136,7 @@ runDbSyncTransactionPool ::
135136
m (Either SyncNodeError a)
136137
runDbSyncTransactionPool tracer dbEnv exceptTAction = do
137138
let dbAction = runExceptT exceptTAction
138-
eResult <- liftIO $ try $ DB.runDbPoolTransLogged tracer dbEnv dbAction -- Use pool
139+
eResult <- liftIO $ try $ DB.runDbPoolTransLogged tracer dbEnv Nothing dbAction -- Use pool
139140
case eResult of
140141
Left (dbErr :: DB.DbSessionError) -> do
141142
pure $ Left $ SNErrDbSessionErr mkSyncNodeCallStack dbErr

cardano-db-sync/src/Cardano/DbSync/Default.hs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,17 @@ import Cardano.DbSync.Rollback
4646
import Cardano.DbSync.Types
4747
import Cardano.DbSync.Util
4848
import Cardano.DbSync.Util.Constraint (addConstraintsIfNotExist)
49+
import Control.Concurrent.Class.MonadSTM.Strict (readTVarIO)
4950

5051
insertListBlocks ::
5152
SyncEnv ->
5253
[CardanoBlock] ->
5354
IO (Either SyncNodeError ())
5455
insertListBlocks syncEnv blocks = do
56+
isolationLevel <- determineIsolationLevel syncEnv
5557
-- stop at the exact block number if the option is set
5658
case sioStopAtBlock $ dncInsertOptions $ envSyncNodeConfig syncEnv of
57-
Nothing -> runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) $ do
59+
Nothing -> runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) isolationLevel $ do
5860
traverse_ (applyAndInsertBlockMaybe syncEnv (getTrace syncEnv)) blocks
5961
Just targetBlock ->
6062
insertListBlocksWithStopCondition syncEnv blocks targetBlock
@@ -70,7 +72,8 @@ insertListBlocksWithStopCondition syncEnv blocks targetBlock = do
7072
-- Check if we hit the stop condition in this batch
7173
let hitStopCondition = any (\cblk -> unBlockNo (blockNo cblk) >= targetBlock) blocks
7274
-- Process the blocks in transaction
73-
result <- runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) $ do
75+
isolationLevel <- determineIsolationLevel syncEnv
76+
result <- runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) isolationLevel $ do
7477
traverse_ (applyAndInsertBlockMaybe syncEnv (getTrace syncEnv)) blocksToProcess
7578
-- If we hit the stop condition and transaction succeeded, shutdown
7679
case result of
@@ -243,6 +246,14 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
243246

244247
blkNo = headerFieldBlockNo $ getHeaderFields cblk
245248

249+
-- | Determine isolation level based on current sync state
250+
determineIsolationLevel :: SyncEnv -> IO (Maybe DB.IsolationLevel)
251+
determineIsolationLevel syncEnv = do
252+
syncState <- readTVarIO (envDbIsolationState syncEnv)
253+
pure $ case syncState of
254+
DB.SyncLagging -> Just DB.ReadCommitted -- Syncing: use ReadCommitted for performance
255+
DB.SyncFollowing -> Nothing -- Following: use default RepeatableRead for consistency
256+
246257
isWithinTwoMin :: SlotDetails -> Bool
247258
isWithinTwoMin sd = isSyncedWithinSeconds sd 120 == SyncFollowing
248259

cardano-db-sync/src/Cardano/DbSync/Era/Byron/Genesis.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ insertValidateByronGenesisDist ::
4545
Byron.Config ->
4646
ExceptT SyncNodeError IO ()
4747
insertValidateByronGenesisDist syncEnv (NetworkName networkName) cfg = do
48-
ExceptT $ runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) insertAction
48+
-- Genesis insertion is always syncing, use ReadCommitted for better performance
49+
ExceptT $ runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) (Just DB.ReadCommitted) insertAction
4950
where
5051
tracer = getTrace syncEnv
5152

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ module Cardano.DbSync.Era.Universal.Insert.Grouped (
1414
insertBlockGroupedData,
1515
insertReverseIndex,
1616
resolveTxInputs,
17+
resolveTxInputsBulk,
1718
resolveScriptHash,
1819
mkmaTxOuts,
1920
) where
2021

2122
import qualified Data.List as List
23+
import qualified Data.Map.Strict as Map
2224
import qualified Data.Text as Text
2325

2426
import Cardano.BM.Trace (logWarning)
@@ -240,6 +242,7 @@ resolveTxInputs syncEnv hasConsumed needsValue groupedOutputs txIn = do
240242
DB.VCTxOutW cTxOut -> (txIn, VC.txOutCoreTxId cTxOut, Left txIn, Nothing)
241243
DB.VATxOutW vTxOut _ -> (txIn, VA.txOutAddressTxId vTxOut, Left txIn, Nothing)
242244

245+
243246
resolveRemainingInputs ::
244247
[ExtendedTxIn] ->
245248
[(DB.TxOutIdW, ExtendedTxOut)] ->

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/LedgerEvent.hs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import Cardano.DbSync.Types
3131

3232
import Cardano.DbSync.Error (SyncNodeError)
3333
import Cardano.DbSync.Metrics (setDbEpochSyncDuration, setDbEpochSyncNumber)
34-
import Control.Concurrent.Class.MonadSTM.Strict (readTVarIO)
34+
import Control.Concurrent.Class.MonadSTM.Strict (readTVarIO, writeTVar)
3535
import Control.Monad.Extra (whenJust)
3636
import qualified Data.Map.Strict as Map
3737
import qualified Data.Set as Set
@@ -76,8 +76,11 @@ insertNewEpochLedgerEvents syncEnv currentEpochNo@(EpochNo curEpoch) =
7676
currentTime <- liftIO getCurrentTime
7777
-- Get current epoch statistics
7878
epochStats <- liftIO $ readTVarIO (envEpochStatistics syncEnv)
79+
-- Update the database isolation state for transaction optimisation
80+
let syncState = toSyncState ss
81+
liftIO $ atomically $ writeTVar (envDbIsolationState syncEnv) syncState
7982
-- Insert the epoch sync time into the database
80-
insertEpochSyncTime en (toSyncState ss) epochStats currentTime
83+
insertEpochSyncTime en syncState epochStats currentTime
8184
-- Text of the epoch sync time
8285
let epochDurationText = formatEpochDuration (elsStartTime epochStats) currentTime
8386

cardano-db/src/Cardano/Db/Run.hs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,22 +53,25 @@ import Cardano.Db.Types (DbEnv (..), DbM (..))
5353
-- This is the primary runner used for cardano-db-sync block processing.
5454
-- Wraps all operations in a single database transaction with full ACID guarantees.
5555
-- Automatically handles BEGIN/COMMIT/ROLLBACK and provides comprehensive logging.
56+
-- Accepts an optional isolation level (defaults to RepeatableRead).
5657
runDbTransLogged ::
5758
MonadUnliftIO m =>
5859
Trace IO Text ->
5960
DbEnv ->
61+
Maybe IsolationLevel -> -- Optional isolation level
6062
DbM a ->
6163
m a
62-
runDbTransLogged tracer dbEnv action = do
64+
runDbTransLogged tracer dbEnv mIsolationLevel action = do
6365
result <- liftIO $ HsqlS.run transactionSession (dbConnection dbEnv)
6466
case result of
6567
Left sessionErr -> do
6668
liftIO $ logWarning tracer $ "Database transaction error: " <> Text.pack (show sessionErr)
6769
throwIO $ DbSessionError mkDbCallStack ("Database transaction error: " <> formatSessionError sessionErr)
6870
Right dbResult -> pure dbResult
6971
where
72+
isolationLevel = fromMaybe RepeatableRead mIsolationLevel
7073
transactionSession = do
71-
HsqlS.statement () (beginTransactionStmt RepeatableRead)
74+
HsqlS.statement () (beginTransactionStmt isolationLevel)
7275

7376
result <- liftIO $ try @SomeException $ runIohkLogging tracer $ liftIO $ runReaderT (runDbM action) dbEnv
7477
case result of
@@ -83,21 +86,24 @@ runDbTransLogged tracer dbEnv action = do
8386
--
8487
-- Same transaction guarantees as runDbTransLogged but without logging.
8588
-- Useful for performance-critical operations or testing where log output isn't needed.
89+
-- Accepts an optional isolation level (defaults to RepeatableRead).
8690
runDbTransSilent ::
8791
MonadUnliftIO m =>
8892
DbEnv ->
93+
Maybe IsolationLevel -> -- Optional isolation level
8994
DbM a ->
9095
m a
91-
runDbTransSilent dbEnv action = do
96+
runDbTransSilent dbEnv mIsolationLevel action = do
9297
runNoLoggingT $ do
9398
result <- liftIO $ HsqlS.run transactionSession (dbConnection dbEnv)
9499
case result of
95100
Left sessionErr ->
96101
throwIO $ DbSessionError mkDbCallStack ("Database transaction error: " <> formatSessionError sessionErr)
97102
Right dbResult -> pure dbResult
98103
where
104+
isolationLevel = fromMaybe RepeatableRead mIsolationLevel
99105
transactionSession = do
100-
HsqlS.statement () (beginTransactionStmt RepeatableRead)
106+
HsqlS.statement () (beginTransactionStmt isolationLevel)
101107

102108
result <- liftIO $ try @SomeException $ runReaderT (runDbM action) dbEnv
103109
case result of
@@ -159,13 +165,15 @@ runDbDirectSilent dbEnv action = do
159165
-- Uses a connection from the pool rather than the main DbEnv connection.
160166
-- Wraps operations in a transaction with logging. Designed for concurrent operations
161167
-- where multiple threads need independent database connections.
168+
-- Accepts an optional isolation level (defaults to RepeatableRead).
162169
runDbPoolTransLogged ::
163170
MonadUnliftIO m =>
164171
Trace IO Text ->
165172
DbEnv ->
173+
Maybe IsolationLevel -> -- Optional isolation level
166174
DbM a ->
167175
m a
168-
runDbPoolTransLogged tracer dbEnv action = do
176+
runDbPoolTransLogged tracer dbEnv mIsolationLevel action = do
169177
case dbPoolConnection dbEnv of
170178
Nothing -> throwIO $ DbSessionError mkDbCallStack "No connection pool available in DbEnv"
171179
Just pool -> do
@@ -176,8 +184,9 @@ runDbPoolTransLogged tracer dbEnv action = do
176184
Left sessionErr -> throwIO $ DbSessionError mkDbCallStack ("Pool transaction error: " <> formatSessionError sessionErr)
177185
Right dbResult -> pure dbResult
178186
where
187+
isolationLevel = fromMaybe RepeatableRead mIsolationLevel
179188
transactionSession conn = do
180-
HsqlS.statement () (beginTransactionStmt RepeatableRead)
189+
HsqlS.statement () (beginTransactionStmt isolationLevel)
181190
result <- liftIO $ try @SomeException $ do
182191
let tempDbEnv = createDbEnv conn (dbPoolConnection dbEnv) (dbTracer dbEnv)
183192
runReaderT (runDbM action) tempDbEnv
@@ -233,7 +242,7 @@ runDbStandaloneTransSilent source action = do
233242
HsqlCon.release
234243
( \connection -> do
235244
let dbEnv = createDbEnv connection Nothing Nothing
236-
runDbTransSilent dbEnv action
245+
runDbTransSilent dbEnv Nothing action
237246
)
238247

239248
-- | Standalone runner without transaction management

cardano-db/src/Cardano/Db/Statement/Variants/TxOut.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import qualified Data.Text as Text
1515
import qualified Data.Text.Encoding as TextEnc
1616
import qualified Hasql.Decoders as HsqlD
1717
import qualified Hasql.Encoders as HsqlE
18+
import qualified Hasql.Pipeline as HsqlP
1819
import qualified Hasql.Session as HsqlSes
1920
import qualified Hasql.Statement as HsqlStmt
2021

@@ -31,7 +32,6 @@ import Cardano.Db.Statement.Function.InsertBulk (insertBulk)
3132
import Cardano.Db.Statement.Function.Query (adaDecoder, countAll)
3233
import Cardano.Db.Statement.Types (DbInfo (..), Entity (entityVal))
3334
import Cardano.Db.Types (Ada (..), DbLovelace, DbM, DbWord64, dbLovelaceDecoder)
34-
import qualified Hasql.Pipeline as HsqlP
3535

3636
--------------------------------------------------------------------------------
3737
-- TxOut

0 commit comments

Comments
 (0)