Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions beacon_chain/consensus_object_pools/envelope_quarantine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func popOrphan*(
func delOrphan*(self: var EnvelopeQuarantine, blck: gloas.SignedBeaconBlock) =
self.orphans.del(blck.root)

iterator peekMissing*(self: EnvelopeQuarantine): Eth2Digest =
for v in self.missing:
yield v

func cleanupOrphans*(self: var EnvelopeQuarantine, finalizedSlot: Slot) =
var toDel: seq[Eth2Digest]

Expand Down
10 changes: 9 additions & 1 deletion beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,14 @@ proc storePayload(

ok()

proc addPayload*(
self: ref BlockProcessor,
blck: gloas.SignedBeaconBlock,
envelope: gloas.SignedExecutionPayloadEnvelope,
sidecarsOpt: Opt[gloas.DataColumnSidecars],
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
await self.storePayload(blck, envelope, sidecarsOpt)

proc enqueuePayload*(
self: ref BlockProcessor,
blck: gloas.SignedBeaconBlock,
Expand All @@ -912,7 +920,7 @@ proc enqueuePayload*(
if blck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
debugGloasComment("backfilling")

discard self.storePayload(blck, envelope, sidecarsOpt)
discard self.addPayload(blck, envelope, sidecarsOpt)

proc enqueuePayload*(self: ref BlockProcessor, blck: gloas.SignedBeaconBlock) =
## Enqueue payload processing by block that is a valid block.
Expand Down
59 changes: 57 additions & 2 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,59 @@ proc initFullNode(
rmanBlockLoader = proc(
blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] =
dag.getForkedBlock(blockRoot)
rmanEnvelopeVerifier = proc(signedEnvelope: gloas.SignedExecutionPayloadEnvelope):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
## Envelope verifier contains the same logic as block_processor
## enqueuePayload() except when the valid block or any sidecars is
## missing, we will return ok() as it is not any types of VerifierError.
## Therefore, the call is discarded silently.
template blockRoot(): auto = signedEnvelope.message.beacon_block_root

let
blockRef = dag.getBlockRef(blockRoot).valueOr:
return ok()
blck =
block:
let forkedBlock = dag.getForkedBlock(blockRef.bid).valueOr:
# We have checked that the block exists in the chain. There might be
# issues in reading the database or data in the memory is broken.
# Since no result is returned, we log for investigation.
debug "Enqueue payload from envelope. Block is missing in DB",
bid = shortLog(blockRef.bid)
return ok()
withBlck(forkedBlock):
when consensusFork >= ConsensusFork.Gloas:
forkyBlck.asSigned()
else:
# Incorrect fork which shouldn't be happening.
debug "Enqueue payload from envelope. Block is in incorrect fork",
bid = shortLog(blockRef.bid)
return ok()
envelope = envelopeQuarantine[].popOrphan(blck).valueOr:
# At this point, the signedEnvelope is from a different builder since
# the block should be the source of truth. We should notify receiving
# bad value from the peer.
envelopeQuarantine[].addMissing(blockRoot)
return err(VerifierError.Invalid)
sidecarsOpt =
block:
template bid(): auto =
blck.message.body.signed_execution_payload_bid
let sidecarsOpt =
if bid.message.blob_kzg_commitments.len() == 0:
Opt.some(default(gloas.DataColumnSidecars))
else:
gloasColumnQuarantine[].popSidecars(blockRoot)
if sidecarsOpt.isNone():
# As sidecars are missing, put envelope back to quarantine.
consensusManager.quarantine[].addSidecarless(blck)
envelopeQuarantine[].addOrphan(envelope)
return ok()
sidecarsOpt
await blockProcessor.addPayload(blck, envelope, sidecarsOpt)
rmanEnvelopeLoader = proc(blockRoot: Eth2Digest):
Opt[gloas.TrustedSignedExecutionPayloadEnvelope] =
dag.db.getExecutionPayloadEnvelope(blockRoot)
rmanBlobLoader = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] =
var blob_sidecar = BlobSidecar.new()
Expand Down Expand Up @@ -756,8 +809,10 @@ proc initFullNode(
requestManager = RequestManager.init(
node.network, supernode, custodyColumns,
dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
quarantine, envelopeQuarantine, blobQuarantine,
dataColumnQuarantine, rmanBlockVerifier, rmanBlockLoader,
rmanEnvelopeVerifier, rmanEnvelopeLoader,
rmanBlobLoader, rmanDataColumnLoader)
validatorCustody = ValidatorCustodyRef.init(node.network, dag, custodyColumns,
dataColumnQuarantine)

Expand Down
158 changes: 156 additions & 2 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import ssz_serialization/types
import
../spec/[forks, network, peerdas_helpers],
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
../consensus_object_pools/blob_quarantine,
../consensus_object_pools/[
blob_quarantine, block_quarantine, envelope_quarantine],
"."/sync_protocol, "."/sync_manager,
../gossip_processing/block_processor

Expand Down Expand Up @@ -52,10 +52,18 @@ type
maybeFinalized: bool
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}

EnvelopeVerifierFn = proc(
signedEnvelope: gloas.SignedExecutionPayloadEnvelope,
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}

BlockLoaderFn = proc(
blockRoot: Eth2Digest
): Opt[ForkedTrustedSignedBeaconBlock] {.gcsafe, raises: [].}

EnvelopeLoaderFn = proc(
blockRoot: Eth2Digest,
): Opt[gloas.TrustedSignedExecutionPayloadEnvelope] {.gcsafe, raises: [].}

BlobLoaderFn = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}

Expand All @@ -80,13 +88,17 @@ type
getBeaconTime: GetBeaconTimeFn
inhibit: InhibitFn
quarantine: ref Quarantine
envelopeQuarantine: ref EnvelopeQuarantine
blobQuarantine: ref BlobQuarantine
dataColumnQuarantine: ref ColumnQuarantine
blockVerifier: BlockVerifierFn
blockLoader: BlockLoaderFn
envelopeVerifier: EnvelopeVerifierFn
envelopeLoader: EnvelopeLoaderFn
blobLoader: BlobLoaderFn
dataColumnLoader: DataColumnLoaderFn
blockLoopFuture: Future[void].Raising([CancelledError])
envelopeLoopFuture: Future[void].Raising([CancelledError])
blobLoopFuture: Future[void].Raising([CancelledError])
dataColumnLoopFuture: Future[void].Raising([CancelledError])

Expand All @@ -103,10 +115,13 @@ func init*(T: type RequestManager, network: Eth2Node,
getBeaconTime: GetBeaconTimeFn,
inhibit: InhibitFn,
quarantine: ref Quarantine,
envelopeQuarantine: ref EnvelopeQuarantine,
blobQuarantine: ref BlobQuarantine,
dataColumnQuarantine: ref ColumnQuarantine,
blockVerifier: BlockVerifierFn,
blockLoader: BlockLoaderFn = nil,
envelopeVerifier: EnvelopeVerifierFn,
envelopeLoader: EnvelopeLoaderFn,
blobLoader: BlobLoaderFn = nil,
dataColumnLoader: DataColumnLoaderFn = nil): RequestManager =
RequestManager(
Expand All @@ -116,10 +131,13 @@ func init*(T: type RequestManager, network: Eth2Node,
getBeaconTime: getBeaconTime,
inhibit: inhibit,
quarantine: quarantine,
envelopeQuarantine: envelopeQuarantine,
blobQuarantine: blobQuarantine,
dataColumnQuarantine: dataColumnQuarantine,
blockVerifier: blockVerifier,
blockLoader: blockLoader,
envelopeVerifier: envelopeVerifier,
envelopeLoader: envelopeLoader,
blobLoader: blobLoader,
dataColumnLoader: dataColumnLoader)

Expand All @@ -137,6 +155,22 @@ func checkResponse(roots: openArray[Eth2Digest],
checks.del(res)
true

func checkResponse(
roots: openArray[Eth2Digest],
envelopes: openArray[ref SignedExecutionPayloadEnvelope],
): bool =
## Ensure there is the requested envelope as per each root.
var checks = @roots
if len(envelopes) > len(roots):
return false
for envelope in envelopes:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be useful to make envelopes a HashSet? how frequently can rman for enevelopes get triggered for missing envelopes? also mostly depends on the max missing envelopes

let res = checks.find(envelope[].message.beacon_block_root)
if res == -1:
return false
else:
checks.del(res)
true

func cmpColumnIndex(x: ColumnIndex, y: ref fulu.DataColumnSidecar): int =
cmp(x, y[].index)

Expand Down Expand Up @@ -260,6 +294,68 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async:
if not(isNil(peer)):
rman.network.peerPool.release(peer)

proc fetchEnvelopesFromNetwork(self: RequestManager, roots: seq[Eth2Digest])
{.async: (raises: [CancelledError]).} =
var peer: Peer

try:
peer = await self.network.peerPool.acquire()
debug "Requesting envelopes by root",
peer = peer, envelopes = shortLog(roots),
peer_score = peer.getScore()

let envelopes = await executionPayloadEnvelopesByRoot(
peer, BlockRootsList roots)

if envelopes.isOk:
var uenvelopes = envelopes.get().asSeq()
if checkResponse(roots, uenvelopes):
var gotGoodEnvelope = false

for envelope in uenvelopes:
self.envelopeQuarantine[].addOrphan(envelope[])
let res = await self.envelopeVerifier(envelope[])

# Envelope is marked as missing when we got a valid block. As in
# Gloas, both valid block and envelope are required in order to
# proceed to the next slot. So in theory we should only be able to
# notice at most one missing envelope per slot.
#
# If there is a good way to find missing envelopes other than the
# head, the response, which may contains 2 or more envelopes, may
# cause verifier error as the order matters.
#
# TODO improve/investigate way of figuring out missing envelope
# efficiently (across different slots).
#
# TODO verify multiple envelopes in the chain's order.
debugGloasComment("as comment above")
if res.isErr():
debug "Received invalid envelope",
peer = peer, envelopes = shortLog(roots)
debugGloasComment("update score when processing in order")
return
else:
gotGoodEnvelope = true

if gotGoodEnvelope:
debug "Request manager got good envelope",
peer = peer, envelopes = shortLog(roots), uenvelopes = len(uenvelopes)
peer.updateScore(PeerScoreGoodValues)

else:
debug "Mismatching response to envelopes by root",
peer = peer, envelopes = shortLog(roots), uenvelopes = len(uenvelopes)
peer.updateScore(PeerScoreBadResponse)
else:
debug "Envelopes by root request failed",
peer = peer, envelopes = shortLog(roots), err = envelopes.error()
peer.updateScore(PeerScoreNoValues)

finally:
if not(isNil(peer)):
self.network.peerPool.release(peer)

func cmpSidecarIndexes(x, y: ref BlobSidecar | ref fulu.DataColumnSidecar): int =
cmp(x[].index, y[].index)

Expand Down Expand Up @@ -508,6 +604,61 @@ proc requestManagerBlockLoop(
debug "Request manager block tick", blocks = shortLog(blockRoots),
sync_speed = speed(start, finish)

proc requestManagerEnvelopeLoop(self: RequestManager)
{.async: (raises: [CancelledError]).} =
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
await sleepAsync(POLL_INTERVAL)

if self.inhibit():
continue

let missingBlockRoots = self.envelopeQuarantine[].peekMissing().toSeq()
if missingBlockRoots.len() == 0:
continue

var blockRoots: seq[Eth2Digest]
if self.envelopeLoader == nil:
blockRoots = missingBlockRoots
else:
var verifiers:
seq[Future[Result[void, VerifierError]].Raising([CancelledError])]
for blockRoot in missingBlockRoots:
let envelope = self.envelopeLoader(blockRoot).valueOr:
blockRoots.add blockRoot
continue
debug "Loaded orphaned envelope from storage", blockRoot
verifiers.add self.envelopeVerifier(envelope.asSigned())
try:
await allFutures(verifiers)
except CancelledError as exc:
var futs = newSeqOfCap[Future[void].Raising([])](verifiers.len)
for verifier in verifiers:
futs.add verifier.cancelAndWait()
await noCancel allFutures(futs)
raise exc

if blockRoots.len() == 0:
continue

debug "Requesting detected missing envelopes", envelopes = shortLog(blockRoots)
let start = SyncMoment.now(0)

var workers:
array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]

for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = self.fetchEnvelopesFromNetwork(blockRoots)

await allFutures(workers)

let finish = SyncMoment.now(uint64(len(blockRoots)))

debug "Request manager envelope tick",
envelopes = shortLog(blockRoots),
sync_speed = speed(start, finish)

proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
let
wallTime = rman.getBeaconTime()
Expand Down Expand Up @@ -742,6 +893,7 @@ proc requestManagerDataColumnLoop(
proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
rman.blockLoopFuture = rman.requestManagerBlockLoop()
rman.envelopeLoopFuture = rman.requestManagerEnvelopeLoop()
rman.blobLoopFuture = rman.requestManagerBlobLoop()

proc switchToColumnLoop*(rman: var RequestManager) =
Expand All @@ -760,6 +912,8 @@ proc stop*(rman: RequestManager) =
## Stop Request Manager's loop.
if not(isNil(rman.blockLoopFuture)):
rman.blockLoopFuture.cancelSoon()
if not(isNil(rman.envelopeLoopFuture)):
rman.envelopeLoopFuture.cancelSoon()
if not(isNil(rman.blobLoopFuture)):
rman.blobLoopFuture.cancelSoon()
if not(isNil(rman.dataColumnLoopFuture)):
Expand Down
Loading