This repository was archived by the owner on Nov 24, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 113
Expand file tree
/
Copy pathChainwebNode.hs
More file actions
554 lines (486 loc) · 19.8 KB
/
ChainwebNode.hs
File metadata and controls
554 lines (486 loc) · 19.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- |
-- Module: ChainwebNode
-- Copyright: Copyright © 2018 Kadena LLC.
-- License: MIT
-- Maintainer: Lars Kuhtz <lars@kadena.io>
-- Stability: experimental
--
-- TODO
--
module Main
(
-- * Configuration
ChainwebNodeConfiguration(..)
-- * Monitor
, runCutMonitor
, runRtsMonitor
-- * Chainweb Node
, node
, withNodeLogger
-- * Main function
, main
) where
import Configuration.Utils hiding (Error)
import Configuration.Utils.Validation (validateFilePath)
import Control.Concurrent
import Control.Concurrent.Async
import Control.DeepSeq
import Control.Exception
import Control.Lens hiding ((.=))
import Control.Monad
import Control.Monad.Managed
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time
import Data.Typeable
import GHC.Generics hiding (from)
import GHC.Stack
import GHC.Stats
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTPS
import System.Directory
import System.FilePath
import System.IO
import qualified System.Logger as L
import System.LogLevel
import System.Mem
-- internal modules
import Chainweb.BlockHeader
import Chainweb.BlockHeaderDB.PruneForks (PruneStats(..))
import Chainweb.Chainweb
import Chainweb.Chainweb.Configuration
import Chainweb.Chainweb.CutResources
import Chainweb.Counter
import Chainweb.Cut.CutHashes
import Chainweb.CutDB
import Chainweb.Logger
import Chainweb.Logging.Config
import Chainweb.Logging.Miner
import Chainweb.Pact.Mempool.InMemTypes (MempoolStats(..))
import Chainweb.Pact.Backend.DbCache (DbCacheStats)
import Chainweb.Pact.RestAPI.Server (PactCmdLog(..))
import Chainweb.Pact.Types
import Chainweb.Time
import Data.Time.Format.ISO8601
import Chainweb.Utils
import Chainweb.Utils.RequestLog
import Chainweb.Version
import Chainweb.Version.Mainnet
import Chainweb.Version.Testnet04 (testnet04)
import Chainweb.Storage.Table.RocksDB
import Data.LogMessage
import P2P.Node
import PkgInfo
import Utils.Logging
import Utils.Logging.Config
import Utils.Logging.Trace
import Utils.CheckRLimits
import Utils.InstallSignalHandlers
-- -------------------------------------------------------------------------- --
-- Configuration
-- | Top-level configuration of the @chainweb-node@ executable.
--
data ChainwebNodeConfiguration = ChainwebNodeConfiguration
    { _nodeConfigChainweb :: !ChainwebConfiguration
        -- ^ Configuration of the chainweb component.
    , _nodeConfigLog :: !LogConfig
        -- ^ Logging configuration.
    , _nodeConfigDatabaseDirectory :: !(Maybe FilePath)
        -- ^ Base directory of the databases. When 'Nothing', 'getDbBaseDir'
        -- falls back to a directory below the XDG data directory.
    }
    deriving stock (Show, Eq, Generic)

-- Generates the lenses 'nodeConfigChainweb', 'nodeConfigLog', and
-- 'nodeConfigDatabaseDirectory'.
makeLenses ''ChainwebNodeConfiguration
-- | Default node configuration: the default chainweb configuration for
-- 'Mainnet01', the default log configuration with the logger threshold set to
-- 'L.Info', and no explicit database directory.
--
defaultChainwebNodeConfiguration :: ChainwebNodeConfiguration
defaultChainwebNodeConfiguration = ChainwebNodeConfiguration
    { _nodeConfigChainweb = defaultChainwebConfiguration Mainnet01
    , _nodeConfigLog = infoLogConfig
    , _nodeConfigDatabaseDirectory = Nothing
    }
  where
    -- Default log configuration with the threshold overridden to 'L.Info'.
    infoLogConfig = defaultLogConfig
        & logConfigLogger . L.loggerConfigThreshold .~ L.Info
-- | Validate a node configuration: first the log configuration, then the
-- chainweb configuration, and finally the database directory, if one is set.
--
validateChainwebNodeConfiguration :: ConfigValidation ChainwebNodeConfiguration []
validateChainwebNodeConfiguration conf = do
    validateLogConfig (_nodeConfigLog conf)
    validateChainwebConfiguration (_nodeConfigChainweb conf)
    -- Nothing to validate when no directory is configured.
    forM_ (_nodeConfigDatabaseDirectory conf) $
        validateFilePath "databaseDirectory"
-- | Serializes the configuration as a JSON object with the properties
-- @chainweb@, @logging@, and @databaseDirectory@.
--
instance ToJSON ChainwebNodeConfiguration where
    toJSON conf = object properties
      where
        properties =
            [ "chainweb" .= _nodeConfigChainweb conf
            , "logging" .= _nodeConfigLog conf
            , "databaseDirectory" .= _nodeConfigDatabaseDirectory conf
            ]
-- | Parses a JSON object into an update function for a
-- 'ChainwebNodeConfiguration'. The properties @chainweb@, @logging@, and
-- @databaseDirectory@ are mapped onto the corresponding lenses.
--
-- NOTE(review): presumably properties that are missing from the object leave
-- the respective field unchanged (configuration-tools convention) -- confirm.
--
instance FromJSON (ChainwebNodeConfiguration -> ChainwebNodeConfiguration) where
    parseJSON = withObject "ChainwebNodeConfig" $ \obj -> id
        <$< nodeConfigChainweb %.: "chainweb" % obj
        <*< nodeConfigLog %.: "logging" % obj
        <*< nodeConfigDatabaseDirectory ..: "databaseDirectory" % obj
-- | Command line option parser for 'ChainwebNodeConfiguration'. It combines
-- the chainweb options, the logging options (grouped under @Logging@), and
-- the @--database-directory@ option.
--
pChainwebNodeConfiguration :: MParser ChainwebNodeConfiguration
pChainwebNodeConfiguration = id
    <$< nodeConfigChainweb %:: pChainwebConfiguration
    <*< parserOptionGroup "Logging" (nodeConfigLog %:: pLogConfig)
    <*< nodeConfigDatabaseDirectory .:: databaseDirectoryOption
  where
    -- A value given on the command line is wrapped in 'Just'.
    databaseDirectoryOption = fmap Just % textOption
        % long "database-directory"
        <> help "directory where the databases are persisted"
-- | Directory of the RocksDB database: @\<base\>\/0\/rocksDb@, where
-- @\<base\>@ is the result of 'getDbBaseDir'.
--
getRocksDbDir :: HasCallStack => ChainwebNodeConfiguration -> IO FilePath
getRocksDbDir conf = do
    baseDir <- getDbBaseDir conf
    return $ baseDir </> "0" </> "rocksDb"
-- | Directory of the Pact SQLite databases: @\<base\>\/0\/sqlite@, where
-- @\<base\>@ is the result of 'getDbBaseDir'.
--
getPactDbDir :: HasCallStack => ChainwebNodeConfiguration -> IO FilePath
getPactDbDir conf = do
    baseDir <- getDbBaseDir conf
    return $ baseDir </> "0" </> "sqlite"
-- | Directory for database backups: @\<base\>\/backups@, where @\<base\>@ is
-- the result of 'getDbBaseDir'.
--
getBackupsDir :: HasCallStack => ChainwebNodeConfiguration -> IO FilePath
getBackupsDir conf = do
    baseDir <- getDbBaseDir conf
    return $ baseDir </> "backups"
-- | Base directory of all databases of the node.
--
-- An explicitly configured database directory is returned as is. Otherwise
-- the result is the sub-directory @chainweb-node\/\<version name\>@ of the
-- XDG data directory.
--
getDbBaseDir :: HasCallStack => ChainwebNodeConfiguration -> IO FilePath
getDbBaseDir conf = maybe xdgDefaultDir return (_nodeConfigDatabaseDirectory conf)
  where
    -- Only evaluated when no directory is configured.
    xdgDefaultDir = getXdgDirectory XdgData
        $ "chainweb-node" </> sshow (_versionName version)

    version = _configChainwebVersion $ _nodeConfigChainweb conf
-- -------------------------------------------------------------------------- --
-- Monitors
-- | Run a monitor action with a logger forever. If the action exits or fails,
-- the event is logged and the action is restarted.
--
-- To prevent the action from spinning when the cause of a failure is
-- persistent, only 10 immediate restarts are allowed. After that, restarts
-- are throttled to at most one restart every 10 seconds.
--
runMonitorLoop :: Logger logger => Text -> logger -> IO () -> IO ()
runMonitorLoop actionLabel logger action = runForeverThrottled
    (logFunction logger)
    actionLabel
    burstSize
    restartInterval
    action
  where
    -- number of immediate restarts that are allowed in case of failure
    burstSize = 10

    -- allow one restart every 10 seconds in case of failure
    restartInterval = 10 * mega
-- | Monitor that logs the cut hashes of the current cut of the given cut
-- database at level 'Info' every 15 seconds. Messages are labelled with
-- @component: cut-monitor@.
--
runCutMonitor
    :: HasVersion
    => Logger logger
    => logger
    -> CutDb logger
    -> IO ()
runCutMonitor logger db = L.withLoggerLabel ("component", "cut-monitor") logger monitor
  where
    monitor l = runMonitorLoop "ChainwebNode.runCutMonitor" l $ do
        currentCut <- _cut db
        logFunctionJson l Info $ cutToCutHashes Nothing currentCut
        threadDelay 15_000_000
-- | Telemetry record describing a block update.
--
-- NOTE(review): the only producer visible in this file is the commented-out
-- @runBlockUpdateMonitor@; the meaning of the fields should be confirmed
-- against it.
--
data BlockUpdate = BlockUpdate
    { _blockUpdateBlockHeader :: !(ObjectEncoded BlockHeader)
        -- ^ The block header, encoded as a JSON object.
    , _blockUpdateOrphaned :: !Bool
        -- ^ Whether the block is reported as orphaned.
    , _blockUpdateTxCount :: !Int
        -- ^ Transaction count reported for the block.
    , _blockUpdateDifficultyDouble :: !Double
        -- ^ Difficulty of the block as a 'Double'.
    }
    deriving stock (Show, Eq, Ord, Generic)
    deriving anyclass (NFData)
-- | JSON object with the properties @header@, @orphaned@, @txCount@, and
-- @difficultyDouble@. 'toEncoding' and 'toJSON' emit the same properties.
--
instance HasVersion => ToJSON BlockUpdate where
    toEncoding u = pairs $ mconcat
        [ "header" .= _blockUpdateBlockHeader u
        , "orphaned" .= _blockUpdateOrphaned u
        , "txCount" .= _blockUpdateTxCount u
        , "difficultyDouble" .= _blockUpdateDifficultyDouble u
        ]
    toJSON u = object
        [ "header" .= _blockUpdateBlockHeader u
        , "orphaned" .= _blockUpdateOrphaned u
        , "txCount" .= _blockUpdateTxCount u
        , "difficultyDouble" .= _blockUpdateDifficultyDouble u
        ]
    {-# INLINE toEncoding #-}
    {-# INLINE toJSON #-}
-- runBlockUpdateMonitor :: Logger logger => logger -> CutDb -> IO ()
-- runBlockUpdateMonitor logger db = L.withLoggerLabel ("component", "block-update-monitor") logger $ \l ->
-- runMonitorLoop "ChainwebNode.runBlockUpdateMonitor" l $ do
-- blockDiffStream db
-- & S.mapM toUpdate
-- & S.mapM_ (logFunctionJson l Info)
-- where
-- toUpdate :: Either BlockHeader BlockHeader -> IO BlockUpdate
-- toUpdate (Right bh) = BlockUpdate
-- <$> pure (ObjectEncoded bh) -- _blockUpdateBlockHeader
-- <*> pure False -- _blockUpdateOrphaned
-- <*> txCount bh -- _blockUpdateTxCount
-- <*> pure (difficultyToDouble (targetToDifficulty (view blockTarget bh))) -- _blockUpdateDifficultyDouble
-- toUpdate (Left bh) = BlockUpdate
-- <$> pure (ObjectEncoded bh) -- _blockUpdateBlockHeader
-- <*> pure True -- _blockUpdateOrphaned
-- <*> ((0 -) <$> txCount bh) -- _blockUpdateTxCount
-- <*> pure (difficultyToDouble (targetToDifficulty (view blockTarget bh))) -- _blockUpdateDifficultyDouble
-- type CutLog = HM.HashMap ChainId (ObjectEncoded BlockHeader)
-- These orphan instances are OK, since this is the "Main" module of an
-- application and no other module can import them.
--
-- All four instances are derived via the 'Generic' instances of the
-- respective types.
--
deriving anyclass instance NFData GCDetails
deriving anyclass instance NFData RTSStats
deriving anyclass instance ToJSON GCDetails
deriving anyclass instance ToJSON RTSStats
-- | Monitor that logs the GHC runtime statistics at level 'Info' once per
-- minute. All messages are labelled with @component: rts-monitor@.
--
-- If RTS statistics are not enabled (the node was not started with
-- @+RTS -T@), a single warning is logged and the function returns without
-- starting the monitor loop.
--
runRtsMonitor :: Logger logger => logger -> IO ()
runRtsMonitor logger = L.withLoggerLabel ("component", "rts-monitor") logger go
  where
    go l = getRTSStatsEnabled >>= \case
        False ->
            logFunctionText l Warn "RTS Stats isn't enabled. Run with '+RTS -T' to enable it."
        True ->
            runMonitorLoop "Chainweb.Node.runRtsMonitor" l $ do
                logFunctionText l Debug "logging RTS stats"
                stats <- getRTSStats
                -- Log through the labelled logger @l@ (not the unlabelled
                -- outer @logger@), so that the stats carry the component
                -- label like the messages of the other monitors.
                logFunctionJson l Info stats
                approximateThreadDelay 60_000_000 {- 1 minute -}
-- | Monitor that logs the queue statistics of the given cut database (as
-- returned by 'getQueueStats') at level 'Info' once per minute. All messages
-- are labelled with @component: queue-monitor@.
--
runQueueMonitor :: Logger logger => logger -> CutDb logger -> IO ()
runQueueMonitor logger cutDb = L.withLoggerLabel ("component", "queue-monitor") logger go
  where
    go l = runMonitorLoop "ChainwebNode.runQueueMonitor" l $ do
        logFunctionText l Debug "logging cut queue stats"
        stats <- getQueueStats cutDb
        -- Log through the labelled logger @l@ (not the unlabelled outer
        -- @logger@), so that the stats carry the component label like the
        -- messages of the other monitors.
        logFunctionJson l Info stats
        approximateThreadDelay 60_000_000 {- 1 minute -}
-- | Size of a database on disk, as logged by 'runDatabaseMonitor'.
--
data DbStats = DbStats
    { dbStatsName :: !Text
        -- ^ Name of the database (@rocksDb@ or @pactDb@).
    , dbStatsSize :: !Integer
        -- ^ Sum of the sizes of all files below the database path, as
        -- reported by 'getFileSize'.
    }
    deriving stock (Generic)
    deriving anyclass (NFData, ToJSON)
-- | Monitor that logs the on-disk size of the RocksDB directory and of the
-- Pact database directory at level 'Info' every 20 minutes. Messages are
-- labelled with @component: database-monitor@.
--
runDatabaseMonitor :: Logger logger => logger -> FilePath -> FilePath -> IO ()
runDatabaseMonitor logger rocksDbDir pactDbDir = L.withLoggerLabel ("component", "database-monitor") logger go
  where
    go l = runMonitorLoop "ChainwebNode.runDatabaseMonitor" l $ do
        logFunctionText l Debug "logging database stats"
        rocksDbSize <- sizeOf rocksDbDir
        logFunctionJson l Info $ DbStats "rocksDb" rocksDbSize
        pactDbSize <- sizeOf pactDbDir
        logFunctionJson l Info $ DbStats "pactDb" pactDbSize
        approximateThreadDelay 1_200_000_000 {- 20 minutes -}

    -- Recursively sum up the sizes of all files below a path. A path that is
    -- neither a directory nor a file counts as 0.
    sizeOf :: FilePath -> IO Integer
    sizeOf path = do
        isDir <- doesDirectoryExist path
        isFile <- doesFileExist path
        if
            | isDir -> do
                entries <- listDirectory path
                sum <$> traverse (sizeOf . (path </>)) entries
            | isFile -> getFileSize path
            | otherwise -> pure 0
-- -------------------------------------------------------------------------- --
-- Run Node
-- | Run a chainweb node.
--
-- Resolves the database directories from the configuration, opens RocksDB,
-- and starts chainweb. If chainweb only rewinds to a cut ('RewoundToCut'),
-- the function returns. Otherwise chainweb is run concurrently with the
-- telemetry monitors. The monitors are only started when the telemetry
-- backend is enabled in the log configuration.
--
node :: HasCallStack => HasVersion => Logger logger => ChainwebNodeConfiguration -> logger -> IO ()
node conf logger = do
    rocksDbDir <- getRocksDbDir conf
    pactDbDir <- getPactDbDir conf
    dbBackupsDir <- getBackupsDir conf
    withRocksDb rocksDbDir modernDefaultOptions $ \rocksDb -> do
        logFunctionText logger Info $ "opened rocksdb in directory " <> sshow rocksDbDir
        logFunctionText logger Debug $ "backup config: " <> sshow (_configBackup cwConf)
        withChainweb cwConf logger rocksDb pactDbDir dbBackupsDir $ \case
            RewoundToCut _ -> return ()
            StartedChainweb cw -> do
                let cwLogger = _chainwebLogger cw
                    cutDb = _cutResCutDb $ _chainwebCutResources cw
                    -- The block update monitor is currently disabled (see the
                    -- commented-out @runBlockUpdateMonitor@).
                    telemetryMonitors =
                        [ runCutMonitor cwLogger cutDb
                        , runQueueMonitor cwLogger cutDb
                        , runRtsMonitor cwLogger
                        , runDatabaseMonitor cwLogger rocksDbDir pactDbDir
                        ]
                concurrentlies_
                    -- we should probably push 'onReady' deeper here but this
                    -- should be ok
                    $ runChainweb cw (const (return ()))
                    : map (when telemetryEnabled) telemetryMonitors
  where
    cwConf = _nodeConfigChainweb conf

    telemetryEnabled =
        _enableConfigEnabled $ _logConfigTelemetryBackend $ _nodeConfigLog conf
-- | Set up the logger of the node and run the continuation @f@ with it.
--
-- A base backend is created from the backend configuration of @logCfg@, and
-- one telemetry backend is created per telemetry message type from the
-- telemetry backend configuration. The logger passed to @f@ is labelled with
-- @chainwebVersion@ (the name of @v@) and, if a cluster id is configured,
-- with @cluster@.
--
-- All backends and the logger are acquired within 'runManaged'.
--
withNodeLogger
:: HasVersion
=> LogConfig
-> ChainwebConfiguration
-> ChainwebVersion
-> (L.Logger SomeLogMessage -> IO ())
-> IO ()
withNodeLogger logCfg chainwebCfg v f = runManaged $ do
-- This manager is used only for logging backends
mgr <- liftIO HTTPS.newTlsManager
-- Base Backend
baseBackend <- managed
$ withBaseHandleBackend "ChainwebApp" mgr pkgInfoScopes (_logConfigBackend logCfg)
-- NOTE(review): the bare 'baseBackend' token on the next line cannot be
-- assigned to a statement with confidence because the layout of this file was
-- lost. It looks like a stray token -- confirm against the upstream source.
baseBackend
-- we don't log tx failures in replay
-- (The drop handler is installed when an initial cut file or an initial block
-- height limit is configured.)
let !txFailureHandler =
if isJust (_cutInitialCutFile (_configCuts chainwebCfg))
|| isJust (_cutInitialBlockHeightLimit (_configCuts chainwebCfg))
then [dropLogHandler (Proxy :: Proxy PactTxFailureLog)]
else []
-- Telemetry Backends
-- One backend per telemetry message type; the type application selects the
-- type of the messages that the backend handles.
monitorBackend <- managed
$ mkTelemetryLogger @CutHashes mgr teleLogConfig
p2pInfoBackend <- managed
$ mkTelemetryLogger @P2pSessionInfo mgr teleLogConfig
rtsBackend <- managed
$ mkTelemetryLogger @RTSStats mgr teleLogConfig
-- The counter backend uses the fixed name "counters" instead of a name
-- derived from the message type.
counterBackend <- managed $ configureHandler
(withJsonHandleBackend @CounterLog "counters" mgr pkgInfoScopes)
teleLogConfig
endpointBackend <- managed
$ mkTelemetryLogger @PactCmdLog mgr teleLogConfig
newBlockBackend <- managed
$ mkTelemetryLogger @NewMinedBlock mgr teleLogConfig
orphanedBlockBackend <- managed
$ mkTelemetryLogger @OrphanedBlock mgr teleLogConfig
-- miningStatsBackend <- managed
-- $ mkTelemetryLogger @MiningStats mgr teleLogConfig
requestLogBackend <- managed
$ mkTelemetryLogger @RequestResponseLog mgr teleLogConfig
queueStatsBackend <- managed
$ mkTelemetryLogger @QueueStats mgr teleLogConfig
traceBackend <- managed
$ mkTelemetryLogger @Trace mgr teleLogConfig
mempoolStatsBackend <- managed
$ mkTelemetryLogger @MempoolStats mgr teleLogConfig
blockUpdateBackend <- managed
$ mkTelemetryLogger @BlockUpdate mgr teleLogConfig
dbCacheBackend <- managed
$ mkTelemetryLogger @DbCacheStats mgr teleLogConfig
dbStatsBackend <- managed
$ mkTelemetryLogger @DbStats mgr teleLogConfig
p2pNodeStatsBackend <- managed
$ mkTelemetryLogger @P2pNodeStats mgr teleLogConfig
topLevelStatusBackend <- managed
$ mkTelemetryLogger @ChainwebStatus mgr teleLogConfig
pruneStatsBackend <- managed
$ mkTelemetryLogger @PruneStats mgr teleLogConfig
-- The handler list consists of the log filter, the optional tx failure drop
-- handler, and the telemetry handlers, in that order. 'baseBackend' is passed
-- to 'logHandles' separately from the handler list.
logger <- managed
$ L.withLogger (_logConfigLogger logCfg) $ logHandles
(concat
[ [ logFilterHandle (_logConfigFilter logCfg) ]
, txFailureHandler
,
[ logHandler monitorBackend
, logHandler p2pInfoBackend
, logHandler rtsBackend
, logHandler counterBackend
, logHandler endpointBackend
, logHandler newBlockBackend
, logHandler orphanedBlockBackend
-- , logHandler miningStatsBackend
, logHandler requestLogBackend
, logHandler queueStatsBackend
, logHandler traceBackend
, logHandler mempoolStatsBackend
, logHandler blockUpdateBackend
, logHandler dbCacheBackend
, logHandler dbStatsBackend
, logHandler p2pNodeStatsBackend
, logHandler topLevelStatusBackend
, logHandler pruneStatsBackend
]
]) baseBackend
-- Run the continuation with the labelled logger. The "cluster" label is only
-- added when a cluster id is configured.
liftIO $ f
$ maybe id (\x -> addLabel ("cluster", toText x)) (_logConfigClusterId logCfg)
$ addLabel ("chainwebVersion", sshow (_versionName v))
$ logger
where
-- Backend configuration that is shared by all telemetry backends.
teleLogConfig = _logConfigTelemetryBackend logCfg
-- | Create a JSON backend for telemetry messages of type @a@ and pass it to
-- the given continuation. The backend is named after the type @a@ (the
-- textual representation of its 'typeRep') and is set up via
-- 'configureHandler' from the given 'EnableConfig'.
--
mkTelemetryLogger
    :: forall a b
    . (Typeable a, ToJSON a)
    => HTTP.Manager
    -> EnableConfig BackendConfig
    -> (Backend (JsonLog a) -> IO b)
    -> IO b
mkTelemetryLogger mgr =
    configureHandler (withJsonHandleBackend @a backendName mgr pkgInfoScopes)
  where
    -- Name of the backend, derived from the message type.
    backendName = sshow (typeRep (Proxy @a))
-- -------------------------------------------------------------------------- --
-- Service Date
-- | Exception that is thrown when the service date of the node is reached.
-- The wrapped text is the message that is shown to the user.
--
newtype ServiceDate = ServiceDate Text

instance Show ServiceDate where
    show (ServiceDate msg) = "Service interval end: " ++ T.unpack msg

-- | 'ServiceDate' is part of the asynchronous exception hierarchy: it is
-- wrapped in and extracted from 'SomeAsyncException'.
--
instance Exception ServiceDate where
    toException = asyncExceptionToException
    fromException = asyncExceptionFromException
-- | Run an action subject to an optional service date.
--
-- The service date is only enforced when it is provided and the version code
-- of @v@ is that of 'mainnet' or 'testnet04'. In all other cases @inner@ is
-- run unchanged.
--
-- When enforced, @inner@ is raced against a timer that logs a warning at
-- least every 10 minutes. Once the service date is reached, the timer logs an
-- error and throws 'ServiceDate', which terminates the race.
--
-- Parameters: the chainweb version, a logging function, the optional service
-- date, and the action to run. The result is the result of the action.
--
withServiceDate
    :: ChainwebVersion
    -> (LogLevel -> Text -> IO ())
    -> Maybe UTCTime
    -> IO a
    -> IO a
withServiceDate v lf msd inner = case msd of
    Just sd | enforced -> race (timer sd) inner >>= \case
        Left () -> error "Service date thread terminated unexpectedly"
        Right a -> return a
    _ -> inner
  where
    enforced = _versionCode v == _versionCode mainnet
        || _versionCode v == _versionCode testnet04

    timer t = runForever lf "ServiceDate" $ do
        now <- getCurrentTime
        when (now >= t) $ do
            lf Error shutdownMessage
            -- 'throwIO' (rather than 'throw') guarantees that the exception
            -- is raised when this IO action is executed.
            throwIO $ ServiceDate shutdownMessage
        lf Warn warning
        -- Clamp the remaining time to 10 minutes /before/ converting it to
        -- microseconds, so that the conversion to 'Int' cannot overflow for
        -- service dates that are far in the future.
        let remaining = diffUTCTime t now
        threadDelay $ round $ min (10 * 60) remaining * 1_000_000
      where
        warning :: Text
        warning = T.concat
            [ "This version of chainweb node will stop working at " <> sshow t <> "."
            , " Please upgrade to a new version before that date."
            ]

        shutdownMessage :: Text
        shutdownMessage = T.concat
            [ "Shutting down. This version of chainweb was only valid until " <> sshow t <> "."
            , " Please upgrade to a new version."
            ]
-- -------------------------------------------------------------------------- --
-- Encode Package Info into Log message scopes
-- | Build information of the package as key-value pairs. The pairs are
-- passed to the logging backends when those are created.
--
pkgInfoScopes :: [(Text, Text)]
pkgInfoScopes =
    [ ("revision", revision)
    , ("branch", branch)
    , ("compiler", compiler)
    , ("optimisation", optimisation)
    , ("architecture", arch)
    , ("package", package)
    ]
-- -------------------------------------------------------------------------- --
-- main
-- | Program info of the node: the program description, the command line
-- option parser, the default configuration, and the configuration validation.
--
mainInfo :: ProgramInfo ChainwebNodeConfiguration
mainInfo = programInfoValidate
    "Chainweb Node"
    pChainwebNodeConfiguration
    defaultChainwebNodeConfiguration
    validateChainwebNodeConfiguration
-- | 'catches' with the arguments flipped: the handlers come first and the
-- action that is guarded by them comes last.
--
handles :: [Handler a] -> IO a -> IO a
handles handlers action = action `catches` handlers
-- | Entry point of the chainweb node.
--
-- Installs the fatal signal handlers, checks the resource limits, and reads
-- the configuration. The node is then run with the configured chainweb
-- version, concurrently with a thread that triggers a major garbage
-- collection every 30 seconds, and subject to the service date of the
-- version.
--
-- Process start and termination are logged. Termination by an asynchronous
-- exception is logged at level 'Info'; any other exception is logged at level
-- 'Error'. In both cases the exception is rethrown.
--
main :: IO ()
main = do
    installFatalSignalHandlers [ sigHUP, sigTERM, sigXCPU, sigXFSZ ]
    checkRLimits
    runWithPkgInfoConfiguration mainInfo pkgInfo $ \conf -> do
        let cwConf = _nodeConfigChainweb conf
            version = _configChainwebVersion cwConf
        withVersion version $ do
            hSetBuffering stderr LineBuffering
            withNodeLogger (_nodeConfigLog conf) cwConf implicitVersion $ \logger -> do
                let logText = logFunctionText logger
                    -- The handler for asynchronous exceptions must come
                    -- first, because 'SomeException' matches those, too.
                    exitHandlers =
                        [ Handler $ \(e :: SomeAsyncException) -> do
                            logFunctionJson logger Info (ProcessDied $ show e)
                            throwIO e
                        , Handler $ \(e :: SomeException) -> do
                            logFunctionJson logger Error (ProcessDied $ show e)
                            throwIO e
                        ]
                logFunctionJson logger Info ProcessStarted
                handles exitHandlers $ do
                    serviceDate <- mapM iso8601ParseM (_versionServiceDate implicitVersion)
                    withServiceDate version logText serviceDate $
                        void $ race (node conf logger) (gcRunner logText)
  where
    -- Triggers a major garbage collection every 30 seconds.
    gcRunner lf = runForever lf "GarbageCollect" $ do
        performMajorGC
        threadDelay (30 * 1_000_000)