4 changes: 4 additions & 0 deletions beacon_chain/consensus_object_pools/envelope_quarantine.nim
@@ -61,6 +61,10 @@ func popOrphan*(
func delOrphan*(self: var EnvelopeQuarantine, blck: gloas.SignedBeaconBlock) =
self.orphans.del(blck.root)

iterator peekMissing*(self: EnvelopeQuarantine): Eth2Digest =
for v in self.missing:
yield v

func cleanupOrphans*(self: var EnvelopeQuarantine, finalizedSlot: Slot) =
var toDel: seq[Eth2Digest]

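As context for the new peekMissing iterator above: it is the read-only view that the request manager's envelope loop later snapshots with toSeq() before issuing requests. A minimal standalone sketch of that pattern, using a hypothetical MiniQuarantine with string roots instead of the real EnvelopeQuarantine and Eth2Digest:

```nim
import std/[sets, sequtils]

type MiniQuarantine = object
  missing: HashSet[string]   # stand-in for the quarantine's missing roots

func addMissing(q: var MiniQuarantine, root: string) =
  q.missing.incl(root)

iterator peekMissing(q: MiniQuarantine): string =
  ## Yield every root we still need an envelope for, without mutating state.
  for root in q.missing:
    yield root

when isMainModule:
  var q = MiniQuarantine()
  q.addMissing("0xaa")
  q.addMissing("0xbb")
  # Snapshot, as requestManagerEnvelopeLoop does with peekMissing().toSeq()
  let roots = toSeq(q.peekMissing())
  doAssert roots.len == 2
```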
10 changes: 9 additions & 1 deletion beacon_chain/gossip_processing/block_processor.nim
@@ -903,6 +903,14 @@ proc storePayload(

ok()

proc addPayload*(
self: ref BlockProcessor,
blck: gloas.SignedBeaconBlock,
envelope: gloas.SignedExecutionPayloadEnvelope,
sidecarsOpt: Opt[gloas.DataColumnSidecars],
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
await self.storePayload(blck, envelope, sidecarsOpt)

proc enqueuePayload*(
self: ref BlockProcessor,
blck: gloas.SignedBeaconBlock,
@@ -912,7 +920,7 @@ proc enqueuePayload*(
if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
debugGloasComment("backfilling")

- discard self.storePayload(blck, envelope, sidecarsOpt)
+ discard self.addPayload(blck, envelope, sidecarsOpt)

proc enqueuePayload*(self: ref BlockProcessor, blck: gloas.SignedBeaconBlock) =
## Enqueue payload processing for a block that is already known to be valid.
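The new addPayload above is a thin awaitable wrapper around storePayload, so callers outside the block processor (such as the request manager's envelope verifier) can observe the Result[void, VerifierError] instead of fire-and-forget enqueueing. A minimal sketch of that wrapper pattern, assuming the chronos and results packages used by this codebase, with a stand-in VerifierError and a string payload instead of the real types:

```nim
import chronos, results

type VerifierError = enum
  Invalid, MissingParent, UnviableFork, Duplicate

proc storePayload(payload: string): Future[Result[void, VerifierError]]
    {.async: (raises: [CancelledError]).} =
  # Stand-in for the real verification and storage work.
  if payload.len == 0:
    return Result[void, VerifierError].err(VerifierError.Invalid)
  return Result[void, VerifierError].ok()

proc addPayload(payload: string): Future[Result[void, VerifierError]]
    {.async: (raises: [CancelledError]).} =
  ## Thin wrapper: forward storePayload's result to the awaiting caller.
  return await storePayload(payload)

when isMainModule:
  doAssert waitFor(addPayload("envelope-bytes")).isOk()
  doAssert waitFor(addPayload("")).isErr()
```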
61 changes: 58 additions & 3 deletions beacon_chain/nimbus_beacon_node.nim
@@ -681,6 +681,59 @@ proc initFullNode(
rmanBlockLoader = proc(
blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] =
dag.getForkedBlock(blockRoot)
rmanEnvelopeVerifier = proc(signedEnvelope: gloas.SignedExecutionPayloadEnvelope):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
## The envelope verifier contains the same logic as block_processor's
## enqueuePayload(), except that when the valid block or any of its
## sidecars is missing we return ok(), since that is not any kind of
## VerifierError. The call is therefore discarded silently.
template blockRoot(): auto = signedEnvelope.message.beacon_block_root

let
blockRef = dag.getBlockRef(blockRoot).valueOr:
return ok()
blck =
block:
let forkedBlock = dag.getForkedBlock(blockRef.bid).valueOr:
# We have checked that the block exists in the chain, so either the
# database read failed or the in-memory data is corrupted. Since no
# error is returned to the caller, log it for investigation.
debug "Enqueue payload from envelope. Block is missing in DB",
bid = shortLog(blockRef.bid)
return ok()
withBlck(forkedBlock):
when consensusFork >= ConsensusFork.Gloas:
forkyBlck.asSigned()
else:
# Incorrect fork, which shouldn't happen.
debug "Enqueue payload from envelope. Block is in incorrect fork",
bid = shortLog(blockRef.bid)
return ok()
envelope = envelopeQuarantine[].popOrphan(blck).valueOr:
# At this point, the signedEnvelope must be from a different builder,
# since the block is the source of truth. Report that a bad value was
# received from the peer.
envelopeQuarantine[].addMissing(blockRoot)
return err(VerifierError.Invalid)
sidecarsOpt =
block:
template bid(): auto =
blck.message.body.signed_execution_payload_bid
let sidecarsOpt =
if bid.message.blob_kzg_commitments.len() == 0:
Opt.some(default(gloas.DataColumnSidecars))
else:
gloasColumnQuarantine[].popSidecars(blockRoot)
if sidecarsOpt.isNone():
# Sidecars are missing; put the envelope back into quarantine.
consensusManager.quarantine[].addSidecarless(blck)
envelopeQuarantine[].addOrphan(envelope)
return ok()
sidecarsOpt
await blockProcessor.addPayload(blck, envelope, sidecarsOpt)
rmanEnvelopeLoader = proc(blockRoot: Eth2Digest):
Opt[gloas.TrustedSignedExecutionPayloadEnvelope] =
dag.db.getExecutionPayloadEnvelope(blockRoot)
rmanBlobLoader = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] =
var blob_sidecar = BlobSidecar.new()
@@ -756,8 +809,10 @@ proc initFullNode(
requestManager = RequestManager.init(
node.network, supernode, custodyColumns,
dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress),
- quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
- rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
+ quarantine, envelopeQuarantine, blobQuarantine,
+ dataColumnQuarantine, rmanBlockVerifier, rmanBlockLoader,
+ rmanEnvelopeVerifier, rmanEnvelopeLoader,
+ rmanBlobLoader, rmanDataColumnLoader)
validatorCustody = ValidatorCustodyRef.init(node.network, dag, custodyColumns,
dataColumnQuarantine)

@@ -2200,7 +2255,7 @@ proc onSlotStart(node: BeaconNode, wallTime: BeaconTime,
node.consensusManager[].updateHead(wallSlot)

await node.handleValidatorDuties(lastSlot, wallSlot)
- node.requestManager.switchToColumnLoop()
+ node.requestManager.upgradeLoops()
await onSlotEnd(node, wallSlot)

# https://github.com/ethereum/builder-specs/blob/v0.4.0/specs/bellatrix/validator.md#registration-dissemination
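A simplified, self-contained model of the decision tree that the rmanEnvelopeVerifier closure above implements: return ok() and drop silently when the block or its sidecars are not yet available, return err(Invalid) when the quarantined envelope does not match the block, and otherwise hand the envelope to the block processor. The types and the classifyEnvelope helper are hypothetical stand-ins for the DAG, the quarantines and blockProcessor.addPayload; only the branch structure mirrors the real code (the results package is assumed):

```nim
import results

type
  VerifierError = enum
    Invalid, MissingParent
  Verdict = enum
    Discarded   # ok(): block or sidecars not available yet
    Rejected    # err(Invalid): envelope does not match the stored block
    Stored      # handed to the block processor

proc classifyEnvelope(
    haveBlock, isGloasBlock, haveQuarantinedMatch, haveSidecars: bool
): (Verdict, Result[void, VerifierError]) =
  if not haveBlock or not isGloasBlock:
    # Missing or pre-Gloas block: nothing to verify against, drop silently.
    return (Discarded, Result[void, VerifierError].ok())
  if not haveQuarantinedMatch:
    # The block is the source of truth; a non-matching envelope is a bad
    # value from the peer.
    return (Rejected, Result[void, VerifierError].err(VerifierError.Invalid))
  if not haveSidecars:
    # Data columns still missing: re-quarantine and wait.
    return (Discarded, Result[void, VerifierError].ok())
  (Stored, Result[void, VerifierError].ok())

when isMainModule:
  doAssert classifyEnvelope(true, true, false, true)[0] == Rejected
  doAssert classifyEnvelope(false, true, true, true)[0] == Discarded
  doAssert classifyEnvelope(true, true, true, true)[0] == Stored
```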
160 changes: 157 additions & 3 deletions beacon_chain/sync/request_manager.nim
@@ -12,8 +12,8 @@ import ssz_serialization/types
import
../spec/[forks, network, peerdas_helpers],
../networking/eth2_network,
- ../consensus_object_pools/block_quarantine,
- ../consensus_object_pools/blob_quarantine,
+ ../consensus_object_pools/[
+   blob_quarantine, block_quarantine, envelope_quarantine],
"."/sync_protocol, "."/sync_manager,
../gossip_processing/block_processor

@@ -52,10 +52,18 @@
maybeFinalized: bool
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}

EnvelopeVerifierFn = proc(
signedEnvelope: gloas.SignedExecutionPayloadEnvelope,
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}

BlockLoaderFn = proc(
blockRoot: Eth2Digest
): Opt[ForkedTrustedSignedBeaconBlock] {.gcsafe, raises: [].}

EnvelopeLoaderFn = proc(
blockRoot: Eth2Digest,
): Opt[gloas.TrustedSignedExecutionPayloadEnvelope] {.gcsafe, raises: [].}

BlobLoaderFn = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}

@@ -80,13 +88,17 @@ type
getBeaconTime: GetBeaconTimeFn
inhibit: InhibitFn
quarantine: ref Quarantine
envelopeQuarantine: ref EnvelopeQuarantine
blobQuarantine: ref BlobQuarantine
dataColumnQuarantine: ref ColumnQuarantine
blockVerifier: BlockVerifierFn
blockLoader: BlockLoaderFn
envelopeVerifier: EnvelopeVerifierFn
envelopeLoader: EnvelopeLoaderFn
blobLoader: BlobLoaderFn
dataColumnLoader: DataColumnLoaderFn
blockLoopFuture: Future[void].Raising([CancelledError])
envelopeLoopFuture: Future[void].Raising([CancelledError])
blobLoopFuture: Future[void].Raising([CancelledError])
dataColumnLoopFuture: Future[void].Raising([CancelledError])

@@ -103,10 +115,13 @@ func init*(T: type RequestManager, network: Eth2Node,
getBeaconTime: GetBeaconTimeFn,
inhibit: InhibitFn,
quarantine: ref Quarantine,
envelopeQuarantine: ref EnvelopeQuarantine,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref ColumnQuarantine,
blockVerifier: BlockVerifierFn,
blockLoader: BlockLoaderFn = nil,
envelopeVerifier: EnvelopeVerifierFn,
envelopeLoader: EnvelopeLoaderFn,
blobLoader: BlobLoaderFn = nil,
dataColumnLoader: DataColumnLoaderFn = nil): RequestManager =
RequestManager(
@@ -116,10 +131,13 @@
getBeaconTime: getBeaconTime,
inhibit: inhibit,
quarantine: quarantine,
envelopeQuarantine: envelopeQuarantine,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
blockVerifier: blockVerifier,
blockLoader: blockLoader,
envelopeVerifier: envelopeVerifier,
envelopeLoader: envelopeLoader,
blobLoader: blobLoader,
dataColumnLoader: dataColumnLoader)

@@ -137,6 +155,21 @@ func checkResponse(roots: openArray[Eth2Digest],
checks.del(res)
true

func checkResponse(
roots: openArray[Eth2Digest],
envelopes: openArray[ref SignedExecutionPayloadEnvelope],
): bool =
## Ensure that each returned envelope corresponds to one of the requested roots.
if len(envelopes) > len(roots):
return false
var checks = roots.toHashSet()
for envelope in envelopes:
[Review comment (Contributor): would it be useful to make envelopes a HashSet? How frequently can the request manager be triggered for missing envelopes? That mostly depends on the maximum number of missing envelopes.]

if envelope[].message.beacon_block_root in checks:
checks.excl(envelope[].message.beacon_block_root)
else:
return false
true
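Regarding the review question above: a self-contained illustration of the checkResponse invariant, with string roots standing in for Eth2Digest and for the envelopes' beacon_block_root. Because entries are removed from the requested-roots HashSet as they are matched, an unrequested root and a duplicate envelope for the same root are both rejected, which suggests the envelope list itself probably does not need to become a HashSet for correctness (it may still matter for efficiency at larger counts):

```nim
import std/sets

func checkResponse(roots: openArray[string],
                   envelopeRoots: openArray[string]): bool =
  if envelopeRoots.len > roots.len:
    return false
  var checks = roots.toHashSet()
  for root in envelopeRoots:
    if root in checks:
      checks.excl(root)   # a repeated root is no longer found on the next pass
    else:
      return false
  true

when isMainModule:
  doAssert checkResponse(["a", "b"], ["b"])           # subset of roots is fine
  doAssert not checkResponse(["a", "b"], ["a", "a"])  # duplicate rejected
  doAssert not checkResponse(["a"], ["c"])            # unrequested root rejected
```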

func cmpColumnIndex(x: ColumnIndex, y: ref fulu.DataColumnSidecar): int =
cmp(x, y[].index)

@@ -260,6 +293,68 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async:
if not(isNil(peer)):
rman.network.peerPool.release(peer)

proc fetchEnvelopesFromNetwork(self: RequestManager, roots: seq[Eth2Digest])
{.async: (raises: [CancelledError]).} =
var peer: Peer

try:
peer = await self.network.peerPool.acquire()
debug "Requesting envelopes by root",
peer = peer, envelopes = shortLog(roots),
peer_score = peer.getScore()

let envelopes = await executionPayloadEnvelopesByRoot(
peer, BlockRootsList roots)

if envelopes.isOk:
var uenvelopes = envelopes.get().asSeq()
if checkResponse(roots, uenvelopes):
var gotGoodEnvelope = false

for envelope in uenvelopes:
self.envelopeQuarantine[].addOrphan(envelope[])
let res = await self.envelopeVerifier(envelope[])

# An envelope is marked as missing once we have received a valid
# block. In Gloas, both a valid block and its envelope are required
# to proceed to the next slot, so in theory we should notice at most
# one missing envelope per slot.
#
# If missing envelopes were ever detected somewhere other than the
# head, a response containing two or more envelopes could cause a
# verifier error, since the order matters.
#
# TODO improve/investigate ways of figuring out missing envelopes
# efficiently (across different slots).
#
# TODO verify multiple envelopes in the chain's order.
debugGloasComment("as comment above")
if res.isErr():
debug "Received invalid envelope",
peer = peer, envelopes = shortLog(roots)
debugGloasComment("update score when processing in order")
return
else:
gotGoodEnvelope = true

if gotGoodEnvelope:
debug "Request manager got good envelope",
peer = peer, envelopes = shortLog(roots), uenvelopes = len(uenvelopes)
peer.updateScore(PeerScoreGoodValues)

else:
debug "Mismatching response to envelopes by root",
peer = peer, envelopes = shortLog(roots), uenvelopes = len(uenvelopes)
peer.updateScore(PeerScoreBadResponse)
else:
debug "Envelopes by root request failed",
peer = peer, envelopes = shortLog(roots), err = envelopes.error()
peer.updateScore(PeerScoreNoValues)

finally:
if not(isNil(peer)):
self.network.peerPool.release(peer)
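One possible way to approach the "verify multiple envelopes in the chain's order" TODO above, sketched with a hypothetical MiniEnvelope type rather than gloas.SignedExecutionPayloadEnvelope: sort the fetched envelopes by slot before handing them to the verifier, so an ancestor's envelope is verified before its descendants'. This is only a sketch of the idea, not the codebase's API:

```nim
import std/algorithm

type MiniEnvelope = object
  slot: uint64
  beaconBlockRoot: string

proc orderForVerification(envelopes: seq[MiniEnvelope]): seq[MiniEnvelope] =
  ## Ascending slot order approximates chain order for envelopes on a single
  ## branch; ordering across branches would still need DAG lookups.
  envelopes.sortedByIt(it.slot)

when isMainModule:
  let fetched = @[
    MiniEnvelope(slot: 12, beaconBlockRoot: "0xchild"),
    MiniEnvelope(slot: 11, beaconBlockRoot: "0xparent")]
  doAssert orderForVerification(fetched)[0].beaconBlockRoot == "0xparent"
```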

func cmpSidecarIndexes(x, y: ref BlobSidecar | ref fulu.DataColumnSidecar): int =
cmp(x[].index, y[].index)

@@ -505,6 +600,59 @@ proc requestManagerBlockLoop(
debug "Request manager block tick", blocks = shortLog(blockRoots),
sync_speed = speed(start, finish)

proc requestManagerEnvelopeLoop(self: RequestManager)
{.async: (raises: [CancelledError]).} =
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
await sleepAsync(POLL_INTERVAL)

if self.inhibit():
continue

let missingBlockRoots = self.envelopeQuarantine[].peekMissing().toSeq()
if missingBlockRoots.len() == 0:
continue

var blockRoots: seq[Eth2Digest]
if self.envelopeLoader == nil:
blockRoots = missingBlockRoots
else:
var verifiers:
seq[Future[Result[void, VerifierError]].Raising([CancelledError])]
for blockRoot in missingBlockRoots:
let envelope = self.envelopeLoader(blockRoot).valueOr:
blockRoots.add blockRoot
continue
debug "Loaded orphaned envelope from storage", blockRoot
verifiers.add self.envelopeVerifier(envelope.asSigned())
try:
await allFutures(verifiers)
except CancelledError as exc:
let futs = verifiers.mapIt(it.cancelAndWait())
await noCancel allFutures(futs)
raise exc

if blockRoots.len() == 0:
continue

debug "Requesting detected missing envelopes", envelopes = shortLog(blockRoots)
let start = SyncMoment.now(0)

var workers:
array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]

for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = self.fetchEnvelopesFromNetwork(blockRoots)

await allFutures(workers)

let finish = SyncMoment.now(lenu64(blockRoots))

debug "Request manager envelope tick",
envelopes = shortLog(blockRoots),
sync_speed = speed(start, finish)
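A minimal sketch of the AsyncEvent alternative mentioned in the TODO at the top of this loop: the quarantine fires an event when it records a missing envelope and the loop blocks on it, instead of waking up every POLL_INTERVAL. The MiniQuarantine type and proc names are hypothetical, not the real quarantine API; only chronos's AsyncEvent is assumed:

```nim
import chronos

type MiniQuarantine = ref object
  missing: seq[string]       # stand-in for missing envelope roots
  wakeup: AsyncEvent

proc addMissing(q: MiniQuarantine, root: string) =
  q.missing.add(root)
  q.wakeup.fire()            # wake the request loop immediately

proc envelopeLoop(q: MiniQuarantine) {.async.} =
  while true:
    await q.wakeup.wait()    # sleeps until addMissing() fires the event
    q.wakeup.clear()
    let roots = q.missing
    q.missing.setLen(0)
    echo "would request envelopes for ", roots.len, " root(s)"

when isMainModule:
  let q = MiniQuarantine(wakeup: newAsyncEvent())
  let loop = envelopeLoop(q)
  q.addMissing("0xaa")
  waitFor sleepAsync(10.milliseconds)   # give the loop a chance to run
  waitFor loop.cancelAndWait()
```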

proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
let
wallTime = rman.getBeaconTime()
@@ -741,7 +889,7 @@ proc start*(rman: var RequestManager) =
rman.blockLoopFuture = rman.requestManagerBlockLoop()
rman.blobLoopFuture = rman.requestManagerBlobLoop()

- proc switchToColumnLoop*(rman: var RequestManager) =
+ proc upgradeLoops*(rman: var RequestManager) =
let currentEpoch =
rman.getBeaconTime().slotOrZero(rman.network.cfg.timeParams).epoch()

@@ -753,10 +901,16 @@ proc switchToColumnLoop*(rman: var RequestManager) =
rman.dataColumnLoopFuture =
rman.requestManagerDataColumnLoop()

if currentEpoch >= rman.network.cfg.GLOAS_FORK_EPOCH and
isNil(rman.envelopeLoopFuture):
rman.envelopeLoopFuture = rman.requestManagerEnvelopeLoop()

proc stop*(rman: RequestManager) =
## Stop the Request Manager's loops.
if not(isNil(rman.blockLoopFuture)):
rman.blockLoopFuture.cancelSoon()
if not(isNil(rman.envelopeLoopFuture)):
rman.envelopeLoopFuture.cancelSoon()
if not(isNil(rman.blobLoopFuture)):
rman.blobLoopFuture.cancelSoon()
if not(isNil(rman.dataColumnLoopFuture)):
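The switchToColumnLoop → upgradeLoops change above follows one simple pattern: once the wall-clock epoch reaches a fork's epoch, start that fork's loop exactly once, with the isNil check making repeated calls from onSlotStart harmless. A minimal model of that gating logic, with plain integers standing in for Epoch and the loop futures reduced to started flags (names are illustrative only):

```nim
type MiniManager = object
  columnForkEpoch, gloasForkEpoch: uint64
  columnLoopStarted, envelopeLoopStarted: bool

proc upgradeLoops(m: var MiniManager, currentEpoch: uint64) =
  ## Idempotent: each loop is started at most once, when its fork is reached.
  if currentEpoch >= m.columnForkEpoch and not m.columnLoopStarted:
    m.columnLoopStarted = true      # rman.requestManagerDataColumnLoop()
  if currentEpoch >= m.gloasForkEpoch and not m.envelopeLoopStarted:
    m.envelopeLoopStarted = true    # rman.requestManagerEnvelopeLoop()

when isMainModule:
  var m = MiniManager(columnForkEpoch: 10, gloasForkEpoch: 20)
  m.upgradeLoops(15)
  doAssert m.columnLoopStarted and not m.envelopeLoopStarted
  m.upgradeLoops(25)
  m.upgradeLoops(25)                # harmless on repeated slot ticks
  doAssert m.envelopeLoopStarted
```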