Skip to content

Free parking beacon sync capture replay #3531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions execution_chain/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type
noCommand
`import`
`import-rlp`
`capture-log`

RpcFlag* {.pure.} = enum
## RPC flags
Expand Down Expand Up @@ -345,14 +346,6 @@ type
defaultValue: 4'u64
name: "debug-persist-batch-size" .}: uint64

beaconSyncTargetFile* {.
hidden
desc: "Load a file containg an rlp-encoded object \"(Header,Hash32)\" " &
"to be used " &
"as the first target before any other request from the CL " &
"is accepted"
name: "debug-beacon-sync-target-file" .}: Option[InputFile]

rocksdbMaxOpenFiles {.
hidden
defaultValue: defaultMaxOpenFiles
Expand Down Expand Up @@ -510,6 +503,37 @@ type
defaultValueDesc: "\"jwt.hex\" in the data directory (see --data-dir)"
name: "jwt-secret" .}: Option[InputFile]

beaconSyncTraceFile* {.
separator: "\pBEACON SYNC OPTIONS:"
desc: "Enable tracer and write capture data to the argument file"
name: "beacon-sync-trace-file" .}: Option[OutFile]

beaconSyncTraceSessions* {.
defaultValue: 1
desc: "Run a trace for this many sessions " &
"(i.e. from activation to suspension)"
name: "beacon-sync-trace-sessions" .}: int

beaconSyncReplayFile* {.
desc: "Read from trace capture file for full replay"
name: "beacon-sync-replay-file" .}: Option[InputFile]

beaconSyncReplayNoisyFrom* {.
desc: "Extra replay logging starting with argument record number"
name: "beacon-sync-replay-noisy-from" .}: Option[uint]

beaconSyncReplayFakeImport* {.
desc: "Suppress block import (for test runs)"
defaultValue: false
name: "beacon-sync-replay-fake-import" .}: bool

beaconSyncTargetFile* {.
hidden
desc: "Load a file containg an rlp-encoded object " &
"\"(Header,Hash32)\" to be used as the first target before " &
"any other request from the CL is accepted"
name: "debug-beacon-sync-target-file" .}: Option[InputFile]

of `import`:
maxBlocks* {.
desc: "Maximum number of blocks to import"
Expand Down Expand Up @@ -567,6 +591,12 @@ type
desc: "One or more RLP encoded block(s) files"
name: "blocks-file" }: seq[InputFile]

of `capture-log`:
beaconSyncCaptureFile* {.
argument
desc: "Read from capture file for log output"
name: "beacon-sync-capture-file" .}: Option[InputFile]

func parseHexOrDec256(p: string): UInt256 {.raises: [ValueError].} =
if startsWith(p, "0x"):
parse(p, UInt256, 16)
Expand Down
25 changes: 23 additions & 2 deletions execution_chain/core/chain/header_chain_cache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ const
# Private debugging and print functions
# ------------------------------------------------------------------------------

import pkg/stew/byteutils

type ClMesg {.used.} = object
head: Header
fin: Hash32

func bnStr(w: BlockNumber): string =
"#" & $w

Expand Down Expand Up @@ -238,14 +244,18 @@ proc persistInfo(hc: HeaderChainRef) =

proc persistClear(hc: HeaderChainRef) =
## Clear persistent database
let w = hc.kvt.getInfo.valueOr: return
let w = hc.kvt.getInfo.valueOr:
trace "HeaderChain.persistClear", nHeaders="n/a", hc=hc.toStr
return
for bn in w.least .. w.last:
hc.kvt.delHeader(bn)
# Occasionally flush the current data
if (bn - w.least) mod MaxDeleteBatch == 0:
hc.kvt.persist()
hc.kvt.delInfo()
hc.kvt.persist()
trace "HeaderChain.persistClear", least=w.least.bnStr, last=w.last.bnStr,
nHeaders=(w.last - w.least + 1), hc=hc.toStr

# ------------------------------------------------------------------------------
# Private functions
Expand Down Expand Up @@ -283,6 +293,7 @@ proc tryFcParent(hc: HeaderChainRef; hdr: Header): HeaderChainMode =
if baseNum + 1 < hdr.number:
return collecting # inconclusive

trace "HeaderChain.tryFcParent: orhpaned", base=baseNum.bnStr, hc=hc.toStr
return orphan # maybe on the wrong branch

# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -325,8 +336,15 @@ proc headUpdateFromCL(hc: HeaderChainRef; h: Header; f: Hash32) =
metrics.set(nec_sync_dangling, h.number.int64)

# Inform client app about that a new session has started.
hc.notify()
hc.chain.pendingFCU = f
hc.notify()

when false:
# vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
trace "fcUpdateFromCL: Replay target", base=hc.chain.baseNumber.bnStr,
latest=hc.chain.latestNumber.bnStr, trg=h.bnStr, fin=f.short,
scrum=encodePayload(ClMesg(head: h, fin: f)).toHex
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

# For logging and metrics
hc.session.consHeadNum = h.number
Expand Down Expand Up @@ -552,6 +570,9 @@ proc put*(
hc.session.ante = rev[revTopInx]
metrics.set(nec_sync_dangling, hc.session.ante.number.int64)

trace "HeaderChain.put: saved headers", offset, revTopInx,
base=hc.chain.baseNumber.bnStr, hc=hc.toStr

# Save updates. persist to DB
hc.persistInfo()

Expand Down
36 changes: 34 additions & 2 deletions execution_chain/nimbus_execution_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import
../execution_chain/compile_info

import
std/[os, osproc, strutils, net, options],
std/[os, osproc, net, options, streams],
chronicles,
eth/net/nat,
metrics,
Expand All @@ -28,6 +28,7 @@ import
./db/core_db/persistent,
./db/storage_types,
./sync/wire_protocol,
./sync/beacon/replay,
./common/chain_config_hash,
./portal/portal

Expand Down Expand Up @@ -120,6 +121,24 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
nimbus.beaconSyncRef = BeaconSyncRef.init(
nimbus.ethNode, nimbus.fc, conf.maxPeers)

# Optional tracer
if conf.beaconSyncTraceFile.isSome():
nimbus.beaconSyncRef.tracerInit(
conf.beaconSyncTraceFile.unsafeGet.string, conf.beaconSyncTraceSessions)

# Optional replay
if conf.beaconSyncReplayFile.isSome():
if conf.beaconSyncTraceFile.isSome():
fatal "Cannot have both "&
"--beacon-sync-trace-file and --beacon-sync-replay-file"
if conf.beaconSyncTargetFile.isSome():
fatal "Cannot have both "&
"--beacon-sync-target-file and --beacon-sync-replay-file"
nimbus.beaconSyncRef.replayInit(
conf.beaconSyncReplayFile.unsafeGet.string,
conf.beaconSyncReplayNoisyFrom.get(high uint),
conf.beaconSyncReplayFakeImport)

# Optional for pre-setting the sync target (i.e. debugging)
if conf.beaconSyncTargetFile.isSome():
nimbus.beaconSyncRef.targetInit conf.beaconSyncTargetFile.unsafeGet.string
Expand Down Expand Up @@ -198,6 +217,18 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
version = FullVersionStr,
conf

case conf.cmd
of NimbusCmd.`capture-log`:
if conf.beaconSyncCaptureFile.isNone():
fatal "Capture file is required, use --beacon-sync-capture-file"
quit(QuitFailure)
let st = conf.beaconSyncCaptureFile.unsafeGet.string.newFileStream fmRead
ReplayReaderRef.init(st).captureLog(proc(): bool =
nimbus.state == NimbusState.Stopping)
return
else:
discard

# Trusted setup is needed for processing Cancun+ blocks
# If user not specify the trusted setup, baked in
# trusted setup will be loaded, lazily.
Expand Down Expand Up @@ -276,7 +307,8 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
setupP2P(nimbus, conf, com)
setupRpc(nimbus, conf, com)

if conf.maxPeers > 0 and conf.engineApiServerEnabled():
if (conf.maxPeers > 0 and conf.engineApiServerEnabled()) or
conf.beaconSyncReplayFile.isSome():
# Not starting syncer if there is definitely no way to run it. This
# avoids polling (i.e. waiting for instructions) and some logging.
if not nimbus.beaconSyncRef.start():
Expand Down
99 changes: 88 additions & 11 deletions execution_chain/sync/beacon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,76 @@ import
pkg/stew/[interval_set, sorted_set],
../core/chain,
../networking/p2p,
./beacon/[worker, worker_desc],
./beacon/worker/blocks/blocks_fetch,
./beacon/worker/blocks/blocks_import,
./beacon/worker/headers/headers_fetch,
./beacon/worker/update,
./beacon/[trace, replay, worker, worker_desc],
./[sync_desc, sync_sched, wire_protocol]


logScope:
topics = "beacon sync"

type
BeaconSyncRef* = RunnerSyncRef[BeaconCtxData,BeaconBuddyData]

# ------------------------------------------------------------------------------
# Interceptable handlers
# ------------------------------------------------------------------------------

proc schedDaemonCB(
ctx: BeaconCtxRef;
): Future[Duration]
{.async: (raises: []).} =
return worker.runDaemon(ctx, "RunDaemon") # async/template

proc schedStartCB(buddy: BeaconBuddyRef): bool =
return worker.start(buddy, "RunStart")

proc schedStopCB(buddy: BeaconBuddyRef) =
worker.stop(buddy, "RunStop")

proc schedPoolCB(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
return worker.runPool(buddy, last, laps, "RunPool")

proc schedPeerCB(
buddy: BeaconBuddyRef;
): Future[Duration]
{.async: (raises: []).} =
return worker.runPeer(buddy, "RunPeer") # async/template

proc noOpBuddy(buddy: BeaconBuddyRef) = discard

proc noOpCtx(ctx: BeaconCtxRef; maybePeer: Opt[BeaconBuddyRef]) = discard

# ------------------------------------------------------------------------------
# Virtual methods/interface, `mixin` functions
# ------------------------------------------------------------------------------

proc runSetup(ctx: BeaconCtxRef): bool =
worker.setup(ctx, "RunSetup")
return worker.setup(ctx, "RunSetup")

proc runRelease(ctx: BeaconCtxRef) =
worker.release(ctx, "RunRelease")

proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return worker.runDaemon(ctx, "RunDaemon")

proc runTicker(ctx: BeaconCtxRef) =
worker.runTicker(ctx, "RunTicker")


proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
return await ctx.handler.schedDaemon(ctx)

proc runStart(buddy: BeaconBuddyRef): bool =
worker.start(buddy, "RunStart")
return buddy.ctx.handler.schedStart(buddy)

proc runStop(buddy: BeaconBuddyRef) =
worker.stop(buddy, "RunStop")
buddy.ctx.handler.schedStop(buddy)

proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
worker.runPool(buddy, last, laps, "RunPool")
return buddy.ctx.handler.schedPool(buddy, last, laps)

proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
return worker.runPeer(buddy, "RunPeer")
return await buddy.ctx.handler.schedPeer(buddy)

# ------------------------------------------------------------------------------
# Public functions
Expand All @@ -66,17 +99,61 @@ proc init*(
var desc = T()
desc.initSync(ethNode, maxPeers)
desc.ctx.pool.chain = chain

# Set up handlers so they can be overlayed
desc.ctx.pool.handlers = BeaconHandlersRef(
version: 0,
activate: updateActivateCB,
suspend: updateSuspendCB,
schedDaemon: schedDaemonCB,
schedStart: schedStartCB,
schedStop: schedStopCB,
schedPool: schedPoolCB,
schedPeer: schedPeerCB,
getBlockHeaders: getBlockHeadersCB,
syncBlockHeaders: noOpBuddy,
getBlockBodies: getBlockBodiesCB,
syncBlockBodies: noOpBuddy,
importBlock: importBlockCB,
syncImportBlock: noOpCtx)

desc

proc tracerInit*(desc: BeaconSyncRef; outFile: string, nSessions: int) =
## Set up tracer (not be called when replay is enabled)
if not desc.ctx.traceSetup(outFile, nSessions):
fatal "Cannot set up trace handlers -- STOP", fileName=outFile, nSessions
quit(QuitFailure)

proc replayInit*(
desc: BeaconSyncRef;
inFile: string;
startNoisy = high(uint);
fakeImport = false;
) =
## Set up replay (not be called when trace is enabled)
if not desc.ctx.replaySetup(inFile, startNoisy):
fatal "Cannot set up replay handlers -- STOP", fileName=inFile
quit(QuitFailure)

proc targetInit*(desc: BeaconSyncRef; rlpFile: string) =
## Set up inital sprint (intended for debugging)
doAssert desc.ctx.handler.version == 0
desc.ctx.initalTargetFromFile(rlpFile, "targetInit").isOkOr:
raiseAssert error

proc start*(desc: BeaconSyncRef): bool =
desc.startSync()
if desc.startSync():
desc.ctx.traceStart()
desc.ctx.replayStart()
return true
# false

proc stop*(desc: BeaconSyncRef) {.async.} =
desc.ctx.traceStop()
desc.ctx.traceRelease()
desc.ctx.replayStop()
desc.ctx.replayRelease()
await desc.stopSync()

# ------------------------------------------------------------------------------
Expand Down
26 changes: 26 additions & 0 deletions execution_chain/sync/beacon/replay.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Nimbus
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

## Replay environment

{.push raises:[].}

import
./replay/replay_reader/reader_init,
./replay/[replay_desc, replay_reader, replay_setup, replay_start_stop]

export
ReplayReaderRef,
captureLog,
init,
replay_setup,
replay_start_stop

# End
Loading
Loading