Skip to content

Commit 19ba860

Browse files
committed
start adding pool queries
1 parent 422bf43 commit 19ba860

File tree

8 files changed

+86
-50
lines changed

8 files changed

+86
-50
lines changed

cardano-db-sync/src/Cardano/DbSync.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,12 @@ runSyncNode metricsSetters trce iomgr dbConnSetting runIndexesMigrationFnc syncN
204204
( \dbConn -> do
205205
runOrThrowIO $ runExceptT $ do
206206
let isLogingEnabled = dncEnableDbLogging syncNodeConfigFromFile
207-
dbEnv =
207+
-- Create connection pool for parallel operations
208+
pool <- liftIO $ DB.createHasqlConnectionPool [dbConnSetting] 4 -- 4 connections for reasonable parallelism
209+
let dbEnv =
208210
if isLogingEnabled
209-
then DB.DbEnv dbConn isLogingEnabled (Just trce)
210-
else DB.DbEnv dbConn isLogingEnabled Nothing
211+
then DB.createDbEnv dbConn pool (Just trce)
212+
else DB.createDbEnv dbConn pool Nothing
211213
genCfg <- readCardanoGenesisConfig syncNodeConfigFromFile
212214
isJsonbInSchema <- liftDbError $ DB.queryJsonbInSchemaExists dbConn
213215
logProtocolMagicId trce $ genesisProtocolMagicId genCfg

cardano-db-sync/src/Cardano/DbSync/Era/Byron/Genesis.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ insertValidateByronGenesisDist ::
4747
insertValidateByronGenesisDist syncEnv (NetworkName networkName) cfg = do
4848
-- Setting this to True will log all 'Persistent' operations which is great
4949
-- for debugging, but otherwise *way* too chatty.
50-
if DB.dbEnableLogging $ envDbEnv syncEnv
51-
then liftDbIO $ DB.runDbIohkLogging tracer (envDbEnv syncEnv) insertAction
52-
else liftDbIO $ DB.runDbIohkNoLogging (envDbEnv syncEnv) insertAction
50+
case DB.dbTracer $ envDbEnv syncEnv of
51+
Just trce -> liftDbIO $ DB.runDbIohkLogging trce (envDbEnv syncEnv) insertAction
52+
Nothing -> liftDbIO $ DB.runDbIohkNoLogging (envDbEnv syncEnv) insertAction
5353
where
5454
tracer = getTrace syncEnv
5555

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Genesis.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ insertValidateShelleyGenesisDist syncEnv networkName cfg shelleyInitiation = do
6969
liftIO $ logError tracer $ show SNErrIgnoreShelleyInitiation
7070
throwError SNErrIgnoreShelleyInitiation
7171

72-
if DB.dbEnableLogging $ envDbEnv syncEnv
73-
then liftDbIO $ DB.runDbIohkLogging tracer (envDbEnv syncEnv) (insertAction prunes)
74-
else liftDbIO $ DB.runDbIohkNoLogging (envDbEnv syncEnv) (insertAction prunes)
72+
case DB.dbTracer $ envDbEnv syncEnv of
73+
Just trce -> liftDbIO $ DB.runDbIohkLogging trce (envDbEnv syncEnv) (insertAction prunes)
74+
Nothing -> liftDbIO $ DB.runDbIohkNoLogging (envDbEnv syncEnv) (insertAction prunes)
7575
where
7676
tracer = getTrace syncEnv
7777

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ insertBlockGroupedData syncEnv grouped = do
162162
a2 <- async $ pure $ prepareMetadataProcessing syncEnv grouped
163163
a3 <- async $ pure $ prepareMintProcessing syncEnv grouped
164164
a4 <- async $ pure $ chunksOf maxBulkSize $ etoTxOut . fst <$> groupedTxOut grouped
165-
165+
166166
r1 <- wait a1
167167
r2 <- wait a2
168168
r3 <- wait a3
@@ -172,20 +172,20 @@ insertBlockGroupedData syncEnv grouped = do
172172
-- Sequential TxOut processing (generates required IDs)
173173
txOutIds <- concat <$> mapM (DB.insertBulkTxOut disInOut) txOutChunks
174174

175-
-- PHASE 3: Execute independent operations (TxIn, Metadata, Mint) in parallel
175+
-- Execute independent operations (TxIn, Metadata, Mint) in parallel
176176
txInIds <- executePreparedTxIn preparedTxIn
177-
178-
-- PHASE 4: Pipeline TxOut-dependent operations (MaTxOut + UTxO consumption)
177+
178+
-- TxOut-dependent operations (MaTxOut + UTxO consumption)
179179
maTxOutIds <- processMaTxOuts syncEnv txOutIds grouped
180-
181-
-- PHASE 5: Execute remaining independent operations in parallel
180+
181+
-- Execute remaining independent operations in parallel with pools
182182
liftIO $ do
183-
a1 <- async $ DB.runDbActionIO (envDbEnv syncEnv) (executePreparedMetadata preparedMetadata)
184-
a2 <- async $ DB.runDbActionIO (envDbEnv syncEnv) (executePreparedMint preparedMint)
183+
a1 <- async $ DB.runPoolDbAction (envDbEnv syncEnv) (executePreparedMetadata preparedMetadata)
184+
a2 <- async $ DB.runPoolDbAction (envDbEnv syncEnv) (executePreparedMint preparedMint)
185185
_ <- wait a1
186186
void $ wait a2
187-
188-
-- PHASE 6: Process UTxO consumption (depends on txOutIds)
187+
188+
-- Process UTxO consumption (depends on txOutIds)
189189
processUtxoConsumption syncEnv grouped txOutIds
190190

191191
pure $ makeMinId syncEnv txInIds txOutIds maTxOutIds
@@ -363,7 +363,7 @@ data PreparedTxIn = PreparedTxIn
363363
, ptiSkip :: !Bool
364364
}
365365

366-
-- | Prepared Metadata data for async execution
366+
-- | Prepared Metadata data for async execution
367367
data PreparedMetadata = PreparedMetadata
368368
{ pmChunks :: ![[DB.TxMetadata]]
369369
, pmRemoveJsonb :: !Bool
@@ -399,7 +399,7 @@ prepareMintProcessing _syncEnv grouped =
399399

400400
-- | Execute prepared TxIn operations
401401
executePreparedTxIn :: MonadIO m => PreparedTxIn -> DB.DbAction m [DB.TxInId]
402-
executePreparedTxIn prepared =
402+
executePreparedTxIn prepared =
403403
if ptiSkip prepared
404404
then pure []
405405
else concat <$> mapM DB.insertBulkTxIn (ptiChunks prepared)
@@ -418,8 +418,9 @@ executePreparedMint prepared =
418418
processMaTxOuts :: MonadIO m => SyncEnv -> [DB.TxOutIdW] -> BlockGroupedData -> DB.DbAction m [DB.MaTxOutIdW]
419419
processMaTxOuts syncEnv txOutIds grouped = do
420420
let txOutVariantType = getTxOutVariantType syncEnv
421-
maTxOuts = concatMap (mkmaTxOuts txOutVariantType) $
422-
zip txOutIds (snd <$> groupedTxOut grouped)
421+
maTxOuts =
422+
concatMap (mkmaTxOuts txOutVariantType) $
423+
zip txOutIds (snd <$> groupedTxOut grouped)
423424
maTxOutChunks = chunksOf maxBulkSize maTxOuts
424425
concat <$> mapM DB.insertBulkMaTxOut maTxOutChunks
425426

@@ -428,7 +429,7 @@ processUtxoConsumption :: MonadIO m => SyncEnv -> BlockGroupedData -> [DB.TxOutI
428429
processUtxoConsumption syncEnv grouped txOutIds = do
429430
let tracer = getTrace syncEnv
430431
txOutVariantType = getTxOutVariantType syncEnv
431-
432+
432433
whenConsumeOrPruneTxOut syncEnv $ do
433434
-- Resolve remaining inputs
434435
etis <- resolveRemainingInputs (groupedTxIn grouped) $ zip txOutIds (fst <$> groupedTxOut grouped)

cardano-db-sync/src/Cardano/DbSync/OffChain.hs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,11 @@ runFetchOffChainPoolThread syncEnv syncNodeConfigFromFile = do
240240
HsqlC.release
241241
( \dbConn -> forever $ do
242242
-- Create a new DbEnv for this thread
243-
let dbEnv = DB.DbEnv dbConn (dncEnableDbLogging syncNodeConfigFromFile) $ Just trce
243+
pool <- DB.createHasqlConnectionPool [connSetting] 4 -- 4 connections for reasonable parallelism
244+
let dbEnv =
245+
if dncEnableDbLogging syncNodeConfigFromFile
246+
then DB.createDbEnv dbConn pool (Just trce)
247+
else DB.createDbEnv dbConn pool Nothing
244248
-- Create a new SyncEnv with the new DbEnv but preserving all other fields
245249
threadSyncEnv = syncEnv {envDbEnv = dbEnv}
246250
tDelay
@@ -275,7 +279,11 @@ runFetchOffChainVoteThread syncEnv syncNodeConfigFromFile = do
275279
HsqlC.release
276280
( \dbConn -> do
277281
-- Create a new DbEnv for this thread
278-
let dbEnv = DB.DbEnv dbConn (dncEnableDbLogging syncNodeConfigFromFile) $ Just trce
282+
pool <- DB.createHasqlConnectionPool [connSetting] 4 -- 4 connections for reasonable parallelism
283+
let dbEnv =
284+
if dncEnableDbLogging syncNodeConfigFromFile
285+
then DB.createDbEnv dbConn pool (Just trce)
286+
else DB.createDbEnv dbConn pool Nothing
279287
-- Create a new SyncEnv with the new DbEnv but preserving all other fields
280288
let threadSyncEnv = syncEnv {envDbEnv = dbEnv}
281289
-- Use the thread-specific SyncEnv for all operations

cardano-db/src/Cardano/Db/Run.hs

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
{-# LANGUAGE FlexibleContexts #-}
33
{-# LANGUAGE LambdaCase #-}
44
{-# LANGUAGE OverloadedStrings #-}
5+
{-# LANGUAGE RankNTypes #-}
56

67
module Cardano.Db.Run where
78

@@ -127,7 +128,8 @@ runDbActionWithIsolation dbEnv isolationLevel action = do
127128
Right _ -> do
128129
-- Run action with async exception protection via onException
129130
-- If interrupted (Ctrl+C), the onException handler will rollback
130-
result <- onException
131+
result <-
132+
onException
131133
(restore (runInIO $ runReaderT (runExceptT (runDbAction action)) dbEnv))
132134
(restore $ rollbackTransaction dbEnv)
133135
case result of
@@ -196,16 +198,9 @@ runPoolDbIohkLogging ::
196198
m (Either DbError a)
197199
runPoolDbIohkLogging connPool tracer action = do
198200
conn <- liftIO $ withResource connPool pure
199-
let dbEnv = mkDbEnv conn
201+
let dbEnv = createDbEnv conn connPool (Just tracer)
200202
runIohkLogging tracer $
201203
runDbActionWithIsolation dbEnv RepeatableRead action
202-
where
203-
mkDbEnv conn =
204-
DbEnv
205-
{ dbConnection = conn
206-
, dbEnableLogging = True
207-
, dbTracer = Just tracer
208-
}
209204

210205
runDbNoLogging :: MonadUnliftIO m => PGPassSource -> DbAction m a -> m a
211206
runDbNoLogging source action = do
@@ -217,9 +212,11 @@ runDbNoLogging source action = do
217212
bracket
218213
(acquireConnection [connSetting])
219214
HsqlCon.release
220-
( \connection -> runInIO $ do
221-
let dbEnv = DbEnv connection False Nothing
222-
runDbConnWithIsolation action dbEnv RepeatableRead
215+
( \connection -> do
216+
pool <- createHasqlConnectionPool [connSetting] 4 -- 4 connections for reasonable parallelism
217+
runInIO $ do
218+
let dbEnv = createDbEnv connection pool Nothing
219+
runDbConnWithIsolation action dbEnv RepeatableRead
223220
)
224221

225222
runDbNoLoggingEnv :: MonadUnliftIO m => DbAction m a -> m a
@@ -235,7 +232,8 @@ runWithConnectionNoLogging source action = do
235232
(acquireConnection [connSetting])
236233
HsqlCon.release
237234
( \connection -> do
238-
let dbEnv = DbEnv connection False Nothing
235+
pool <- createHasqlConnectionPool [connSetting] 4 -- 4 connections for reasonable parallelism
236+
let dbEnv = createDbEnv connection pool Nothing
239237
runNoLoggingT $ runDbConnWithIsolation action dbEnv RepeatableRead
240238
)
241239

@@ -295,3 +293,33 @@ createHasqlConnectionPool settings numConnections = do
295293
Left err -> throwIO $ userError $ "Connection error: " <> show err
296294
Right conn -> pure conn
297295
releaseConn = HsqlCon.release
296+
297+
-- Helper to create DbEnv with both single connection and pool
298+
createDbEnv :: HsqlCon.Connection -> Pool HsqlCon.Connection -> Maybe (Trace IO Text) -> DbEnv
299+
createDbEnv conn pool tracer =
300+
DbEnv
301+
{ dbConnection = conn
302+
, dbPoolConnection = pool
303+
, dbTracer = tracer
304+
}
305+
306+
-- Pool-aware database action runners for async operations
307+
runPoolDbAction :: forall a m. MonadUnliftIO m => DbEnv -> DbAction m a -> m a
308+
runPoolDbAction dbEnv action = do
309+
withRunInIO $ \runInIO -> do
310+
conn <- withResource (dbPoolConnection dbEnv) pure
311+
let poolDbEnv = dbEnv {dbConnection = conn, dbTracer = Nothing} -- No logging for pool operations to avoid contention
312+
result <- runInIO $ runReaderT (runExceptT (runDbAction action)) poolDbEnv
313+
case result of
314+
Left err -> throwIO err
315+
Right val -> pure val
316+
317+
runPoolDbActionWithLogging :: forall a m. MonadUnliftIO m => DbEnv -> DbAction m a -> m a
318+
runPoolDbActionWithLogging dbEnv action = do
319+
withRunInIO $ \runInIO -> do
320+
conn <- withResource (dbPoolConnection dbEnv) pure
321+
let poolDbEnv = dbEnv {dbConnection = conn} -- Keep original logging settings
322+
result <- runInIO $ runReaderT (runExceptT (runDbAction action)) poolDbEnv
323+
case result of
324+
Left err -> throwIO err
325+
Right val -> pure val

cardano-db/src/Cardano/Db/Statement/Function/Core.hs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ where
1616
import Cardano.BM.Trace (logInfo)
1717
import Cardano.Db.Error (DbCallStack (..), DbError (..))
1818
import Cardano.Db.Types (DbAction (..), DbEnv (..))
19-
import Cardano.Prelude (MonadError (..), MonadIO (..), Text, ask, for_, when)
19+
import Cardano.Prelude (MonadError (..), MonadIO (..), Text, ask)
2020
import qualified Data.Text as Text
2121
import Data.Time (diffUTCTime, getCurrentTime)
2222
import GHC.Stack (HasCallStack, SrcLoc (..), callStack, getCallStack)
@@ -57,11 +57,7 @@ import qualified Hasql.Session as HsqlS
5757
runDbSession :: MonadIO m => DbCallStack -> HsqlS.Session a -> DbAction m a
5858
runDbSession dbCallStack@DbCallStack {..} session = DbAction $ do
5959
dbEnv <- ask
60-
let logMsg msg =
61-
when (dbEnableLogging dbEnv) $
62-
for_ (dbTracer dbEnv) $
63-
\tracer -> liftIO $ logInfo tracer msg
64-
locationInfo =
60+
let locationInfo =
6561
" Function: "
6662
<> dbCsFncName
6763
<> " at "
@@ -71,15 +67,15 @@ runDbSession dbCallStack@DbCallStack {..} session = DbAction $ do
7167
<> ":"
7268
<> Text.pack (show dbCsLine)
7369

74-
if dbEnableLogging dbEnv
75-
then do
70+
case dbTracer dbEnv of
71+
Nothing -> run dbEnv
72+
Just tracer -> do
7673
start <- liftIO getCurrentTime
7774
result <- run dbEnv
7875
end <- liftIO getCurrentTime
7976
let duration = diffUTCTime end start
80-
logMsg $ "Query: " <> dbCsFncName <> locationInfo <> " in " <> Text.pack (show duration)
77+
liftIO $ logInfo tracer $ "Query: " <> dbCsFncName <> locationInfo <> " in " <> Text.pack (show duration)
8178
pure result
82-
else run dbEnv
8379
where
8480
run dbEnv = do
8581
result <- liftIO $ HsqlS.run session (dbConnection dbEnv)

cardano-db/src/Cardano/Db/Types.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import Data.Either (fromRight)
3030
import Data.Fixed (Micro, showFixed)
3131
import Data.Functor.Contravariant ((>$<))
3232
import Data.Int (Int64)
33+
import Data.Pool (Pool)
3334
import Data.Scientific (Scientific (..), scientific, toBoundedInteger)
3435
import Data.Text (Text)
3536
import qualified Data.Text as Text
@@ -60,7 +61,7 @@ newtype DbAction m a = DbAction
6061
----------------------------------------------------------------------------
6162
data DbEnv = DbEnv
6263
{ dbConnection :: !HsqlCon.Connection
63-
, dbEnableLogging :: !Bool
64+
, dbPoolConnection :: !(Pool HsqlCon.Connection)
6465
, dbTracer :: !(Maybe (Trace IO Text))
6566
}
6667

0 commit comments

Comments
 (0)