Skip to content

Commit f951e20

Browse files
committed
bulk consummed txout
1 parent 137bfca commit f951e20

File tree

3 files changed

+83
-33
lines changed

3 files changed

+83
-33
lines changed

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{-# LANGUAGE FlexibleContexts #-}
22
{-# LANGUAGE OverloadedStrings #-}
3+
{-# LANGUAGE RecordWildCards #-}
34
{-# LANGUAGE NoImplicitPrelude #-}
45

56
module Cardano.DbSync.Era.Universal.Insert.Grouped (
@@ -17,7 +18,7 @@ module Cardano.DbSync.Era.Universal.Insert.Grouped (
1718
import qualified Data.List as List
1819
import qualified Data.Text as Text
1920

20-
import Cardano.BM.Trace (Trace, logWarning)
21+
import Cardano.BM.Trace (logWarning)
2122
import Cardano.Db (DbLovelace (..), MinIds (..))
2223
import qualified Cardano.Db as DB
2324
import qualified Cardano.Db.Schema.Variants.TxOutAddress as VA
@@ -26,6 +27,7 @@ import Cardano.DbSync.Api
2627
import Cardano.DbSync.Api.Types (InsertOptions (..), SyncEnv (..), SyncOptions (..))
2728
import Cardano.DbSync.Cache (queryTxIdWithCacheEither)
2829
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
30+
import Cardano.DbSync.Era.Shelley.Generic.Util (unTxHash)
2931
import Cardano.DbSync.Era.Shelley.Query
3032
import Cardano.Prelude
3133

@@ -98,9 +100,18 @@ insertBlockGroupedData syncEnv grouped = do
98100
then pure []
99101
else DB.insertBulkTxIn $ etiTxIn <$> groupedTxIn grouped
100102
whenConsumeOrPruneTxOut syncEnv $ do
103+
-- Resolve remaining inputs
101104
etis <- resolveRemainingInputs (groupedTxIn grouped) $ zip txOutIds (fst <$> groupedTxOut grouped)
102-
updateTuples <- mapM (prepareUpdates tracer) etis
103-
DB.updateListTxOutConsumedByTxId $ catMaybes updateTuples
105+
-- Categorise resolved inputs for bulk vs individual processing
106+
let (hashBasedUpdates, idBasedUpdates, failedInputs) = categorizeResolvedInputs etis
107+
-- Bulk process hash-based updates
108+
unless (null hashBasedUpdates) $ DB.updateConsumedByTxHashBulk txOutVariantType hashBasedUpdates
109+
-- Individual process ID-based updates
110+
unless (null idBasedUpdates) $ do
111+
DB.updateListTxOutConsumedByTxId idBasedUpdates
112+
-- Log failures
113+
mapM_ (liftIO . logWarning tracer . ("Failed to find output for " <>) . Text.pack . show) failedInputs
114+
104115
void . DB.insertBulkTxMetadata removeJsonbFromSchema $ groupedTxMetadata grouped
105116
void . DB.insertBulkMaTxMint $ groupedTxMint grouped
106117
pure $ makeMinId txInIds txOutIds maTxOutIds
@@ -109,6 +120,24 @@ insertBlockGroupedData syncEnv grouped = do
109120
txOutVariantType = getTxOutVariantType syncEnv
110121
removeJsonbFromSchema = ioRemoveJsonbFromSchema $ soptInsertOptions $ envOptions syncEnv
111122

123+
categorizeResolvedInputs :: [ExtendedTxIn] -> ([DB.BulkConsumedByHash], [(DB.TxOutIdW, DB.TxId)], [ExtendedTxIn])
124+
categorizeResolvedInputs etis =
125+
let (hashBased, idBased, failed) = foldr categorizeOne ([], [], []) etis
126+
in (hashBased, idBased, failed)
127+
where
128+
categorizeOne ExtendedTxIn {..} (hAcc, iAcc, fAcc) =
129+
case etiTxOutId of
130+
Right txOutId ->
131+
(hAcc, (txOutId, DB.txInTxInId etiTxIn) : iAcc, fAcc)
132+
Left genericTxIn ->
133+
let bulkData =
134+
DB.BulkConsumedByHash
135+
{ bchTxHash = unTxHash (Generic.txInTxId genericTxIn)
136+
, bchOutputIndex = Generic.txInIndex genericTxIn
137+
, bchConsumingTxId = DB.txInTxInId etiTxIn
138+
}
139+
in (bulkData : hAcc, iAcc, fAcc)
140+
112141
makeMinId :: [DB.TxInId] -> [DB.TxOutIdW] -> [DB.MaTxOutIdW] -> DB.MinIdsWrapper
113142
makeMinId txInIds txOutIds maTxOutIds =
114143
case txOutVariantType of
@@ -148,17 +177,6 @@ mkmaTxOuts _txOutVariantType (txOutId, mmtos) = mkmaTxOut <$> mmtos
148177
, VA.maTxOutAddressTxOutId = txOutId'
149178
}
150179

151-
prepareUpdates ::
152-
MonadIO m =>
153-
Trace IO Text ->
154-
ExtendedTxIn ->
155-
m (Maybe (DB.TxOutIdW, DB.TxId))
156-
prepareUpdates trce eti = case etiTxOutId eti of
157-
Right txOutId -> pure $ Just (txOutId, DB.txInTxInId (etiTxIn eti))
158-
Left _ -> do
159-
liftIO $ logWarning trce $ "Failed to find output for " <> Text.pack (show eti)
160-
pure Nothing
161-
162180
insertReverseIndex ::
163181
MonadIO m =>
164182
DB.BlockId ->

cardano-db/src/Cardano/Db/Statement/ConsumedTxOut.hs

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
module Cardano.Db.Statement.ConsumedTxOut where
1111

1212
import Cardano.BM.Trace (Trace, logInfo)
13-
import Cardano.Prelude (Int64, textShow)
13+
import Cardano.Prelude (ByteString, Int64, textShow)
1414
import Contravariant.Extras (contrazip2, contrazip3)
1515
import Control.Exception (throwIO)
1616
import Control.Monad (unless, when)
@@ -567,36 +567,68 @@ deletePageEntries txOutVariantType entries =
567567

568568
--------------------------------------------------------------------------------
569569

570-
-- Statement for updating TxOut entries with consumed_by_tx_id
571-
updatePageEntriesStmt ::
570+
-- | Data for bulk consumption using tx hash
571+
data BulkConsumedByHash = BulkConsumedByHash
572+
{ bchTxHash :: !ByteString
573+
, bchOutputIndex :: !Word64
574+
, bchConsumingTxId :: !Id.TxId
575+
}
576+
577+
-- | Bulk update consumed_by_tx_id using tx hash + index
578+
updateConsumedByTxHashBulk ::
579+
MonadIO m =>
580+
TxOutVariantType ->
581+
[BulkConsumedByHash] ->
582+
DbAction m ()
583+
updateConsumedByTxHashBulk txOutVariantType consumedData =
584+
unless (null consumedData) $ do
585+
let dbCallStack = mkDbCallStack "updateConsumedByTxHashBulk"
586+
case txOutVariantType of
587+
TxOutVariantCore ->
588+
runDbSession dbCallStack $
589+
HsqlSes.statement consumedData (updateConsumedByTxHashBulkStmt @SVC.TxOutCore)
590+
TxOutVariantAddress ->
591+
runDbSession dbCallStack $
592+
HsqlSes.statement consumedData (updateConsumedByTxHashBulkStmt @SVA.TxOutAddress)
593+
594+
updateConsumedByTxHashBulkStmt ::
572595
forall a.
573596
(DbInfo a) =>
574-
HsqlStmt.Statement [ConsumedTriplet] ()
575-
updatePageEntriesStmt =
597+
HsqlStmt.Statement [BulkConsumedByHash] ()
598+
updateConsumedByTxHashBulkStmt =
576599
HsqlStmt.Statement sql encoder HsqlD.noResult True
577600
where
578601
tableN = tableName (Proxy @a)
579602
sql =
580603
TextEnc.encodeUtf8 $
581604
Text.concat
582-
[ "WITH entries AS ("
583-
, " SELECT unnest($1::bigint[]) as tx_out_tx_id,"
584-
, " unnest($2::int[]) as tx_out_index,"
585-
, " unnest($3::bigint[]) as tx_in_tx_id"
605+
[ "WITH consumption_data AS ("
606+
, " SELECT unnest($1::bytea[]) as tx_hash,"
607+
, " unnest($2::bigint[]) as output_index,"
608+
, " unnest($3::bigint[]) as consuming_tx_id"
586609
, ")"
587610
, "UPDATE " <> tableN
588-
, "SET consumed_by_tx_id = entries.tx_in_tx_id"
589-
, "WHERE (tx_id, index) IN (SELECT tx_out_tx_id, tx_out_index FROM entries)"
611+
, "SET consumed_by_tx_id = consumption_data.consuming_tx_id"
612+
, "FROM consumption_data"
613+
, "INNER JOIN tx ON tx.hash = consumption_data.tx_hash"
614+
, "WHERE " <> tableN <> ".tx_id = tx.id"
615+
, " AND " <> tableN <> ".index = consumption_data.output_index"
590616
]
617+
encoder = contramap extractBulkData bulkConsumedByHashEncoder
591618

592-
encoder = contramap extract encodeConsumedTripletBulk
619+
extractBulkData :: [BulkConsumedByHash] -> ([ByteString], [Word64], [Id.TxId])
620+
extractBulkData xs =
621+
( map bchTxHash xs
622+
, map bchOutputIndex xs
623+
, map bchConsumingTxId xs
624+
)
593625

594-
extract :: [ConsumedTriplet] -> ([Id.TxId], [Word64], [Id.TxId])
595-
extract xs =
596-
( map ctTxOutTxId xs
597-
, map ctTxOutIndex xs
598-
, map ctTxInTxId xs
599-
)
626+
bulkConsumedByHashEncoder :: HsqlE.Params ([ByteString], [Word64], [Id.TxId])
627+
bulkConsumedByHashEncoder =
628+
contrazip3
629+
(bulkEncoder $ HsqlE.nonNullable HsqlE.bytea)
630+
(bulkEncoder $ HsqlE.nonNullable $ fromIntegral >$< HsqlE.int8)
631+
(bulkEncoder $ HsqlE.nonNullable $ Id.getTxId >$< HsqlE.int8)
600632

601633
--------------------------------------------------------------------------------
602634

cardano-db/test/Test/IO/Cardano/Db/Rollback.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ createAndInsertBlocks blockCount =
134134

135135
txIds <- mapM insertTx (mkTxs blkId 8)
136136
let txId = case txIds of
137-
(x:_) -> x
137+
(x : _) -> x
138138
[] -> error "mkTxs returned empty list" -- This shouldn't happen with mkTxs blkId 8
139139
void $ insertTxIn (TxIn txId txOutId 0 Nothing)
140140
void $ insertTxOut (mkTxOutCore blkId txId)

0 commit comments

Comments
 (0)