Skip to content

Commit f8da18e

Browse files
committed
improve pipelines, add stop_at_block config
1 parent 7663150 commit f8da18e

File tree

23 files changed

+430
-1006
lines changed

23 files changed

+430
-1006
lines changed

cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,8 @@ emptyMetricsSetters =
384384
, metricsSetDbQueueLength = \_ -> pure ()
385385
, metricsSetDbBlockHeight = \_ -> pure ()
386386
, metricsSetDbSlotHeight = \_ -> pure ()
387+
, metricsSetDbEpochSyncDuration = \_ -> pure ()
388+
, metricsSetDbEpochSyncNumber = \_ -> pure ()
387389
}
388390

389391
withFullConfig ::

cardano-db-sync/src/Cardano/DbSync.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ runSyncNode metricsSetters trce iomgr dbConnSetting runNearTipMigrationFnc syncN
216216
syncEnv <-
217217
ExceptT $
218218
mkSyncEnvFromConfig
219+
metricsSetters
219220
trce
220221
dbEnv
221222
syncOptions
@@ -244,7 +245,7 @@ runSyncNode metricsSetters trce iomgr dbConnSetting runNearTipMigrationFnc syncN
244245
liftIO $
245246
race_
246247
-- We split the main thread into two parts to allow for graceful shutdown of the main App db thread.
247-
(runDbThread syncEnv metricsSetters threadChannels)
248+
(runDbThread syncEnv threadChannels)
248249
( mapConcurrently_
249250
id
250251
[ runSyncNodeClient metricsSetters syncEnv iomgr trce threadChannels (enpSocketPath syncNodeParams)

cardano-db-sync/src/Cardano/DbSync/Api.hs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ getCurrentTipBlockNo env = do
297297
Nothing -> pure Origin
298298

299299
mkSyncEnv ::
300+
MetricSetters ->
300301
Trace IO Text ->
301302
DB.DbEnv ->
302303
SyncOptions ->
@@ -308,7 +309,7 @@ mkSyncEnv ::
308309
SyncNodeParams ->
309310
RunMigration ->
310311
IO SyncEnv
311-
mkSyncEnv trce dbEnv syncOptions protoInfo nw nwMagic systemStart syncNodeConfigFromFile syncNP runNearTipMigrationFnc = do
312+
mkSyncEnv metricSetters trce dbEnv syncOptions protoInfo nw nwMagic systemStart syncNodeConfigFromFile syncNP runNearTipMigrationFnc = do
312313
dbCNamesVar <- newTVarIO =<< DB.runDbTransactionIohkNoLogging dbEnv DB.queryRewardAndEpochStakeConstraints
313314
cache <-
314315
if soptCache syncOptions
@@ -356,6 +357,7 @@ mkSyncEnv trce dbEnv syncOptions protoInfo nw nwMagic systemStart syncNodeConfig
356357
pure $
357358
SyncEnv
358359
{ envDbEnv = dbEnv
360+
, envMetricSetters = metricSetters
359361
, envBootstrap = bootstrapVar
360362
, envCache = cache
361363
, envEpochStatistics = epochStatistics
@@ -379,6 +381,7 @@ mkSyncEnv trce dbEnv syncOptions protoInfo nw nwMagic systemStart syncNodeConfig
379381
isTxOutConsumedBootstrap' = isTxOutConsumedBootstrap . sioTxOut . dncInsertOptions
380382

381383
mkSyncEnvFromConfig ::
384+
MetricSetters ->
382385
Trace IO Text ->
383386
DB.DbEnv ->
384387
SyncOptions ->
@@ -388,7 +391,7 @@ mkSyncEnvFromConfig ::
388391
-- | run migration function
389392
RunMigration ->
390393
IO (Either SyncNodeError SyncEnv)
391-
mkSyncEnvFromConfig trce dbEnv syncOptions genCfg syncNodeConfigFromFile syncNodeParams runNearTipMigrationFnc =
394+
mkSyncEnvFromConfig metricsSetters trce dbEnv syncOptions genCfg syncNodeConfigFromFile syncNodeParams runNearTipMigrationFnc =
392395
case genCfg of
393396
GenesisCardano _ bCfg sCfg _ _
394397
| unProtocolMagicId (Byron.configProtocolMagicId bCfg) /= Shelley.sgNetworkMagic (scConfig sCfg) ->
@@ -414,6 +417,7 @@ mkSyncEnvFromConfig trce dbEnv syncOptions genCfg syncNodeConfigFromFile syncNod
414417
| otherwise ->
415418
Right
416419
<$> mkSyncEnv
420+
metricsSetters
417421
trce
418422
dbEnv
419423
syncOptions

cardano-db-sync/src/Cardano/DbSync/Api/Ledger.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ storePage syncEnv percQuantum (n, ls) = do
130130
txOuts <- mapM (prepareTxOut syncEnv) ls
131131
txOutIds <- lift $ DB.insertBulkTxOut False $ etoTxOut . fst <$> txOuts
132132
let maTxOuts = concatMap (mkmaTxOuts txOutVariantType) $ zip txOutIds (snd <$> txOuts)
133-
void . lift $ DB.insertBulkMaTxOut maTxOuts
133+
void . lift $ DB.insertBulkMaTxOutPiped [maTxOuts]
134134
where
135135
txOutVariantType = getTxOutVariantType syncEnv
136136
trce = getTrace syncEnv

cardano-db-sync/src/Cardano/DbSync/Api/Types.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import Cardano.DbSync.Config.Types (SyncNodeConfig)
3232
import Cardano.DbSync.Ledger.Types (HasLedgerEnv)
3333
import Cardano.DbSync.LocalStateQuery (NoLedgerEnv)
3434
import Cardano.DbSync.Types (
35+
MetricSetters,
3536
OffChainPoolResult,
3637
OffChainPoolWorkQueue,
3738
OffChainVoteResult,
@@ -41,6 +42,7 @@ import Cardano.DbSync.Types (
4142
-- | SyncEnv is the main environment for the whole application.
4243
data SyncEnv = SyncEnv
4344
{ envDbEnv :: !DB.DbEnv
45+
, envMetricSetters :: !MetricSetters
4446
, envCache :: !CacheStatus
4547
, envEpochStatistics :: !(StrictTVar IO EpochStatistics)
4648
, envConsistentLevel :: !(StrictTVar IO ConsistentLevel)

cardano-db-sync/src/Cardano/DbSync/Config/Types.hs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ data SyncInsertOptions = SyncInsertOptions
191191
, sioPoolStats :: PoolStatsConfig
192192
, sioJsonType :: JsonTypeConfig
193193
, sioRemoveJsonbFromSchema :: RemoveJsonbFromSchemaConfig
194+
, sioStopAtBlock :: Maybe Word64
194195
}
195196
deriving (Eq, Show)
196197

@@ -458,6 +459,7 @@ parseOverrides obj baseOptions = do
458459
<*> obj .:? "pool_stat" .!= sioPoolStats baseOptions
459460
<*> obj .:? "json_type" .!= sioJsonType baseOptions
460461
<*> obj .:? "remove_jsonb_from_schema" .!= sioRemoveJsonbFromSchema baseOptions
462+
<*> obj .:? "stop_at_block" .!= sioStopAtBlock baseOptions
461463

462464
instance ToJSON SyncInsertConfig where
463465
toJSON (SyncInsertConfig preset options) =
@@ -479,6 +481,7 @@ optionsToList SyncInsertOptions {..} =
479481
, toJsonIfSet "pool_stat" sioPoolStats
480482
, toJsonIfSet "json_type" sioJsonType
481483
, toJsonIfSet "remove_jsonb_from_schema" sioRemoveJsonbFromSchema
484+
, toJsonIfSet "stop_at_block" sioStopAtBlock
482485
]
483486

484487
toJsonIfSet :: ToJSON a => Text -> a -> Maybe Pair
@@ -500,6 +503,7 @@ instance FromJSON SyncInsertOptions where
500503
<*> obj .:? "pool_stat" .!= sioPoolStats def
501504
<*> obj .:? "json_type" .!= sioJsonType def
502505
<*> obj .:? "remove_jsonb_from_schema" .!= sioRemoveJsonbFromSchema def
506+
<*> obj .:? "stop_at_block" .!= sioStopAtBlock def
503507

504508
instance ToJSON SyncInsertOptions where
505509
toJSON SyncInsertOptions {..} =
@@ -516,6 +520,7 @@ instance ToJSON SyncInsertOptions where
516520
, "pool_stat" .= sioPoolStats
517521
, "json_type" .= sioJsonType
518522
, "remove_jsonb_from_schema" .= sioRemoveJsonbFromSchema
523+
, "stop_at_block" .= sioStopAtBlock
519524
]
520525

521526
instance ToJSON RewardsConfig where
@@ -745,6 +750,7 @@ instance Default SyncInsertOptions where
745750
, sioPoolStats = PoolStatsConfig False
746751
, sioJsonType = JsonTypeText
747752
, sioRemoveJsonbFromSchema = RemoveJsonbFromSchemaConfig False
753+
, sioStopAtBlock = Nothing
748754
}
749755

750756
fullInsertOptions :: SyncInsertOptions
@@ -763,6 +769,7 @@ fullInsertOptions =
763769
, sioPoolStats = PoolStatsConfig True
764770
, sioJsonType = JsonTypeText
765771
, sioRemoveJsonbFromSchema = RemoveJsonbFromSchemaConfig False
772+
, sioStopAtBlock = Nothing
766773
}
767774

768775
onlyUTxOInsertOptions :: SyncInsertOptions
@@ -781,6 +788,7 @@ onlyUTxOInsertOptions =
781788
, sioPoolStats = PoolStatsConfig False
782789
, sioJsonType = JsonTypeText
783790
, sioRemoveJsonbFromSchema = RemoveJsonbFromSchemaConfig False
791+
, sioStopAtBlock = Nothing
784792
}
785793

786794
onlyGovInsertOptions :: SyncInsertOptions
@@ -807,6 +815,7 @@ disableAllInsertOptions =
807815
, sioGovernance = GovernanceConfig False
808816
, sioJsonType = JsonTypeText
809817
, sioRemoveJsonbFromSchema = RemoveJsonbFromSchemaConfig False
818+
, sioStopAtBlock = Nothing
810819
}
811820

812821
addressTypeToEnableDisable :: IsString s => TxOutVariantType -> s

cardano-db-sync/src/Cardano/DbSync/Database.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ data NextState
3333

3434
runDbThread ::
3535
SyncEnv ->
36-
MetricSetters ->
3736
ThreadChannels ->
3837
IO ()
39-
runDbThread syncEnv metricsSetters queue = do
38+
runDbThread syncEnv queue = do
4039
logInfo tracer "Starting DB thread"
4140
logException tracer "runDbThread: " processQueue
4241
logInfo tracer "Shutting down DB thread"
@@ -83,6 +82,7 @@ runDbThread syncEnv metricsSetters queue = do
8382
processQueue -- Continue processing
8483
updateBlockMetrics :: IO ()
8584
updateBlockMetrics = do
85+
let metricsSetters = envMetricSetters syncEnv
8686
void $ async $ DB.runDbTransactionIohkNoLogging (envDbEnv syncEnv) $ do
8787
mBlock <- DB.queryLatestBlock
8888
liftIO $ whenJust mBlock $ \block -> do

cardano-db-sync/src/Cardano/DbSync/Default.hs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import qualified Cardano.Db as DB
2929
import Cardano.DbSync.Api
3030
import Cardano.DbSync.Api.Ledger
3131
import Cardano.DbSync.Api.Types (ConsistentLevel (..), InsertOptions (..), LedgerEnv (..), SyncEnv (..), SyncOptions (..))
32+
import Cardano.DbSync.Config.Types (dncInsertOptions, sioStopAtBlock)
3233
import Cardano.DbSync.DbEvent (runDbSyncTransaction)
3334
import Cardano.DbSync.Epoch (epochHandler)
3435
import Cardano.DbSync.Era.Byron.Insert (insertByronBlock)
@@ -37,7 +38,7 @@ import Cardano.DbSync.Era.Universal.Block (insertBlockUniversal)
3738
import Cardano.DbSync.Era.Universal.Epoch (hasEpochStartEvent, hasNewEpochEvent)
3839
import Cardano.DbSync.Era.Universal.Insert.Certificate (mkAdaPots)
3940
import Cardano.DbSync.Era.Universal.Insert.LedgerEvent (insertNewEpochLedgerEvents)
40-
import Cardano.DbSync.Error (SyncNodeError (..))
41+
import Cardano.DbSync.Error (SyncNodeError (..), mkSyncNodeCallStack)
4142
import Cardano.DbSync.Ledger.State (applyBlockAndSnapshot, defaultApplyResult)
4243
import Cardano.DbSync.Ledger.Types (ApplyResult (..))
4344
import Cardano.DbSync.LocalStateQuery
@@ -46,30 +47,13 @@ import Cardano.DbSync.Types
4647
import Cardano.DbSync.Util
4748
import Cardano.DbSync.Util.Constraint (addConstraintsIfNotExist)
4849

49-
-- insertListBlocks ::
50-
-- SyncEnv ->
51-
-- [CardanoBlock] ->
52-
-- IO (Either SyncNodeError ())
53-
-- insertListBlocks syncEnv blocks = do
54-
-- runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) $ do
55-
-- traverse_ (applyAndInsertBlockMaybe syncEnv (getTrace syncEnv)) blocks
56-
5750
insertListBlocks ::
5851
SyncEnv ->
5952
[CardanoBlock] ->
6053
IO (Either SyncNodeError ())
61-
insertListBlocks syncEnv = go
62-
where
63-
go [] = pure $ Right ()
64-
go (block : rest) = do
65-
result <- processBlock block
66-
case result of
67-
Left err -> pure $ Left err
68-
Right _ -> go rest
69-
70-
processBlock block =
71-
runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) $
72-
applyAndInsertBlockMaybe syncEnv (getTrace syncEnv) block
54+
insertListBlocks syncEnv blocks = do
55+
runDbSyncTransaction (getTrace syncEnv) (envDbEnv syncEnv) $ do
56+
traverse_ (applyAndInsertBlockMaybe syncEnv (getTrace syncEnv)) blocks
7357

7458
applyAndInsertBlockMaybe ::
7559
SyncEnv ->
@@ -142,6 +126,17 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
142126
let !withinTwoMin = isWithinTwoMin details
143127
let !withinHalfHour = isWithinHalfHour details
144128
insertNewEpochLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult)
129+
130+
-- Check stop condition after successful block insertion
131+
let stopAtBlock = sioStopAtBlock $ dncInsertOptions $ envSyncNodeConfig syncEnv
132+
case stopAtBlock of
133+
Just targetBlock | unBlockNo blkNo >= targetBlock -> do
134+
liftIO $
135+
logInfo tracer $
136+
"Reached stop condition at block " <> textShow targetBlock <> ". Stopping db-sync gracefully."
137+
throwError $ SNErrDefault (mkSyncNodeCallStack "insertBlock") "Stop condition reached"
138+
_ -> pure ()
139+
145140
let isNewEpochEvent = hasNewEpochEvent (apEvents applyResult)
146141
let isStartEventOrRollback = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback
147142
let isMember poolId = Set.member poolId (apPoolsRegistered applyResult)

cardano-db-sync/src/Cardano/DbSync/Era/Byron/Insert.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ insertByronTx' syncEnv blkId tx blockIndex = do
317317
-- Update consumed TxOut records if enabled
318318
whenConsumeOrPruneTxOut syncEnv $
319319
lift $
320-
DB.updateListTxOutConsumedByTxId (prepUpdate txId <$> resolvedInputs)
320+
DB.updateListTxOutConsumedByTxIdBP [prepUpdate txId <$> resolvedInputs]
321321

322322
-- Return fee amount for caching/epoch calculations
323323
pure $ unDbLovelace $ vfFee valFee

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Generic/StakeDist.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ genericStakeSlice pInfo epochBlockNo lstate isMigration
136136
epochSliceSize =
137137
max minSliceSize defaultEpochSliceSize
138138
where
139-
-- On mainnet this is 21600
139+
-- On mainnet this is 2160
140140
expectedBlocks :: Word64
141141
expectedBlocks = 10 * k
142142

0 commit comments

Comments
 (0)