Skip to content

Commit ee922bc

Browse files
committed
error with variant in MinIds fixed
1 parent 51d60fb commit ee922bc

File tree

11 files changed

+130
-93
lines changed

11 files changed

+130
-93
lines changed

cabal.project

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,6 @@ if impl (ghc >= 9.12)
8282
-- https://github.com/kapralVV/Unique/issues/11
8383
, Unique:hashable
8484

85-
-- https://github.com/Gabriella439/Haskell-Pipes-Safe-Library/pull/70
86-
, pipes-safe:base
87-
88-
-- https://github.com/haskell-hvr/int-cast/issues/10
89-
, int-cast:base
90-
9185
-- The two following one-liners will cut off / restore the remainder of this file (for nix-shell users):
9286
-- when using the "cabal" wrapper script provided by nix-shell.
9387
-- --------------------------- 8< --------------------------

cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ getDBSyncPGPass = enpPGPassSource . dbSyncParams
231231

232232
queryDBSync :: DBSyncEnv -> DB.DbM a -> IO a
233233
queryDBSync env = do
234-
DB.runDbStandaloneTransSilent (getDBSyncPGPass env)
234+
DB.runDbStandaloneDirectSilent (getDBSyncPGPass env)
235235

236236
getPoolLayer :: DBSyncEnv -> IO PoolDataLayer
237237
getPoolLayer env = do

cardano-db-sync/src/Cardano/DbSync/Api.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ mkSyncEnv metricSetters trce dbEnv syncOptions protoInfo nw nwMagic systemStart
339339
oarq <- newTBQueueIO 1000
340340
epochVar <- newTVarIO initCurrentEpochNo
341341
epochStatistics <- initEpochStatistics
342-
dbIsolationStateVar <- newTVarIO DB.SyncLagging -- For database transaction isolation optimisation
342+
dbIsolationStateVar <- newTVarIO DB.SyncLagging -- For database transaction isolation optimisation
343343
ledgerEnvType <-
344344
case (enpMaybeLedgerStateDir syncNP, hasLedger' syncNodeConfigFromFile) of
345345
(Just dir, True) ->

cardano-db-sync/src/Cardano/DbSync/Database.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ runDbThread syncEnv queue = do
6767

6868
-- Handle the result of running the actions
6969
case result of
70-
Left err -> logError tracer $ show err
70+
Left err -> do
71+
logError tracer $ show err
72+
throwIO err
7173
Right Continue -> processQueue -- Continue processing
7274
Right Done -> pure () -- Stop processing
7375

@@ -83,8 +85,8 @@ runDbThread syncEnv queue = do
8385
updateBlockMetrics :: IO ()
8486
updateBlockMetrics = do
8587
let metricsSetters = envMetricSetters syncEnv
86-
void $ async $ DB.runDbDirectLogged (fromMaybe mempty $ DB.dbTracer $ envDbEnv syncEnv) (envDbEnv syncEnv) $ do
87-
mBlock <- DB.queryLatestBlock
88+
void $ async $ do
89+
mBlock <- DB.runDbPoolLogged (fromMaybe mempty $ DB.dbTracer $ envDbEnv syncEnv) (envDbEnv syncEnv) DB.queryLatestBlock
8890
liftIO $ whenJust mBlock $ \block -> do
8991
let blockNo = BlockNo $ fromMaybe 0 $ DB.blockBlockNo block
9092
slotNo = SlotNo $ fromMaybe 0 $ DB.blockSlotNo block

cardano-db-sync/src/Cardano/DbSync/Default.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,8 @@ determineIsolationLevel :: SyncEnv -> IO (Maybe DB.IsolationLevel)
251251
determineIsolationLevel syncEnv = do
252252
syncState <- readTVarIO (envDbIsolationState syncEnv)
253253
pure $ case syncState of
254-
DB.SyncLagging -> Just DB.ReadCommitted -- Syncing: use ReadCommitted for performance
255-
DB.SyncFollowing -> Nothing -- Following: use default RepeatableRead for consistency
254+
DB.SyncLagging -> Just DB.ReadCommitted -- Syncing: use ReadCommitted for performance
255+
DB.SyncFollowing -> Nothing -- Following: use default RepeatableRead for consistency
256256

257257
isWithinTwoMin :: SlotDetails -> Bool
258258
isWithinTwoMin sd = isSyncedWithinSeconds sd 120 == SyncFollowing

cardano-db-sync/src/Cardano/DbSync/Era/Universal/Insert/Grouped.hs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ insertBlockGroupedData ::
9797
insertBlockGroupedData syncEnv grouped = do
9898
disInOut <- liftIO $ getDisableInOutState syncEnv
9999

100-
-- Parallel preparation of independent data
101100
-- Parallel preparation of independent data
102101
(preparedTxIn, preparedMetadata, preparedMint, txOutChunks) <- liftIO $ do
103102
a1 <- async $ pure $ prepareTxInProcessing syncEnv grouped
@@ -200,7 +199,8 @@ resolveTxInputs syncEnv hasConsumed needsValue groupedOutputs txIn = do
200199
case eTxId of
201200
Right txId -> do
202201
-- Now get the TxOutId separately
203-
eTxOutId <- lift $ DB.resolveInputTxOutIdFromTxId txId (Generic.txInIndex txIn)
202+
let txOutVariantType = getTxOutVariantType syncEnv
203+
eTxOutId <- lift $ DB.resolveInputTxOutIdFromTxId txOutVariantType txId (Generic.txInIndex txIn)
204204
case eTxOutId of
205205
Right txOutId -> pure $ Right $ convertFoundTxOutId (txId, txOutId)
206206
Left err -> pure $ Left err
@@ -240,7 +240,6 @@ resolveTxInputs syncEnv hasConsumed needsValue groupedOutputs txIn = do
240240
DB.VCTxOutW cTxOut -> (txIn, VC.txOutCoreTxId cTxOut, Left txIn, Nothing)
241241
DB.VATxOutW vTxOut _ -> (txIn, VA.txOutAddressTxId vTxOut, Left txIn, Nothing)
242242

243-
244243
resolveRemainingInputs ::
245244
[ExtendedTxIn] ->
246245
[(DB.TxOutIdW, ExtendedTxOut)] ->
@@ -387,32 +386,10 @@ processUtxoConsumption syncEnv grouped txOutIds = do
387386
-- Log failures
388387
mapM_ (liftIO . logWarning tracer . ("Failed to find output for " <>) . Text.pack . show) failedInputs
389388

390-
-- | Helper function to categorize resolved inputs for parallel processing
391-
categorizeResolvedInputs :: [ExtendedTxIn] -> ([DB.BulkConsumedByHash], [(DB.TxOutIdW, DB.TxId)], [ExtendedTxIn])
392-
categorizeResolvedInputs etis =
393-
let (hashBased, idBased, failed) = foldr categorizeOne ([], [], []) etis
394-
in (hashBased, idBased, failed)
395-
where
396-
categorizeOne ExtendedTxIn {..} (hAcc, iAcc, fAcc) =
397-
case etiTxOutId of
398-
Right txOutId ->
399-
(hAcc, (txOutId, DB.txInTxInId etiTxIn) : iAcc, fAcc)
400-
Left genericTxIn ->
401-
let bulkData =
402-
DB.BulkConsumedByHash
403-
{ bchTxHash = unTxHash (Generic.txInTxId genericTxIn)
404-
, bchOutputIndex = Generic.txInIndex genericTxIn
405-
, bchConsumingTxId = DB.txInTxInId etiTxIn
406-
}
407-
in (bulkData : hAcc, iAcc, fAcc)
408-
409389
-----------------------------------------------------------------------------------------------------------------------------------
410390
-- PARALLEL PROCESSING HELPER FUNCTIONS (NO PIPELINES)
411391
-----------------------------------------------------------------------------------------------------------------------------------
412392

413-
-- Pipelines aren't suitable here due to data dependencies.
414-
-- The current approach using async for truly independent operations is optimal.
415-
416393
-- | Helper function to create MinIds result
417394
makeMinId :: SyncEnv -> [DB.TxInId] -> [DB.TxOutIdW] -> [DB.MaTxOutIdW] -> DB.MinIdsWrapper
418395
makeMinId syncEnv txInIds txOutIds maTxOutIds =
@@ -431,3 +408,25 @@ makeMinId syncEnv txInIds txOutIds maTxOutIds =
431408
, minTxOutId = listToMaybe txOutIds
432409
, minMaTxOutId = listToMaybe maTxOutIds
433410
}
411+
412+
-- | Helper function to categorize resolved inputs for parallel processing
413+
-- Note: Inputs with Left (unresolved to TxOutId) are treated as hash-based updates
414+
-- and also tracked as potentially failed for logging purposes.
415+
categorizeResolvedInputs :: [ExtendedTxIn] -> ([DB.BulkConsumedByHash], [(DB.TxOutIdW, DB.TxId)], [ExtendedTxIn])
416+
categorizeResolvedInputs =
417+
foldr categorizeOne ([], [], [])
418+
where
419+
categorizeOne eti@ExtendedTxIn {..} (hAcc, iAcc, fAcc) =
420+
case etiTxOutId of
421+
Right txOutId ->
422+
-- Successfully resolved to a TxOutId
423+
(hAcc, (txOutId, DB.txInTxInId etiTxIn) : iAcc, fAcc)
424+
Left genericTxIn ->
425+
-- Try to resolve by hash, but also track as potentially failed
426+
let bulkData =
427+
DB.BulkConsumedByHash
428+
{ bchTxHash = unTxHash (Generic.txInTxId genericTxIn)
429+
, bchOutputIndex = Generic.txInIndex genericTxIn
430+
, bchConsumingTxId = DB.txInTxInId etiTxIn
431+
}
432+
in (bulkData : hAcc, iAcc, eti : fAcc)

cardano-db/src/Cardano/Db/Run.hs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ runDbTransLogged ::
5858
MonadUnliftIO m =>
5959
Trace IO Text ->
6060
DbEnv ->
61-
Maybe IsolationLevel -> -- Optional isolation level
61+
Maybe IsolationLevel -> -- Optional isolation level
6262
DbM a ->
6363
m a
6464
runDbTransLogged tracer dbEnv mIsolationLevel action = do
@@ -90,7 +90,7 @@ runDbTransLogged tracer dbEnv mIsolationLevel action = do
9090
runDbTransSilent ::
9191
MonadUnliftIO m =>
9292
DbEnv ->
93-
Maybe IsolationLevel -> -- Optional isolation level
93+
Maybe IsolationLevel -> -- Optional isolation level
9494
DbM a ->
9595
m a
9696
runDbTransSilent dbEnv mIsolationLevel action = do
@@ -170,7 +170,7 @@ runDbPoolTransLogged ::
170170
MonadUnliftIO m =>
171171
Trace IO Text ->
172172
DbEnv ->
173-
Maybe IsolationLevel -> -- Optional isolation level
173+
Maybe IsolationLevel -> -- Optional isolation level
174174
DbM a ->
175175
m a
176176
runDbPoolTransLogged tracer dbEnv mIsolationLevel action = do
@@ -198,6 +198,36 @@ runDbPoolTransLogged tracer dbEnv mIsolationLevel action = do
198198
HsqlS.statement () commitTransactionStmt
199199
pure value
200200

201+
runDbPoolLogged ::
202+
MonadUnliftIO m =>
203+
Trace IO Text ->
204+
DbEnv ->
205+
DbM a ->
206+
m a
207+
runDbPoolLogged tracer dbEnv action = do
208+
case dbPoolConnection dbEnv of
209+
Nothing -> throwIO $ DbSessionError mkDbCallStack "No connection pool available in DbEnv"
210+
Just pool -> do
211+
runIohkLogging tracer $ do
212+
liftIO $ withResource pool $ \conn -> do
213+
result <- HsqlS.run (transactionSession conn) conn
214+
case result of
215+
Left sessionErr -> throwIO $ DbSessionError mkDbCallStack ("Pool transaction error: " <> formatSessionError sessionErr)
216+
Right dbResult -> pure dbResult
217+
where
218+
transactionSession conn = do
219+
HsqlS.statement () (beginTransactionStmt RepeatableRead)
220+
result <- liftIO $ try @SomeException $ do
221+
let tempDbEnv = createDbEnv conn (dbPoolConnection dbEnv) (dbTracer dbEnv)
222+
runReaderT (runDbM action) tempDbEnv
223+
case result of
224+
Left err -> do
225+
HsqlS.statement () rollbackTransactionStmt
226+
liftIO $ throwIO err
227+
Right value -> do
228+
HsqlS.statement () commitTransactionStmt
229+
pure value
230+
201231
-- | External service database runner with error handling
202232
--
203233
-- Designed for external services (like SMASH server) that manage their own connection pools.

cardano-db/src/Cardano/Db/Schema/Ids.hs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,15 @@ newtype TxId = TxId {getTxId :: Int64}
4545
newtype TxMetadataId = TxMetadataId {getTxMetadataId :: Int64}
4646
deriving (Eq, Show, Ord)
4747

48+
-----------------------------------------------------------------------------------------------------------------------------------
4849
newtype TxInId = TxInId {getTxInId :: Int64}
4950
deriving (Eq, Show, Ord)
5051

52+
instance Read TxInId where
53+
readsPrec p s = [(TxInId x, r) | (x, r) <- readsPrec p s]
54+
55+
-----------------------------------------------------------------------------------------------------------------------------------
56+
5157
newtype CollateralTxInId = CollateralTxInId {getCollateralTxInId :: Int64}
5258
deriving (Eq, Show, Ord)
5359

@@ -97,13 +103,26 @@ newtype ExtraMigrationsId = ExtraMigrationsId {getExtraMigrationsId :: Int64}
97103
-- VARIANTS
98104
-----------------------------------------------------------------------------------------------------------------------------------
99105

106+
-----------------------------------------------------------------------------------------------------------------------------------
107+
100108
-- | TxOut variants
101109
newtype TxOutCoreId = TxOutCoreId {getTxOutCoreId :: Int64}
102110
deriving (Eq, Ord, Show)
103111

112+
instance Read TxOutCoreId where
113+
readsPrec p s = [(TxOutCoreId x, r) | (x, r) <- readsPrec p s]
114+
115+
-----------------------------------------------------------------------------------------------------------------------------------
116+
117+
-----------------------------------------------------------------------------------------------------------------------------------
104118
newtype TxOutAddressId = TxOutAddressId {getTxOutAddressId :: Int64}
105119
deriving (Eq, Ord, Show)
106120

121+
instance Read TxOutAddressId where
122+
readsPrec p s = [(TxOutAddressId x, r) | (x, r) <- readsPrec p s]
123+
124+
-----------------------------------------------------------------------------------------------------------------------------------
125+
107126
newtype TxOutUtxoHdId = TxOutUtxoHdId {getTxOutUtxoHdId :: Int64}
108127
deriving (Eq, Ord, Show)
109128

@@ -123,13 +142,26 @@ newtype CollateralTxOutUtxoHdId = CollateralTxOutUtxoHdId {getCollateralTxOutUtx
123142
newtype CollateralTxOutUtxoHdAddressId = CollateralTxOutUtxoHdAddressId {getCollateralTxOutUtxoHdAddressId :: Int64}
124143
deriving (Eq, Ord, Show)
125144

145+
-----------------------------------------------------------------------------------------------------------------------------------
146+
126147
-- | Multi-asset variants
127148
newtype MaTxOutCoreId = MaTxOutCoreId {getMaTxOutCoreId :: Int64}
128149
deriving (Eq, Ord, Show)
129150

151+
instance Read MaTxOutCoreId where
152+
readsPrec p s = [(MaTxOutCoreId x, r) | (x, r) <- readsPrec p s]
153+
154+
-----------------------------------------------------------------------------------------------------------------------------------
155+
156+
-----------------------------------------------------------------------------------------------------------------------------------
130157
newtype MaTxOutAddressId = MaTxOutAddressId {getMaTxOutAddressId :: Int64}
131158
deriving (Eq, Ord, Show)
132159

160+
instance Read MaTxOutAddressId where
161+
readsPrec p s = [(MaTxOutAddressId x, r) | (x, r) <- readsPrec p s]
162+
163+
-----------------------------------------------------------------------------------------------------------------------------------
164+
133165
newtype MaTxOutUtxoHdId = MaTxOutUtxoHdId {getMaTxOutUtxoHdId :: Int64}
134166
deriving (Eq, Ord, Show)
135167

cardano-db/src/Cardano/Db/Schema/MinIds.hs

Lines changed: 23 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -143,57 +143,33 @@ minIdsAddressToText minIds =
143143
textToMinIds :: TxOutVariantType -> Text -> Maybe MinIdsWrapper
144144
textToMinIds txOutVariantType txt =
145145
case Text.split (== ':') txt of
146-
[tminTxInId, tminTxOutId, tminMaTxOutId, typeId] ->
146+
[tminTxInId, tminTxOutId, tminMaTxOutId] ->
147147
let
148148
mTxInId =
149149
if Text.null tminTxInId
150150
then Nothing
151151
else Just $ Id.TxInId $ read $ Text.unpack tminTxInId
152152

153-
mTxOutId =
154-
if Text.null tminTxOutId
155-
then Nothing
156-
else case Text.head tminTxOutId of
157-
'C' ->
158-
Just $
159-
VCTxOutIdW $
160-
Id.TxOutCoreId $
161-
read $
162-
Text.unpack $
163-
Text.tail tminTxOutId
164-
'V' ->
165-
Just $
166-
VATxOutIdW $
167-
Id.TxOutAddressId $
168-
read $
169-
Text.unpack $
170-
Text.tail tminTxOutId
171-
_ -> Nothing
172-
173-
mMaTxOutId =
174-
if Text.null tminMaTxOutId
175-
then Nothing
176-
else case Text.head tminMaTxOutId of
177-
'C' ->
178-
Just $
179-
CMaTxOutIdW $
180-
Id.MaTxOutCoreId $
181-
read $
182-
Text.unpack $
183-
Text.tail tminMaTxOutId
184-
'V' ->
185-
Just $
186-
VMaTxOutIdW $
187-
Id.MaTxOutAddressId $
188-
read $
189-
Text.unpack $
190-
Text.tail tminMaTxOutId
191-
_ -> Nothing
192-
193-
minIds = MinIds mTxInId mTxOutId mMaTxOutId
153+
-- Based on txOutVariantType, parse the appropriate ID types
154+
(mTxOutId, mMaTxOutId, wrapper) = case txOutVariantType of
155+
TxOutVariantCore ->
156+
( if Text.null tminTxOutId
157+
then Nothing
158+
else Just $ VCTxOutIdW $ Id.TxOutCoreId $ read $ Text.unpack tminTxOutId
159+
, if Text.null tminMaTxOutId
160+
then Nothing
161+
else Just $ CMaTxOutIdW $ Id.MaTxOutCoreId $ read $ Text.unpack tminMaTxOutId
162+
, CMinIdsWrapper
163+
)
164+
TxOutVariantAddress ->
165+
( if Text.null tminTxOutId
166+
then Nothing
167+
else Just $ VATxOutIdW $ Id.TxOutAddressId $ read $ Text.unpack tminTxOutId
168+
, if Text.null tminMaTxOutId
169+
then Nothing
170+
else Just $ VMaTxOutIdW $ Id.MaTxOutAddressId $ read $ Text.unpack tminMaTxOutId
171+
, VMinIdsWrapper
172+
)
194173
in
195-
case (txOutVariantType, typeId) of
196-
(TxOutVariantCore, "C") -> Just $ CMinIdsWrapper minIds
197-
(TxOutVariantAddress, "V") -> Just $ VMinIdsWrapper minIds
198-
_otherwise -> Nothing
199-
_otherwise -> Nothing
174+
Just $ wrapper $ MinIds mTxInId mTxOutId mMaTxOutId
175+
_ -> Nothing

cardano-db/src/Cardano/Db/Statement/ConsumedTxOut.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,9 +403,9 @@ findMaxTxInIdStmt =
403403
[ "WITH target_block_no AS ("
404404
, " SELECT MAX(block_no) - $1 AS target_block_no FROM block"
405405
, ")"
406-
, "SELECT MAX(tx.id) AS max_tx_id"
407-
, "FROM tx"
408-
, "INNER JOIN block ON tx.block_id = block.id"
406+
, "SELECT MAX(tx.id) AS max_tx_id "
407+
, "FROM tx "
408+
, "INNER JOIN block ON tx.block_id = block.id "
409409
, "WHERE block.block_no <= (SELECT target_block_no FROM target_block_no)"
410410
]
411411

0 commit comments

Comments
 (0)