Skip to content

Commit 1f39c5e

Browse files
committed
Improve concurrency and exceptions handling
Fixes #1438. When the connection with the node restarts, the db-sync threads are no longer killed. Also, when an exception is thrown in a DbSync thread, it is no longer caught by the chainsync client.
1 parent 1f74478 commit 1f39c5e

File tree

12 files changed

+287
-264
lines changed

12 files changed

+287
-264
lines changed

cardano-db-sync/cardano-db-sync.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ library
124124

125125
build-depends: base >= 4.14 && < 4.17
126126
, aeson
127+
, async
127128
, binary
128129
, bytestring
129130
, base16-bytestring

cardano-db-sync/src/Cardano/DbSync.hs

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,26 @@ module Cardano.DbSync (
2727
) where
2828

2929
import Cardano.BM.Trace (Trace, logError, logInfo, logWarning)
30+
import qualified Cardano.Crypto as Crypto
3031
import Cardano.Db (textShow)
3132
import qualified Cardano.Db as Db
3233
import Cardano.DbSync.Api
33-
import Cardano.DbSync.Api.Types (InsertOptions (..), SyncOptions (..))
34-
import Cardano.DbSync.Config (configureLogging)
34+
import Cardano.DbSync.Api.Types (InsertOptions (..), RunMigration, SyncOptions (..), envLedgerEnv)
35+
import Cardano.DbSync.Config (configureLogging, readSyncNodeConfig)
36+
import Cardano.DbSync.Config.Cardano
3537
import Cardano.DbSync.Config.Types (
3638
ConfigFile (..),
3739
GenesisFile (..),
3840
LedgerStateDir (..),
3941
NetworkName (..),
4042
SocketPath (..),
4143
SyncCommand (..),
44+
SyncNodeConfig (..),
4245
SyncNodeParams (..),
4346
)
47+
import Cardano.DbSync.Database
48+
import Cardano.DbSync.DbAction
49+
import Cardano.DbSync.Era
4450
import Cardano.DbSync.Era.Shelley.Offline.Http (
4551
FetchError (..),
4652
SimplifiedPoolOfflineData (..),
@@ -49,19 +55,27 @@ import Cardano.DbSync.Era.Shelley.Offline.Http (
4955
renderFetchError,
5056
spodJson,
5157
)
58+
import Cardano.DbSync.Error
59+
import Cardano.DbSync.Ledger.State
5260
import Cardano.DbSync.Rollback (unsafeRollback)
53-
import Cardano.DbSync.Sync (runSyncNode)
61+
import Cardano.DbSync.Sync (runSyncNodeClient)
5462
import Cardano.DbSync.Tracing.ToObjectOrphans ()
5563
import Cardano.DbSync.Types
5664
import Cardano.DbSync.Util (readAbortOnPanic)
5765
import Cardano.Prelude hiding (Nat, (%))
66+
import Cardano.Slotting.Slot (EpochNo (..))
67+
import Control.Concurrent.Async
5868
import Control.Monad.Extra (whenJust)
5969
import Control.Monad.Trans.Except.Exit (orDie)
6070
import Control.Monad.Trans.Except.Extra (newExceptT)
6171
import qualified Data.Text as Text
6272
import Data.Version (showVersion)
73+
import Database.Persist.Postgresql (ConnectionString, withPostgresqlConn)
74+
import qualified Ouroboros.Consensus.HardFork.Simple as HardFork
6375
import Ouroboros.Network.NodeToClient (IOManager, withIOManager)
6476
import Paths_cardano_db_sync (version)
77+
import System.Directory (createDirectoryIfMissing)
78+
import Prelude (id)
6579

6680
runDbSyncNode :: MetricSetters -> [(Text, Text)] -> SyncNodeParams -> IO ()
6781
runDbSyncNode metricsSetters knownMigrations params =
@@ -137,6 +151,78 @@ runDbSync metricsSetters knownMigrations iomgr trce params aop = do
137151

138152
syncOpts = extractSyncOptions params aop
139153

154+
runSyncNode ::
155+
MetricSetters ->
156+
Trace IO Text ->
157+
IOManager ->
158+
ConnectionString ->
159+
-- | migrations were run on startup
160+
Bool ->
161+
-- | run migration function
162+
RunMigration ->
163+
SyncNodeParams ->
164+
SyncOptions ->
165+
IO ()
166+
runSyncNode metricsSetters trce iomgr dbConnString ranMigrations runMigrationFnc syncNodeParams syncOptions = do
167+
syncNodeConfig <- readSyncNodeConfig configFile
168+
whenJust maybeLedgerDir $
169+
\enpLedgerStateDir -> do
170+
createDirectoryIfMissing True (unLedgerStateDir enpLedgerStateDir)
171+
172+
logInfo trce $ "Using byron genesis file from: " <> (show . unGenesisFile $ dncByronGenesisFile syncNodeConfig)
173+
logInfo trce $ "Using shelley genesis file from: " <> (show . unGenesisFile $ dncShelleyGenesisFile syncNodeConfig)
174+
logInfo trce $ "Using alonzo genesis file from: " <> (show . unGenesisFile $ dncAlonzoGenesisFile syncNodeConfig)
175+
Db.runIohkLogging trce $
176+
withPostgresqlConn dbConnString $ \backend -> liftIO $ do
177+
orDie renderSyncNodeError $ do
178+
genCfg <- readCardanoGenesisConfig syncNodeConfig
179+
logProtocolMagicId trce $ genesisProtocolMagicId genCfg
180+
181+
syncEnv <-
182+
ExceptT $
183+
mkSyncEnvFromConfig
184+
trce
185+
dbConnString
186+
backend
187+
syncOptions
188+
genCfg
189+
syncNodeParams
190+
ranMigrations
191+
runMigrationFnc
192+
liftIO $ runExtraMigrationsMaybe syncEnv
193+
unless (enpShouldUseLedger syncNodeParams) $ liftIO $ do
194+
logInfo trce "Migrating to a no ledger schema"
195+
Db.noLedgerMigrations backend trce
196+
insertValidateGenesisDist syncEnv (dncNetworkName syncNodeConfig) genCfg (useShelleyInit syncNodeConfig)
197+
198+
-- communication channel between datalayer thread and chainsync-client thread
199+
threadChannels <- liftIO newThreadChannels
200+
liftIO $
201+
mapConcurrently_
202+
id
203+
[ runDbThread syncEnv metricsSetters threadChannels
204+
, runSyncNodeClient metricsSetters syncEnv iomgr trce threadChannels (enpSocketPath syncNodeParams)
205+
, runOfflineFetchThread syncEnv
206+
, runLedgerStateWriteThread (getTrace syncEnv) (envLedgerEnv syncEnv)
207+
]
208+
where
209+
useShelleyInit :: SyncNodeConfig -> Bool
210+
useShelleyInit cfg =
211+
case dncShelleyHardFork cfg of
212+
HardFork.TriggerHardForkAtEpoch (EpochNo 0) -> True
213+
_ -> False
214+
215+
configFile = enpConfigFile syncNodeParams
216+
maybeLedgerDir = enpMaybeLedgerStateDir syncNodeParams
217+
218+
logProtocolMagicId :: Trace IO Text -> Crypto.ProtocolMagicId -> ExceptT SyncNodeError IO ()
219+
logProtocolMagicId tracer pm =
220+
liftIO . logInfo tracer $
221+
mconcat
222+
[ "NetworkMagic: "
223+
, textShow (Crypto.unProtocolMagicId pm)
224+
]
225+
140226
-- -------------------------------------------------------------------------------------------------
141227

142228
extractSyncOptions :: SyncNodeParams -> Bool -> SyncOptions

cardano-db-sync/src/Cardano/DbSync/Api.hs

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ module Cardano.DbSync.Api (
2929
getHasConsumed,
3030
getPrunes,
3131
mkSyncEnvFromConfig,
32-
replaceConnection,
3332
verifySnapshotPoint,
34-
getBackend,
3533
getInsertOptions,
3634
getTrace,
3735
getTopLevelConfig,
@@ -155,8 +153,7 @@ runExtraMigrationsMaybe env = do
155153
extraMigr <- liftIO $ readTVarIO $ envExtraMigrations env
156154
logInfo (getTrace env) $ textShow extraMigr
157155
unless (emRan extraMigr) $ do
158-
backend <- getBackend env
159-
DB.runDbIohkNoLogging backend $
156+
DB.runDbIohkNoLogging (envBackend env) $
160157
DB.runExtraMigrations
161158
(getTrace env)
162159
(getSafeBlockNoDiff env)
@@ -199,10 +196,6 @@ defaultInsertOptions = fullInsertOptions
199196
turboInsertOptions :: InsertOptions
200197
turboInsertOptions = InsertOptions False False False False
201198

202-
replaceConnection :: SyncEnv -> SqlBackend -> IO ()
203-
replaceConnection env sqlBackend = do
204-
atomically $ writeTVar (envBackend env) $ Strict.Just sqlBackend
205-
206199
initEpochState :: EpochState
207200
initEpochState =
208201
EpochState
@@ -259,13 +252,6 @@ getInsertOptions = soptInsertOptions . envOptions
259252
getSlotHash :: SqlBackend -> SlotNo -> IO [(SlotNo, ByteString)]
260253
getSlotHash backend = DB.runDbIohkNoLogging backend . DB.querySlotHash
261254

262-
getBackend :: SyncEnv -> IO SqlBackend
263-
getBackend env = do
264-
mBackend <- readTVarIO $ envBackend env
265-
case mBackend of
266-
Strict.Just conn -> pure conn
267-
Strict.Nothing -> panic "sql connection not initiated"
268-
269255
hasLedgerState :: SyncEnv -> Bool
270256
hasLedgerState syncEnv =
271257
case envLedgerEnv syncEnv of
@@ -287,18 +273,16 @@ getDbLatestBlockInfo backend = do
287273
}
288274

289275
getDbTipBlockNo :: SyncEnv -> IO (Point.WithOrigin BlockNo)
290-
getDbTipBlockNo env =
291-
getBackend env
292-
>>= getDbLatestBlockInfo
293-
<&> maybe Point.Origin (Point.At . bBlockNo)
276+
getDbTipBlockNo env = do
277+
mblk <- getDbLatestBlockInfo (envBackend env)
278+
pure $ maybe Point.Origin (Point.At . bBlockNo) mblk
294279

295280
logDbState :: SyncEnv -> IO ()
296281
logDbState env = do
297-
backend <- getBackend env
298-
mblk <- getDbLatestBlockInfo backend
282+
mblk <- getDbLatestBlockInfo (envBackend env)
299283
case mblk of
300-
Nothing -> logInfo tracer "Cardano.Db is empty"
301-
Just tip -> logInfo tracer $ mconcat ["Cardano.Db tip is at ", showTip tip]
284+
Nothing -> logInfo tracer "Database is empty"
285+
Just tip -> logInfo tracer $ mconcat ["Database tip is at ", showTip tip]
302286
where
303287
showTip :: TipInfo -> Text
304288
showTip tipInfo =
@@ -314,15 +298,15 @@ logDbState env = do
314298

315299
getCurrentTipBlockNo :: SyncEnv -> IO (WithOrigin BlockNo)
316300
getCurrentTipBlockNo env = do
317-
backend <- getBackend env
318-
maybeTip <- getDbLatestBlockInfo backend
301+
maybeTip <- getDbLatestBlockInfo (envBackend env)
319302
case maybeTip of
320303
Just tip -> pure $ At (bBlockNo tip)
321304
Nothing -> pure Origin
322305

323306
mkSyncEnv ::
324307
Trace IO Text ->
325308
ConnectionString ->
309+
SqlBackend ->
326310
SyncOptions ->
327311
ProtocolInfo IO CardanoBlock ->
328312
Ledger.Network ->
@@ -332,9 +316,8 @@ mkSyncEnv ::
332316
Bool ->
333317
RunMigration ->
334318
IO SyncEnv
335-
mkSyncEnv trce connString syncOptions protoInfo nw nwMagic systemStart syncNodeParams ranMigrations runMigrationFnc = do
319+
mkSyncEnv trce connString backend syncOptions protoInfo nw nwMagic systemStart syncNodeParams ranMigrations runMigrationFnc = do
336320
cache <- if soptCache syncOptions then newEmptyCache 250000 50000 else pure uninitiatedCache
337-
backendVar <- newTVarIO Strict.Nothing
338321
consistentLevelVar <- newTVarIO Unchecked
339322
fixDataVar <- newTVarIO $ if ranMigrations then DataFixRan else NoneFixRan
340323
indexesVar <- newTVarIO $ enpForceIndexes syncNodeParams
@@ -370,7 +353,7 @@ mkSyncEnv trce connString syncOptions protoInfo nw nwMagic systemStart syncNodeP
370353
, envSystemStart = systemStart
371354
, envConnString = connString
372355
, envRunDelayedMigration = runMigrationFnc
373-
, envBackend = backendVar
356+
, envBackend = backend
374357
, envOptions = syncOptions
375358
, envConsistentLevel = consistentLevelVar
376359
, envIsFixed = fixDataVar
@@ -387,6 +370,7 @@ mkSyncEnv trce connString syncOptions protoInfo nw nwMagic systemStart syncNodeP
387370
mkSyncEnvFromConfig ::
388371
Trace IO Text ->
389372
ConnectionString ->
373+
SqlBackend ->
390374
SyncOptions ->
391375
GenesisConfig ->
392376
SyncNodeParams ->
@@ -395,7 +379,7 @@ mkSyncEnvFromConfig ::
395379
-- | run migration function
396380
RunMigration ->
397381
IO (Either SyncNodeError SyncEnv)
398-
mkSyncEnvFromConfig trce connString syncOptions genCfg syncNodeParams ranMigration runMigrationFnc =
382+
mkSyncEnvFromConfig trce connString backend syncOptions genCfg syncNodeParams ranMigration runMigrationFnc =
399383
case genCfg of
400384
GenesisCardano _ bCfg sCfg _
401385
| unProtocolMagicId (Byron.configProtocolMagicId bCfg) /= Shelley.sgNetworkMagic (scConfig sCfg) ->
@@ -419,6 +403,7 @@ mkSyncEnvFromConfig trce connString syncOptions genCfg syncNodeParams ranMigrati
419403
<$> mkSyncEnv
420404
trce
421405
connString
406+
backend
422407
syncOptions
423408
(mkProtocolInfoCardano genCfg [])
424409
(Shelley.sgNetworkId $ scConfig sCfg)
@@ -437,8 +422,7 @@ getLatestPoints env = do
437422
verifySnapshotPoint env snapshotPoints
438423
NoLedger _ -> do
439424
-- Brings the 5 latest.
440-
dbBackend <- getBackend env
441-
lastPoints <- DB.runDbIohkNoLogging dbBackend DB.queryLatestPoints
425+
lastPoints <- DB.runDbIohkNoLogging (envBackend env) DB.queryLatestPoints
442426
pure $ mapMaybe convert lastPoints
443427
where
444428
convert (Nothing, _) = Nothing
@@ -450,8 +434,7 @@ verifySnapshotPoint env snapPoints =
450434
where
451435
validLedgerFileToPoint :: SnapshotPoint -> IO (Maybe (CardanoPoint, Bool))
452436
validLedgerFileToPoint (OnDisk lsf) = do
453-
backend <- getBackend env
454-
hashes <- getSlotHash backend (lsfSlotNo lsf)
437+
hashes <- getSlotHash (envBackend env) (lsfSlotNo lsf)
455438
let valid = find (\(_, h) -> lsfHash lsf == hashToAnnotation h) hashes
456439
case valid of
457440
Just (slot, hash) | slot == lsfSlotNo lsf -> pure $ convertToDiskPoint slot hash
@@ -460,8 +443,7 @@ verifySnapshotPoint env snapPoints =
460443
case pnt of
461444
GenesisPoint -> pure Nothing
462445
BlockPoint slotNo hsh -> do
463-
backend <- getBackend env
464-
hashes <- getSlotHash backend slotNo
446+
hashes <- getSlotHash (envBackend env) slotNo
465447
let valid = find (\(_, dbHash) -> getHeaderHash hsh == dbHash) hashes
466448
case valid of
467449
Just (dbSlotNo, _) | slotNo == dbSlotNo -> pure $ Just (pnt, True)

cardano-db-sync/src/Cardano/DbSync/Api/Types.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ data SyncEnv = SyncEnv
3939
, envSystemStart :: !SystemStart
4040
, envConnString :: ConnectionString
4141
, envRunDelayedMigration :: RunMigration
42-
, envBackend :: !(StrictTVar IO (Strict.Maybe SqlBackend))
42+
, envBackend :: !SqlBackend
4343
, envConsistentLevel :: !(StrictTVar IO ConsistentLevel)
4444
, envIsFixed :: !(StrictTVar IO FixesRan)
4545
, envIndexes :: !(StrictTVar IO Bool)

0 commit comments

Comments
 (0)