Skip to content
This repository was archived by the owner on Nov 24, 2025. It is now read-only.

Commit 535798b

Browse files
committed
[jj-spr] initial version
Created using jj-spr 1.3.6-beta.1
2 parents 93788fe + 3c8081a commit 535798b

File tree

9 files changed

+345
-159
lines changed

9 files changed

+345
-159
lines changed

.github/workflows/applications.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ jobs:
389389
exe:evm-genesis \
390390
exe:tx-list \
391391
exe:run-nodes \
392+
exe:pact-replay \
392393
test:chainweb-tests \
393394
test:multi-node-network-tests \
394395
test:remote-tests \
@@ -435,6 +436,7 @@ jobs:
435436
cp $(cabal list-bin genconf) artifacts/chainweb
436437
cp $(cabal list-bin known-graphs) artifacts/chainweb
437438
cp $(cabal list-bin pact-diff) artifacts/chainweb
439+
cp $(cabal list-bin pact-replay) artifacts/chainweb
438440
cp $(cabal list-bin standalone-pruner) artifacts/chainweb
439441
cp $(cabal list-bin tx-list) artifacts/chainweb
440442
cp $(cabal list-bin run-nodes) artifacts/chainweb

cwtools/cwtools.cabal

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,40 @@ executable db-checksum
174174
, text
175175
, unordered-containers
176176

177+
-- Generate genesis headers.
178+
executable pact-replay
179+
import: warning-flags, debugging-flags
180+
default-language: Haskell2010
181+
ghc-options:
182+
-threaded
183+
-rtsopts
184+
"-with-rtsopts=-N -H1G -A64M"
185+
-Wno-x-partial -Wno-unrecognised-warning-flags
186+
hs-source-dirs:
187+
pact-replay
188+
main-is:
189+
PactReplay.hs
190+
build-depends:
191+
, chainweb
192+
, chainweb:chainweb-test-utils
193+
194+
, async
195+
, base
196+
, chainweb-storage
197+
, constraints
198+
, containers
199+
, filepath
200+
, lens
201+
, loglevel
202+
, optparse-applicative
203+
, resource-pool
204+
, resourcet
205+
, safe-exceptions
206+
, streaming
207+
, unordered-containers
208+
, text
209+
, yet-another-logger
210+
177211
-- Generate genesis headers.
178212
executable ea
179213
import: warning-flags, debugging-flags

cwtools/pact-replay/PactReplay.hs

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
{-# LANGUAGE ApplicativeDo #-}
2+
{-# LANGUAGE BangPatterns #-}
3+
{-# LANGUAGE FlexibleContexts #-}
4+
{-# LANGUAGE ImportQualifiedPost #-}
5+
{-# LANGUAGE LambdaCase #-}
6+
{-# LANGUAGE NumericUnderscores #-}
7+
{-# LANGUAGE OverloadedStrings #-}
8+
{-# LANGUAGE PartialTypeSignatures #-}
9+
{-# LANGUAGE RankNTypes #-}
10+
{-# LANGUAGE ScopedTypeVariables #-}
11+
{-# LANGUAGE TupleSections #-}
12+
{-# LANGUAGE TypeApplications #-}
13+
14+
module Main(main) where
15+
16+
import Chainweb.BlockHeader
17+
import Chainweb.BlockHeaderDB
18+
import Chainweb.BlockHeight
19+
import Chainweb.Core.Brief
20+
import Chainweb.Cut (cutHeaders, unsafeMkCut)
21+
import Chainweb.Cut.Create (limitCut)
22+
import Chainweb.CutDB (cutHashesTable, readHighestCutHeaders)
23+
import Chainweb.Logger
24+
import Chainweb.Pact.PactService qualified as PactService
25+
import Chainweb.Pact.Payload.PayloadStore.RocksDB qualified as Pact.Payload.PayloadStore.RocksDB
26+
import Chainweb.Pact.Types
27+
import Chainweb.Parent
28+
import Chainweb.PayloadProvider (blockHeaderToEvaluationCtx)
29+
import Chainweb.PayloadProvider.Pact
30+
import Chainweb.PayloadProvider.Pact.Genesis (genesisPayload)
31+
import Chainweb.Storage.Table.RocksDB (modernDefaultOptions, withRocksDb)
32+
import Chainweb.Time
33+
import Chainweb.TreeDB qualified as TreeDB
34+
import Chainweb.Utils
35+
import Chainweb.Version
36+
import Chainweb.Version.Registry
37+
import Chainweb.WebBlockHeaderDB
38+
import Control.Concurrent (threadDelay)
39+
import Control.Concurrent.Async (forConcurrently)
40+
import Control.Exception.Safe
41+
import Control.Lens
42+
import Control.Monad
43+
import Control.Monad.IO.Class
44+
import Control.Monad.Trans.Resource
45+
import Data.Constraint
46+
import Data.HashSet qualified as HS
47+
import Data.HashMap.Strict qualified as HM
48+
import Data.IORef
49+
import Data.List qualified as List
50+
import Data.LogMessage
51+
import Data.Proxy
52+
import Data.Text qualified as T
53+
import GHC.Stack
54+
import Options.Applicative
55+
import Streaming qualified as S
56+
import Streaming.Prelude qualified as S
57+
import System.FilePath ((</>))
58+
import System.Logger qualified as L
59+
import System.LogLevel
60+
import Text.Printf
61+
import Utils.Logging
62+
import Utils.Logging.Trace (Trace)
63+
64+
main :: IO ()
65+
main = join $
66+
execParser $ info
67+
(parser <**> helper)
68+
(fullDesc
69+
<> progDesc "Replay Pact blocks checking that we get the correct outputs"
70+
<> header "pact-replay")
71+
72+
getRocksDbDir :: HasCallStack => FilePath -> FilePath
73+
getRocksDbDir base = base </> "0" </> "rocksDb"
74+
75+
getPactDbDir :: HasCallStack => FilePath -> FilePath
76+
getPactDbDir base = base </> "0" </> "sqlite"
77+
78+
isPactChain :: HasVersion => ChainId -> Bool
79+
isPactChain cid = payloadProviderTypeForChain cid == PactProvider
80+
81+
parser :: Parser (IO ())
82+
parser = do
83+
version <- option (findKnownVersion =<< textReader)
84+
(long "chainweb-version"
85+
<> short 'v'
86+
<> help "network ID"
87+
<> metavar (T.unpack $
88+
"[" <> T.intercalate "," (getChainwebVersionName . _versionName <$> knownVersions) <> "]")
89+
)
90+
dbDir <- textOption (long "database-directory" <> help "chainweb database directory")
91+
maybeStart <- optional $ BlockHeight <$> textOption (long "start" <> help "lower block height bound for the replay")
92+
maybeEnd <- optional $ BlockHeight <$> textOption (long "end" <> help "upper block height bound for the replay")
93+
chains :: Dict HasVersion -> [ChainId] <-
94+
fmap const (jsonOption (long "chains"))
95+
<|> pure (\Dict -> filter isPactChain (HS.toList chainIds))
96+
let defaultLogConfig =
97+
-- default the threshold to "info" because the progress log is "info".
98+
L.defaultLogConfig & L.logConfigLogger . L.loggerConfigThreshold .~ L.Info
99+
logConfig <- parserOptionGroup "Logging" (($ defaultLogConfig) <$> L.pLogConfig)
100+
return $ withVersion version $ do
101+
-- we'd use a read-only db, but unfortunately we write to rocksdb even
102+
-- when it's not necessary sometimes during initialization, c.f.
103+
-- initBlockHeaderDb.
104+
withRocksDb (getRocksDbDir dbDir) modernDefaultOptions $ \rdb -> do
105+
L.withHandleBackend_ logText (logConfig ^. L.logConfigBackend) $ \backend -> do
106+
-- do not log tx failures and performance telemetry.
107+
let replayLogHandles = logHandles
108+
[ dropLogHandler (Proxy @PactTxFailureLog)
109+
, dropLogHandler (Proxy @(JsonLog Trace))
110+
] backend
111+
L.withLogger (logConfig ^. L.logConfigLogger) replayLogHandles $ \logger -> do
112+
let cutTable = cutHashesTable rdb
113+
let pdb = Pact.Payload.PayloadStore.RocksDB.newPayloadDb rdb
114+
let wbhdb = mkWebBlockHeaderDb rdb (tabulateChains (mkBlockHeaderDb rdb))
115+
116+
-- start by finding our upper bound
117+
highestCutInDb <- unsafeMkCut <$> readHighestCutHeaders (logFunctionText logger) wbhdb cutTable
118+
upperBoundCut <- case maybeEnd of
119+
Nothing -> return highestCutInDb
120+
Just upperBound -> limitCut wbhdb upperBound highestCutInDb
121+
122+
-- replay all chains concurrently
123+
failuresByChain <- forConcurrently (chains Dict `List.intersect` HM.keys (view cutHeaders upperBoundCut)) $ \cid -> runResourceT $ do
124+
let chainLogger = addLabel ("chain", brief cid) logger
125+
bhdb <- getWebBlockHeaderDb wbhdb cid
126+
127+
PactPayloadProvider _ serviceEnv <- withPactPayloadProvider
128+
cid rdb Nothing chainLogger Nothing mempty pdb
129+
(getPactDbDir dbDir)
130+
defaultPactServiceConfig
131+
(genesisPayload cid)
132+
133+
134+
let upperBoundBlock = upperBoundCut ^?! cutHeaders . ix cid
135+
let upper = HS.singleton (TreeDB.UpperBound $ view blockHash upperBoundBlock)
136+
137+
failureCountRef <- liftIO $ newIORef (0 :: Word)
138+
heightRef <- liftIO $ newIORef (view blockHeight upperBoundBlock)
139+
rateRef <- liftIO $ newIORef (0 :: Double)
140+
_ <- withAsyncR (logProgress chainLogger cid heightRef rateRef)
141+
142+
-- replay all blocks below our upper bound, in descending order.
143+
-- This works well for us because replay failures are
144+
-- most likely to happen nearest the tip of the chains.
145+
liftIO $ TreeDB.branchEntries bhdb Nothing Nothing Nothing Nothing mempty upper $ \blockStream -> do
146+
blockStream
147+
& S.takeWhile (\blk -> maybe True (\start -> view blockHeight blk >= start) maybeStart)
148+
& withParent
149+
150+
& S.mapM (\(h, ph) -> do
151+
152+
-- replay the block
153+
writeIORef heightRef (view blockHeight h)
154+
fmap (h,) $
155+
try @_ @SomeException $
156+
PactService.execReadOnlyReplay chainLogger serviceEnv
157+
(view blockPayloadHash h <$ blockHeaderToEvaluationCtx ph)
158+
)
159+
160+
-- calculate the rate we're replaying and send
161+
-- it to the progress logger thread, updating
162+
-- the rate every 500 blocks.
163+
-- The replay still functionally proceeds on a
164+
-- single block at a time, because it happens
165+
-- *before* the chunking in the pipeline.
166+
& S.chunksOf 500
167+
& mapsM_ (\blkChunk -> do
168+
startTime <- getCurrentTimeIntegral
169+
170+
count S.:> _ S.:> res <- blkChunk
171+
& S.mapM (\case
172+
(h, Left err) -> do
173+
atomicModifyIORef' failureCountRef (\n -> (succ n, ()))
174+
logFunctionText chainLogger Error $ "Error block: " <> brief h <> ": " <> sshow err
175+
return h
176+
(h, Right (Just err)) -> do
177+
atomicModifyIORef' failureCountRef (\n -> (succ n, ()))
178+
logFunctionText chainLogger Error $ "Invalid block " <> brief h <> ": " <> sshow err
179+
return h
180+
(h, Right Nothing) -> return h
181+
)
182+
& S.copy
183+
& S.last
184+
& S.length
185+
186+
endTime <- getCurrentTimeIntegral
187+
let !(TimeSpan (timeTaken :: Micros)) = (endTime `diff` startTime)
188+
let !rate :: Double = int count * 1_000_000 / int timeTaken
189+
190+
writeIORef rateRef rate
191+
192+
return res
193+
)
194+
195+
liftIO $ logFunctionText chainLogger Info $ "finished replaying chain " <> brief cid
196+
197+
liftIO $ readIORef failureCountRef
198+
let failureCount = sum failuresByChain
199+
when (failureCount > 0) $
200+
error $ sshow failureCount <> " blocks failed"
201+
where
202+
logProgress logger cid heightRef rateRef = forever $ do
203+
threadDelay 20_000_000
204+
height <- readIORef heightRef
205+
rate <- readIORef rateRef
206+
logFunctionText logger Info $
207+
"Chain " <> brief cid <>
208+
" rate " <> T.pack (printf "%.2f" rate) <> "/s"
209+
<> " at " <> brief height <> " (desc.)"
210+
211+
-- requires that the input is descending
212+
withParent :: Monad m => S.Stream (S.Of h) m r -> S.Stream (S.Of (h, Parent h)) m r
213+
withParent = \strm -> do
214+
S.lift (S.next strm) >>= \case
215+
Left r -> return r
216+
Right (bh, strm') -> go bh strm'
217+
where
218+
go bh strm = do
219+
S.lift (S.next strm) >>= \case
220+
Left r -> return r
221+
Right (bh', strm') -> do
222+
S.yield (bh, Parent bh')
223+
go bh' strm'
224+
225+
mapsM_ :: Monad m => (forall x. f x -> m x) -> S.Stream f m r -> m r
226+
mapsM_ f = go
227+
where
228+
go strm =
229+
S.inspect strm >>= \case
230+
Left r -> return r
231+
Right fstrm -> do
232+
strm' <- f fstrm
233+
go strm'

node/src/ChainwebNode.hs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,7 @@ node conf logger = do
319319
rocksDbDir <- getRocksDbDir conf
320320
pactDbDir <- getPactDbDir conf
321321
dbBackupsDir <- getBackupsDir conf
322-
withRocksDb' <-
323-
if _configReadOnlyReplay cwConf
324-
then
325-
withReadOnlyRocksDb <$ logFunctionText logger Info "Opening RocksDB in read-only mode"
326-
else
327-
return withRocksDb
328-
withRocksDb' rocksDbDir modernDefaultOptions $ \rocksDb -> do
322+
withRocksDb rocksDbDir modernDefaultOptions $ \rocksDb -> do
329323
logFunctionText logger Info $ "opened rocksdb in directory " <> sshow rocksDbDir
330324
logFunctionText logger Debug $ "backup config: " <> sshow (_configBackup cwConf)
331325
withChainweb cwConf logger rocksDb pactDbDir dbBackupsDir $ \case
@@ -365,12 +359,11 @@ withNodeLogger logCfg chainwebCfg v f = runManaged $ do
365359
-- Base Backend
366360
baseBackend <- managed
367361
$ withBaseHandleBackend "ChainwebApp" mgr pkgInfoScopes (_logConfigBackend logCfg)
368-
362+
baseBackend
369363
-- we don't log tx failures in replay
370364
let !txFailureHandler =
371365
if isJust (_cutInitialCutFile (_configCuts chainwebCfg))
372366
|| isJust (_cutInitialBlockHeightLimit (_configCuts chainwebCfg))
373-
|| _configReadOnlyReplay chainwebCfg
374367
then [dropLogHandler (Proxy :: Proxy PactTxFailureLog)]
375368
else []
376369

src/Chainweb/BlockHeaderDB.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ module Chainweb.BlockHeaderDB
1919
, Configuration(..)
2020
, BlockHeaderDb
2121
, RankedBlockHeaderDb(..)
22-
, initBlockHeaderDb
2322
, closeBlockHeaderDb
23+
, initBlockHeaderDb
24+
, mkBlockHeaderDb
2425
, withBlockHeaderDb
2526

2627
-- * Misc

src/Chainweb/BlockHeaderDB/Internal.hs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ module Chainweb.BlockHeaderDB.Internal
5252
, BlockHeaderDb(..)
5353
, RankedBlockHeaderDb(..)
5454
, initBlockHeaderDb
55+
, mkBlockHeaderDb
5556
, closeBlockHeaderDb
5657
, withBlockHeaderDb
5758

@@ -274,26 +275,30 @@ dbAddChecked db e = unlessM (tableMember (_chainDbCas db) ek) dbAddCheckedIntern
274275
--
275276
initBlockHeaderDb :: HasVersion => Configuration -> IO BlockHeaderDb
276277
initBlockHeaderDb config = do
278+
let db = mkBlockHeaderDb (_configRocksDb config) (_chainId rootEntry)
277279
dbAddChecked db rootEntry
278280
return db
279281
where
280282
rootEntry = _configRoot config
281-
cid = _chainId rootEntry
282-
cidNs = T.encodeUtf8 (toText cid)
283283

284+
mkBlockHeaderDb :: HasVersion => RocksDb -> ChainId -> BlockHeaderDb
285+
mkBlockHeaderDb rdb cid = db
286+
where
284287
headerTable = newTable
285-
(_configRocksDb config)
288+
rdb
286289
(Codec (runPutS . encodeRankedBlockHeader) (runGetS decodeRankedBlockHeader))
287290
(Codec (runPutS . encodeRankedBlockHash) (runGetS decodeRankedBlockHash))
288291
["BlockHeader", cidNs, "header"]
289292

290293
rankTable = newTable
291-
(_configRocksDb config)
294+
rdb
292295
(Codec (runPutS . encodeBlockHeight) (runGetS decodeBlockHeight))
293296
(Codec (runPutS . encodeBlockHash) (runGetS decodeBlockHash))
294297
["BlockHeader", cidNs, "rank"]
295298

296-
!db = BlockHeaderDb cid
299+
cidNs = T.encodeUtf8 (toText cid)
300+
db = BlockHeaderDb
301+
cid
297302
implicitVersion
298303
headerTable
299304
rankTable

0 commit comments

Comments
 (0)