Commit 422bf43

update the running function to be clearer

1 parent 8d97d17

3 files changed: +220 −55 lines

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs (+187 −38)
@@ -1,5 +1,7 @@
+{-# LANGUAGE ApplicativeDo #-}
 {-# LANGUAGE FlexibleContexts #-}
 {-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RankNTypes #-}
 {-# LANGUAGE RecordWildCards #-}
 {-# LANGUAGE NoImplicitPrelude #-}

@@ -9,6 +11,7 @@ module Cardano.DbSync.Era.Universal.Insert.Grouped (
   ExtendedTxIn (..),
   ExtendedTxOut (..),
   insertBlockGroupedData,
+  insertBlockGroupedDataSequential,
   insertReverseIndex,
   resolveTxInputs,
   resolveScriptHash,

@@ -87,12 +90,13 @@ instance Semigroup BlockGroupedData where
       (groupedTxFees tgd1 + groupedTxFees tgd2)
       (groupedTxOutSum tgd1 + groupedTxOutSum tgd2)
 
-insertBlockGroupedData ::
+-- | Original sequential implementation (kept as a fallback)
+insertBlockGroupedDataSequential ::
   MonadIO m =>
   SyncEnv ->
   BlockGroupedData ->
   DB.DbAction m DB.MinIdsWrapper
-insertBlockGroupedData syncEnv grouped = do
+insertBlockGroupedDataSequential syncEnv grouped = do
   disInOut <- liftIO $ getDisableInOutState syncEnv
 
   let txOutChunks = chunksOf maxBulkSize $ etoTxOut . fst <$> groupedTxOut grouped
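
Both the sequential and the parallel path batch their inserts via chunksOf maxBulkSize. As a quick illustration of that batching (assuming chunksOf has the usual Data.List.Split semantics; the import below is an assumption, not taken from this module):

import Data.List.Split (chunksOf)

-- chunksOf n yields consecutive batches of at most n elements;
-- the final batch may be shorter:
batches :: [[Int]]
batches = chunksOf 3 [1 .. 8]
-- batches == [[1,2,3],[4,5,6],[7,8]]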

@@ -137,47 +141,54 @@ insertBlockGroupedData syncEnv grouped = do
   mapM_ (DB.insertBulkTxMetadata removeJsonbFromSchema) txMetadataChunks
   mapM_ DB.insertBulkMaTxMint txMintChunks
 
-  pure $ makeMinId txInIds txOutIds maTxOutIds
+  pure $ makeMinId syncEnv txInIds txOutIds maTxOutIds
   where
     tracer = getTrace syncEnv
     txOutVariantType = getTxOutVariantType syncEnv
     removeJsonbFromSchema = ioRemoveJsonbFromSchema $ soptInsertOptions $ envOptions syncEnv
 
-    categorizeResolvedInputs :: [ExtendedTxIn] -> ([DB.BulkConsumedByHash], [(DB.TxOutIdW, DB.TxId)], [ExtendedTxIn])
-    categorizeResolvedInputs etis =
-      let (hashBased, idBased, failed) = foldr categorizeOne ([], [], []) etis
-       in (hashBased, idBased, failed)
-      where
-        categorizeOne ExtendedTxIn {..} (hAcc, iAcc, fAcc) =
-          case etiTxOutId of
-            Right txOutId ->
-              (hAcc, (txOutId, DB.txInTxInId etiTxIn) : iAcc, fAcc)
-            Left genericTxIn ->
-              let bulkData =
-                    DB.BulkConsumedByHash
-                      { bchTxHash = unTxHash (Generic.txInTxId genericTxIn)
-                      , bchOutputIndex = Generic.txInIndex genericTxIn
-                      , bchConsumingTxId = DB.txInTxInId etiTxIn
-                      }
-               in (bulkData : hAcc, iAcc, fAcc)
-
-    makeMinId :: [DB.TxInId] -> [DB.TxOutIdW] -> [DB.MaTxOutIdW] -> DB.MinIdsWrapper
-    makeMinId txInIds txOutIds maTxOutIds =
-      case txOutVariantType of
-        DB.TxOutVariantCore -> do
-          DB.CMinIdsWrapper $
-            DB.MinIds
-              { minTxInId = listToMaybe txInIds
-              , minTxOutId = listToMaybe txOutIds
-              , minMaTxOutId = listToMaybe maTxOutIds
-              }
-        DB.TxOutVariantAddress ->
-          DB.VMinIdsWrapper $
-            DB.MinIds
-              { minTxInId = listToMaybe txInIds
-              , minTxOutId = listToMaybe txOutIds
-              , minMaTxOutId = listToMaybe maTxOutIds
-              }
+-- | Parallel implementation coordinated over a single connection
+insertBlockGroupedData ::
+  MonadIO m =>
+  SyncEnv ->
+  BlockGroupedData ->
+  DB.DbAction m DB.MinIdsWrapper
+insertBlockGroupedData syncEnv grouped = do
+  disInOut <- liftIO $ getDisableInOutState syncEnv
+
+  -- Phase 1: prepare the independent data in parallel
+  (preparedTxIn, preparedMetadata, preparedMint, txOutChunks) <- liftIO $ do
+    a1 <- async $ pure $ prepareTxInProcessing syncEnv grouped
+    a2 <- async $ pure $ prepareMetadataProcessing syncEnv grouped
+    a3 <- async $ pure $ prepareMintProcessing syncEnv grouped
+    a4 <- async $ pure $ chunksOf maxBulkSize $ etoTxOut . fst <$> groupedTxOut grouped
+
+    r1 <- wait a1
+    r2 <- wait a2
+    r3 <- wait a3
+    r4 <- wait a4
+    pure (r1, r2, r3, r4)
+
+  -- Phase 2: sequential TxOut inserts (generates the IDs later phases need)
+  txOutIds <- concat <$> mapM (DB.insertBulkTxOut disInOut) txOutChunks
+
+  -- Phase 3: TxIn inserts (independent of the TxOut IDs)
+  txInIds <- executePreparedTxIn preparedTxIn
+
+  -- Phase 4: MaTxOut inserts (depend on the TxOut IDs)
+  maTxOutIds <- processMaTxOuts syncEnv txOutIds grouped
+
+  -- Phase 5: run the remaining independent operations in parallel
+  liftIO $ do
+    a1 <- async $ DB.runDbActionIO (envDbEnv syncEnv) (executePreparedMetadata preparedMetadata)
+    a2 <- async $ DB.runDbActionIO (envDbEnv syncEnv) (executePreparedMint preparedMint)
+    _ <- wait a1
+    void $ wait a2
+
+  -- Phase 6: UTxO consumption updates (depend on the TxOut IDs)
+  processUtxoConsumption syncEnv grouped txOutIds
+
+  pure $ makeMinId syncEnv txInIds txOutIds maTxOutIds
 
 mkmaTxOuts :: DB.TxOutVariantType -> (DB.TxOutIdW, [MissingMaTxOut]) -> [DB.MaTxOutW]
 mkmaTxOuts _txOutVariantType (txOutId, mmtos) = mkmaTxOut <$> mmtos
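
The prepare/wait fan-out in the parallel implementation above uses async and wait from Control.Concurrent.Async. A minimal standalone sketch of the same pattern follows; prepareBoth is illustrative, not part of the patch. One caveat worth noting: async (pure (f x)) only hands back an unevaluated thunk, so a sketch that wants the pure work done on the worker thread has to force it, for example with evaluate and force:

import Control.Concurrent.Async (async, wait)
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)

-- Run two pure preparation steps on worker threads and wait for both.
-- 'evaluate (force ...)' makes the worker perform the work; without it,
-- 'wait' would return a lazy thunk that the caller evaluates itself.
prepareBoth :: (NFData b, NFData c) => (a -> b) -> (a -> c) -> a -> IO (b, c)
prepareBoth f g x = do
  aB <- async (evaluate (force (f x)))
  aC <- async (evaluate (force (g x)))
  (,) <$> wait aB <*> wait aC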

@@ -341,3 +352,141 @@ matches txIn eutxo =
     getTxOutIndex txOutWrapper = case txOutWrapper of
       DB.VCTxOutW cTxOut -> VC.txOutCoreIndex cTxOut
       DB.VATxOutW vTxOut _ -> VA.txOutAddressIndex vTxOut
+
+-----------------------------------------------------------------------------------------------------------------------------------
+-- PARALLEL PROCESSING HELPER FUNCTIONS
+-----------------------------------------------------------------------------------------------------------------------------------
+
+-- | Prepared TxIn data for async execution
+data PreparedTxIn = PreparedTxIn
+  { ptiChunks :: ![[DB.TxIn]]
+  , ptiSkip :: !Bool
+  }
+
+-- | Prepared Metadata data for async execution
+data PreparedMetadata = PreparedMetadata
+  { pmChunks :: ![[DB.TxMetadata]]
+  , pmRemoveJsonb :: !Bool
+  }
+
+-- | Prepared Mint data for async execution
+data PreparedMint = PreparedMint
+  { pmtChunks :: ![[DB.MaTxMint]]
+  }
+
+-- | Prepare TxIn processing (can run in parallel with TxOut)
+prepareTxInProcessing :: SyncEnv -> BlockGroupedData -> PreparedTxIn
+prepareTxInProcessing syncEnv grouped =
+  PreparedTxIn
+    { ptiChunks = chunksOf maxBulkSize $ etiTxIn <$> groupedTxIn grouped
+    , ptiSkip = getSkipTxIn syncEnv
+    }
+
+-- | Prepare Metadata processing (fully independent)
+prepareMetadataProcessing :: SyncEnv -> BlockGroupedData -> PreparedMetadata
+prepareMetadataProcessing syncEnv grouped =
+  PreparedMetadata
+    { pmChunks = chunksOf maxBulkSize $ groupedTxMetadata grouped
+    , pmRemoveJsonb = ioRemoveJsonbFromSchema $ soptInsertOptions $ envOptions syncEnv
+    }
+
+-- | Prepare Mint processing (fully independent)
+prepareMintProcessing :: SyncEnv -> BlockGroupedData -> PreparedMint
+prepareMintProcessing _syncEnv grouped =
+  PreparedMint
+    { pmtChunks = chunksOf maxBulkSize $ groupedTxMint grouped
+    }
+
+-- | Execute prepared TxIn operations
+executePreparedTxIn :: MonadIO m => PreparedTxIn -> DB.DbAction m [DB.TxInId]
+executePreparedTxIn prepared =
+  if ptiSkip prepared
+    then pure []
+    else concat <$> mapM DB.insertBulkTxIn (ptiChunks prepared)
+
+-- | Execute prepared Metadata operations
+executePreparedMetadata :: MonadIO m => PreparedMetadata -> DB.DbAction m ()
+executePreparedMetadata prepared =
+  mapM_ (DB.insertBulkTxMetadata (pmRemoveJsonb prepared)) (pmChunks prepared)
+
+-- | Execute prepared Mint operations
+executePreparedMint :: MonadIO m => PreparedMint -> DB.DbAction m ()
+executePreparedMint prepared =
+  mapM_ DB.insertBulkMaTxMint (pmtChunks prepared)
+
+-- | Process MaTxOut operations (depends on the TxOut IDs)
+processMaTxOuts :: MonadIO m => SyncEnv -> [DB.TxOutIdW] -> BlockGroupedData -> DB.DbAction m [DB.MaTxOutIdW]
+processMaTxOuts syncEnv txOutIds grouped = do
+  let txOutVariantType = getTxOutVariantType syncEnv
+      maTxOuts =
+        concatMap (mkmaTxOuts txOutVariantType) $
+          zip txOutIds (snd <$> groupedTxOut grouped)
+      maTxOutChunks = chunksOf maxBulkSize maTxOuts
+  concat <$> mapM DB.insertBulkMaTxOut maTxOutChunks
+
+-- | Process UTxO consumption updates (depends on the TxOut IDs)
+processUtxoConsumption :: MonadIO m => SyncEnv -> BlockGroupedData -> [DB.TxOutIdW] -> DB.DbAction m ()
+processUtxoConsumption syncEnv grouped txOutIds = do
+  let tracer = getTrace syncEnv
+      txOutVariantType = getTxOutVariantType syncEnv
+
+  whenConsumeOrPruneTxOut syncEnv $ do
+    -- Resolve remaining inputs
+    etis <- resolveRemainingInputs (groupedTxIn grouped) $ zip txOutIds (fst <$> groupedTxOut grouped)
+    -- Categorize resolved inputs for bulk vs individual processing
+    let (hashBasedUpdates, idBasedUpdates, failedInputs) = categorizeResolvedInputs etis
+        hashUpdateChunks = chunksOf maxBulkSize hashBasedUpdates
+        idUpdateChunks = chunksOf maxBulkSize idBasedUpdates
+
+    -- Bulk-process hash-based updates
+    unless (null hashBasedUpdates) $
+      mapM_ (DB.updateConsumedByTxHashBulk txOutVariantType) hashUpdateChunks
+    -- Individually process ID-based updates
+    unless (null idBasedUpdates) $
+      mapM_ DB.updateListTxOutConsumedByTxId idUpdateChunks
+    -- Log failures
+    mapM_ (liftIO . logWarning tracer . ("Failed to find output for " <>) . Text.pack . show) failedInputs
+
+-- | Categorize resolved inputs into hash-based and id-based update groups
+categorizeResolvedInputs :: [ExtendedTxIn] -> ([DB.BulkConsumedByHash], [(DB.TxOutIdW, DB.TxId)], [ExtendedTxIn])
+categorizeResolvedInputs etis =
+  let (hashBased, idBased, failed) = foldr categorizeOne ([], [], []) etis
+   in (hashBased, idBased, failed)
+  where
+    categorizeOne ExtendedTxIn {..} (hAcc, iAcc, fAcc) =
+      case etiTxOutId of
+        Right txOutId ->
+          (hAcc, (txOutId, DB.txInTxInId etiTxIn) : iAcc, fAcc)
+        Left genericTxIn ->
+          let bulkData =
+                DB.BulkConsumedByHash
+                  { bchTxHash = unTxHash (Generic.txInTxId genericTxIn)
+                  , bchOutputIndex = Generic.txInIndex genericTxIn
+                  , bchConsumingTxId = DB.txInTxInId etiTxIn
+                  }
+           in (bulkData : hAcc, iAcc, fAcc)
+
+-----------------------------------------------------------------------------------------------------------------------------------
+-- MIN-ID HELPERS
+-----------------------------------------------------------------------------------------------------------------------------------
+
+-- Note: pipelining was considered here, but the data dependencies above rule it
+-- out; async execution of the truly independent operations is a better fit.
+
+-- | Helper function to create the MinIds result
+makeMinId :: SyncEnv -> [DB.TxInId] -> [DB.TxOutIdW] -> [DB.MaTxOutIdW] -> DB.MinIdsWrapper
+makeMinId syncEnv txInIds txOutIds maTxOutIds =
+  case getTxOutVariantType syncEnv of
+    DB.TxOutVariantCore ->
+      DB.CMinIdsWrapper $
+        DB.MinIds
+          { minTxInId = listToMaybe txInIds
+          , minTxOutId = listToMaybe txOutIds
+          , minMaTxOutId = listToMaybe maTxOutIds
+          }
+    DB.TxOutVariantAddress ->
+      DB.VMinIdsWrapper $
+        DB.MinIds
+          { minTxInId = listToMaybe txInIds
+          , minTxOutId = listToMaybe txOutIds
+          , minMaTxOutId = listToMaybe maTxOutIds
+          }
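
The fold in categorizeResolvedInputs is essentially a two-way split on the Either stored in etiTxOutId (the third, failure list is threaded through but never extended by categorizeOne). A simplified standalone model of that split, using only partitionEithers from base; splitInputs and its type are illustrative, not part of the patch:

import Data.Either (partitionEithers)

-- Split resolved inputs into hash-based (Left) and id-based (Right) updates,
-- keeping the consuming tx id paired with each.
splitInputs :: [(Either hash outId, txId)] -> ([(hash, txId)], [(outId, txId)])
splitInputs = partitionEithers . map tag
  where
    tag (Left h, t) = Left (h, t)
    tag (Right o, t) = Right (o, t)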

cardano-db/src/Cardano/Db/Run.hs (+29 −15)

@@ -12,7 +12,7 @@ import Cardano.BM.Data.LogItem (
   mkLOMeta,
 )
 import Cardano.BM.Data.Severity (Severity (..))
-import Cardano.BM.Trace (Trace, logWarning)
+import Cardano.BM.Trace (Trace)
 import Cardano.Prelude
 import Control.Monad.IO.Unlift (withRunInIO)
 import Control.Monad.Logger (

@@ -94,8 +94,22 @@ sessionErrorToDbError cs sessionErr =
 -- Run DB actions with INTERRUPT HANDLING
 -----------------------------------------------------------------------------------------
 
--- | Run a DbAction with explicit transaction and isolation level
--- This version properly handles interrupts (Ctrl+C) and ensures cleanup
+-- | Run a DbAction with explicit transaction control and isolation level
+--
+-- Transaction behavior:
+-- * Begins a transaction at the specified isolation level
+-- * Runs the action inside that transaction
+-- * Commits when the action succeeds with a Right value
+-- * Returns Either for explicit error handling instead of throwing exceptions
+--
+-- Exception safety:
+-- * Uses 'mask' to keep async exceptions out of the transaction bookkeeping
+-- * Uses 'onException' to roll back on interrupts (Ctrl+C, SIGTERM, etc.)
+-- * Does NOT roll back when the action itself returns a Left error; the error
+--   is propagated as-is (matches Persistent's semantics)
+--
+-- Note: this follows Persistent's philosophy that an action which completes
+-- normally should not trigger a rollback: only async exceptions and commit
+-- failures roll the transaction back.
 runDbActionWithIsolation ::
   MonadUnliftIO m =>
   DbEnv ->

@@ -104,28 +118,28 @@ runDbActionWithIsolation ::
   m (Either DbError a)
 runDbActionWithIsolation dbEnv isolationLevel action = do
   withRunInIO $ \runInIO -> do
+    -- Mask async exceptions while the transaction bookkeeping runs
     mask $ \restore -> do
-      -- Begin transaction
+      -- Begin a transaction at the specified isolation level
       beginResult <- beginTransaction dbEnv isolationLevel
       case beginResult of
         Left err -> pure (Left err)
         Right _ -> do
-          -- Run the action with exception handling for interrupts
-          result <-
-            restore (runInIO $ runReaderT (runExceptT (runDbAction action)) dbEnv)
-              `onException` do
-                case dbTracer dbEnv of
-                  Just tracer -> logWarning tracer "rolling back transaction, due to interrupt."
-                  Nothing -> pure ()
-                rollbackTransaction dbEnv
+          -- Run the action; an async exception (e.g. Ctrl+C) triggers the
+          -- onException handler, which rolls the transaction back
+          result <- onException
+            (restore (runInIO $ runReaderT (runExceptT (runDbAction action)) dbEnv))
+            (restore $ rollbackTransaction dbEnv)
           case result of
-            Left err -> do
-              rollbackTransaction dbEnv
-              pure (Left err)
+            -- The action completed but returned an error value: propagate it
+            -- without an explicit rollback (Persistent-style semantics)
+            Left err -> pure (Left err)
             Right val -> do
+              -- Attempt to commit the transaction
               commitResult <- commitTransaction dbEnv
               case commitResult of
                 Left commitErr -> do
+                  -- Commit failed: roll back and return the commit error
                   rollbackTransaction dbEnv
                   pure (Left commitErr)
                 Right _ -> pure (Right val)
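
The begin/run/commit discipline above generalizes. A minimal sketch of the same mask plus onException bracket with placeholder actions; withTx and its begin/commit/rollback parameters are stand-ins, not the cardano-db API:

import Control.Exception (mask, onException)

-- Generic begin/run/commit bracket: async exceptions are masked around the
-- transaction bookkeeping and only unmasked (via 'restore') while the user
-- action runs; an exception there, or during commit, triggers a rollback.
withTx :: IO () -> IO () -> IO () -> IO a -> IO a
withTx begin commit rollback act =
  mask $ \restore -> do
    begin
    r <- restore act `onException` rollback
    commit `onException` rollback
    pure r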

cardano-db/src/Cardano/Db/Statement/OffChain.hs (+4 −2)

@@ -24,7 +24,7 @@ import Cardano.Db.Schema.Types (PoolUrl, poolUrlDecoder, utcTimeAsTimestampDecod
 import Cardano.Db.Statement.Function.Core (ResultType (..), ResultTypeBulk (..), mkDbCallStack, runDbSession)
 import Cardano.Db.Statement.Function.Delete (parameterisedDeleteWhere)
 import Cardano.Db.Statement.Function.Insert (insertCheckUnique)
-import Cardano.Db.Statement.Function.InsertBulk (insertBulk)
+import Cardano.Db.Statement.Function.InsertBulk (ConflictStrategy (..), insertBulk, insertBulkWith)
 import Cardano.Db.Statement.Function.Query (countAll)
 import Cardano.Db.Statement.Pool (queryPoolHashIdExistsStmt, queryPoolMetadataRefIdExistsStmt)
 import Cardano.Db.Statement.Types (DbInfo (..))

@@ -550,7 +550,9 @@ insertBulkOffChainVoteExternalUpdatesStmt =
 
 insertBulkOffChainVoteFetchErrorStmt :: HsqlStmt.Statement [SO.OffChainVoteFetchError] ()
 insertBulkOffChainVoteFetchErrorStmt =
-  insertBulk
+  insertBulkWith
+    (IgnoreWithColumns ["voting_anchor_id", "retry_count"]) -- ON CONFLICT DO NOTHING
+    False
     extractOffChainVoteFetchError
     SO.offChainVoteFetchErrorBulkEncoder
     NoResultBulk
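
Assuming IgnoreWithColumns renders to an ON CONFLICT clause over the named columns, the generated statement should have roughly the following shape (a hypothetical rendering for illustration, not the library's exact output):

-- Hypothetical sketch of the generated statement's conflict clause; the
-- column list matches the IgnoreWithColumns arguments above.
fetchErrorInsertSql :: String
fetchErrorInsertSql =
  "INSERT INTO off_chain_vote_fetch_error (...) VALUES (...) \
  \ON CONFLICT (voting_anchor_id, retry_count) DO NOTHING"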
