Commit 250a80e

add gossip validation for dc, and data column quarantine strategy (#6581)
* add gossip validation for dc
* review 1
* rm callback
* review 2
* added custody columns as a global entity
* alpha 8
* few typos
1 parent 3cb7b91 commit 250a80e

7 files changed: +383 −8 lines changed
beacon_chain/beacon_node.nim

Lines changed: 2 additions & 1 deletion
@@ -22,7 +22,7 @@ import
   ./el/el_manager,
   ./consensus_object_pools/[
     blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
-    attestation_pool, sync_committee_msg_pool, validator_change_pool],
+    data_column_quarantine, attestation_pool, sync_committee_msg_pool, validator_change_pool],
   ./spec/datatypes/[base, altair],
   ./spec/eth2_apis/dynamic_fee_recipients,
   ./sync/[sync_manager, request_manager],
@@ -73,6 +73,7 @@ type
     dag*: ChainDAGRef
     quarantine*: ref Quarantine
     blobQuarantine*: ref BlobQuarantine
+    dataColumnQuarantine*: ref DataColumnQuarantine
     attestationPool*: ref AttestationPool
     syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
     lightClientPool*: ref LightClientPool
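
For orientation, a minimal sketch of how the new field might be populated during node setup, next to the existing blobQuarantine. The initialization site and the example column indices below are assumptions for illustration only; they are not part of this diff.

# Hypothetical wiring at BeaconNode construction time (illustrative only, not from this commit)
let dataColumnQuarantine = (ref DataColumnQuarantine)()
dataColumnQuarantine[].supernode = false  # a regular node custodies only a subset of columns
dataColumnQuarantine[].custody_columns = @[ColumnIndex(0), ColumnIndex(7)]  # example indices; normally derived from the node id
# the ref would then be stored on the node, e.g. node.dataColumnQuarantine = dataColumnQuarantine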
beacon_chain/consensus_object_pools/data_column_quarantine.nim

Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
+# beacon_chain
+# Copyright (c) 2018-2024 Status Research & Development GmbH
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+{.push raises: [].}
+
+import
+  std/tables,
+  ../spec/datatypes/eip7594,
+  ../spec/helpers
+
+from std/sequtils import mapIt
+from std/strutils import join
+
+const
+  MaxDataColumns = 3 * SLOTS_PER_EPOCH * NUMBER_OF_COLUMNS
+    ## Same limit as `MaxOrphans` in `block_quarantine`;
+    ## data columns may arrive before an orphan is tagged `columnless`
+
+type
+  DataColumnQuarantine* = object
+    data_columns*:
+      OrderedTable[DataColumnIdentifier, ref DataColumnSidecar]
+    supernode*: bool
+    custody_columns*: seq[ColumnIndex]
+    onDataColumnSidecarCallback*: OnDataColumnSidecarCallback
+
+  DataColumnFetchRecord* = object
+    block_root*: Eth2Digest
+    indices*: seq[ColumnIndex]
+
+  OnDataColumnSidecarCallback = proc(data: DataColumnSidecar) {.gcsafe, raises: [].}
+
+func init*(T: type DataColumnQuarantine): T =
+  T()
+
+func shortLog*(x: seq[DataColumnFetchRecord]): string =
+  "[" & x.mapIt(shortLog(it.block_root) & shortLog(it.indices)).join(", ") & "]"
+
+func put*(quarantine: var DataColumnQuarantine,
+          dataColumnSidecar: ref DataColumnSidecar) =
+  if quarantine.data_columns.len >= static(MaxDataColumns.int):
+    # FIFO if full. For example, sync manager and request manager can race
+    # to put data columns in at the same time, so one gets data column
+    # insert -> block resolve -> data column insert, which leaves
+    # garbage data columns.
+    #
+    # This also therefore automatically garbage-collects otherwise valid
+    # data columns that are correctly signed but point to a block root that
+    # is never seen, and for that reason are simply never used.
+    var oldest_column_key: DataColumnIdentifier
+    for k in quarantine.data_columns.keys:
+      oldest_column_key = k
+      break
+    quarantine.data_columns.del(oldest_column_key)
+  let block_root =
+    hash_tree_root(dataColumnSidecar.signed_block_header.message)
+  discard quarantine.data_columns.hasKeyOrPut(
+    DataColumnIdentifier(block_root: block_root,
+                         index: dataColumnSidecar.index),
+    dataColumnSidecar)
+
+func hasDataColumn*(
+    quarantine: DataColumnQuarantine,
+    slot: Slot,
+    proposer_index: uint64,
+    index: ColumnIndex): bool =
+  for data_column_sidecar in quarantine.data_columns.values:
+    template block_header: untyped =
+      data_column_sidecar.signed_block_header.message
+    if block_header.slot == slot and
+        block_header.proposer_index == proposer_index and
+        data_column_sidecar.index == index:
+      return true
+  false
+
+func peekColumnIndices*(quarantine: DataColumnQuarantine,
+                        blck: electra.SignedBeaconBlock):
+                        seq[ColumnIndex] =
+  # Peeks at the column indices currently received into the
+  # quarantine, as needed for data availability checks
+  var indices: seq[ColumnIndex]
+  for col_idx in quarantine.custody_columns:
+    if quarantine.data_columns.hasKey(
+        DataColumnIdentifier(block_root: blck.root,
+                             index: ColumnIndex col_idx)):
+      indices.add(col_idx)
+  indices
+
+func gatherDataColumns*(quarantine: DataColumnQuarantine,
+                        digest: Eth2Digest):
+                        seq[ref DataColumnSidecar] =
+  # Returns the data columns currently held for the given
+  # block root
+  var columns: seq[ref DataColumnSidecar]
+  for i in quarantine.custody_columns:
+    let dc_identifier =
+      DataColumnIdentifier(
+        block_root: digest,
+        index: i)
+    if quarantine.data_columns.hasKey(dc_identifier):
+      let value =
+        quarantine.data_columns.getOrDefault(dc_identifier,
+          default(ref DataColumnSidecar))
+      columns.add(value)
+  columns
+
+func popDataColumns*(
+    quarantine: var DataColumnQuarantine, digest: Eth2Digest,
+    blck: electra.SignedBeaconBlock):
+    seq[ref DataColumnSidecar] =
+  var r: DataColumnSidecars
+  for idx in quarantine.custody_columns:
+    var c: ref DataColumnSidecar
+    if quarantine.data_columns.pop(
+        DataColumnIdentifier(block_root: digest,
+                             index: idx),
+        c):
+      r.add(c)
+  r
+
+func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
+                            blck: electra.SignedBeaconBlock): bool =
+  # `hasMissingDataColumns` reports whether data columns have been
+  # missed over gossip; in the case of a supernode, columns count as
+  # missing when the node has not received data columns up to the
+  # requisite limit (i.e. 50% of NUMBER_OF_COLUMNS).
+
+  # This method shall be actively used by the `RequestManager` to
+  # request columns by root over RPC.
+  var col_counter = 0
+  for idx in quarantine.custody_columns:
+    let dc_identifier =
+      DataColumnIdentifier(
+        block_root: blck.root,
+        index: idx)
+    if dc_identifier notin quarantine.data_columns:
+      inc col_counter
+  if quarantine.supernode and col_counter != NUMBER_OF_COLUMNS:
+    return false
+  elif quarantine.supernode == false and
+      col_counter != max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT):
+    return false
+  else:
+    return true
+
+func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
+                           blck: electra.SignedBeaconBlock): bool =
+  # `hasEnoughDataColumns` dictates whether there are `enough`
+  # data columns for a block to be enqueued, ideally for a supernode
+  # that receives at least 50% of the columns over gossip and RPC.
+
+  # Once 50%+ of the columns are available we can use this function to
+  # check it, and thereby check column reconstructability right from
+  # gossip validation, consequently populating the quarantine with the
+  # rest of the data columns.
+  if quarantine.supernode:
+    let
+      collectedColumns = quarantine.gatherDataColumns(blck.root)
+    if collectedColumns.len >= (quarantine.custody_columns.len div 2):
+      return true
+  else:
+    for i in quarantine.custody_columns:
+      let dc_identifier =
+        DataColumnIdentifier(
+          block_root: blck.root,
+          index: i)
+      if dc_identifier notin quarantine.data_columns:
+        return false
+      else:
+        return true
+
+func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
+                            blck: electra.SignedBeaconBlock):
+                            DataColumnFetchRecord =
+  var indices: seq[ColumnIndex]
+  for i in quarantine.custody_columns:
+    let
+      idx = ColumnIndex(i)
+      dc_id = DataColumnIdentifier(
+        block_root: blck.root,
+        index: idx)
+    if not quarantine.data_columns.hasKey(
+        dc_id):
+      indices.add(idx)
+  DataColumnFetchRecord(block_root: blck.root, indices: indices)
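
Taken together, the intended flow described by the comments above is: gossip-validated sidecars are `put` into the quarantine, the `RequestManager` uses `dataColumnFetchRecord` to fetch whatever is still missing by root over RPC, and once `hasEnoughDataColumns` reports 50%+ of the custody columns (for a supernode) the columns can be popped for data availability checks. A rough usage sketch under assumed context; the `quarantine`, `sidecar` and `signedBlock` values are placeholders and this is not code from the commit:

# Assumed context: quarantine: var DataColumnQuarantine, a gossip-verified
# sidecar: ref DataColumnSidecar, and the matching signedBlock: electra.SignedBeaconBlock.
quarantine.put(sidecar)                           # store; FIFO-evicts the oldest entry when full

if quarantine.hasMissingDataColumns(signedBlock):
  # the RequestManager would request exactly these indices by block root over RPC
  let record = quarantine.dataColumnFetchRecord(signedBlock)
  discard record   # record.block_root / record.indices drive the by-root request

if quarantine.hasEnoughDataColumns(signedBlock):
  # enough custody columns present (>= 50% for a supernode) to proceed
  let columns = quarantine.popDataColumns(signedBlock.root, signedBlock)
  discard columns  # handed on to data availability checks / block processing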

beacon_chain/gossip_processing/gossip_validation.nim

Lines changed: 147 additions & 2 deletions
@@ -15,10 +15,11 @@ import
   stew/byteutils,
   # Internals
   ../spec/[
-    beaconstate, state_transition_block, forks, helpers, network, signatures],
+    beaconstate, state_transition_block, forks,
+    helpers, network, signatures, eip7594_helpers],
   ../consensus_object_pools/[
     attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
-    spec_cache, light_client_pool, sync_committee_msg_pool,
+    data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool,
     validator_change_pool],
   ".."/[beacon_clock],
   ./batch_validation
@@ -209,6 +210,22 @@ func check_blob_sidecar_inclusion_proof(

   ok()

+func check_data_column_sidecar_inclusion_proof(
+    data_column_sidecar: DataColumnSidecar): Result[void, ValidationError] =
+  let res = data_column_sidecar.verify_data_column_sidecar_inclusion_proof()
+  if res.isErr:
+    return errReject(res.error)
+
+  ok()
+
+proc check_data_column_sidecar_kzg_proofs(
+    data_column_sidecar: DataColumnSidecar): Result[void, ValidationError] =
+  let res = data_column_sidecar.verify_data_column_sidecar_kzg_proofs()
+  if res.isErr:
+    return errReject(res.error)
+
+  ok()
+
 # Gossip Validation
 # ----------------------------------------------------------------

@@ -475,6 +492,134 @@ proc validateBlobSidecar*(

   ok()

+# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#data_column_sidecar_subnet_id
+proc validateDataColumnSidecar*(
+    dag: ChainDAGRef, quarantine: ref Quarantine,
+    dataColumnQuarantine: ref DataColumnQuarantine,
+    data_column_sidecar: DataColumnSidecar,
+    wallTime: BeaconTime, subnet_id: uint64):
+    Result[void, ValidationError] =
+
+  template block_header: untyped = data_column_sidecar.signed_block_header.message
+
+  # [REJECT] The sidecar's index is consistent with `NUMBER_OF_COLUMNS`
+  # -- i.e. `data_column_sidecar.index < NUMBER_OF_COLUMNS`
+  if not (data_column_sidecar.index < NUMBER_OF_COLUMNS):
+    return dag.checkedReject("DataColumnSidecar: The sidecar's index should be consistent with NUMBER_OF_COLUMNS")
+
+  # [REJECT] The sidecar is for the correct subnet
+  # -- i.e. `compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id`.
+  if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id):
+    return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet")
+
+  # [IGNORE] The sidecar is not from a future slot
+  # (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that
+  # `block_header.slot <= current_slot` (a client MAY queue future sidecars for
+  # processing at the appropriate slot).
+  if not (block_header.slot <=
+      (wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
+    return errIgnore("DataColumnSidecar: slot too high")
+
+  # [IGNORE] The sidecar is from a slot greater than the latest
+  # finalized slot -- i.e. validate that `block_header.slot >
+  # compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)`
+  if not (block_header.slot > dag.finalizedHead.slot):
+    return errIgnore("DataColumnSidecar: slot already finalized")
+
+  # [IGNORE] The sidecar is the first sidecar for the tuple
+  # (block_header.slot, block_header.proposer_index, data_column_sidecar.index)
+  # with valid header signature, sidecar inclusion proof, and kzg proof.
+  let block_root = hash_tree_root(block_header)
+  if dag.getBlockRef(block_root).isSome():
+    return errIgnore("DataColumnSidecar: already have block")
+  if dataColumnQuarantine[].hasDataColumn(
+      block_header.slot, block_header.proposer_index, data_column_sidecar.index):
+    return errIgnore("DataColumnSidecar: already have valid data column from same proposer")
+
+  # [REJECT] The sidecar's `kzg_commitments` inclusion proof is valid as verified by
+  # `verify_data_column_sidecar_inclusion_proof(sidecar)`.
+  block:
+    let v = check_data_column_sidecar_inclusion_proof(data_column_sidecar)
+    if v.isErr:
+      return dag.checkedReject(v.error)
+
+  # [IGNORE] The sidecar's block's parent (defined by
+  # `block_header.parent_root`) has been seen (via both gossip and
+  # non-gossip sources) (a client MAY queue sidecars for processing
+  # once the parent block is retrieved).
+  #
+  # [REJECT] The sidecar's block's parent (defined by
+  # `block_header.parent_root`) passes validation.
+  let parent = dag.getBlockRef(block_header.parent_root).valueOr:
+    if block_header.parent_root in quarantine[].unviable:
+      quarantine[].addUnviable(block_root)
+      return dag.checkedReject("DataColumnSidecar: parent not validated")
+    else:
+      quarantine[].addMissing(block_header.parent_root)
+      return errIgnore("DataColumnSidecar: parent not found")
+
+  # [REJECT] The sidecar is from a higher slot than the sidecar's
+  # block's parent (defined by `block_header.parent_root`).
+  if not (block_header.slot > parent.bid.slot):
+    return dag.checkedReject("DataColumnSidecar: slot lower than parent's")
+
+  # [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's
+  # block -- i.e. `get_checkpoint_block(store, block_header.parent_root,
+  # store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
+  let
+    finalized_checkpoint = getStateField(dag.headState, finalized_checkpoint)
+    ancestor = get_ancestor(parent, finalized_checkpoint.epoch.start_slot)
+
+  if ancestor.isNil:
+    # This shouldn't happen: we should always be able to trace the parent back
+    # to the finalized checkpoint (else it wouldn't be in the DAG)
+    return errIgnore("DataColumnSidecar: Can't find ancestor")
+
+  if not (
+      finalized_checkpoint.root == ancestor.root or
+      finalized_checkpoint.root.isZero):
+    quarantine[].addUnviable(block_root)
+    return dag.checkedReject(
+      "DataColumnSidecar: Finalized checkpoint not an ancestor")
+
+  # [REJECT] The sidecar is proposed by the expected `proposer_index`
+  # for the block's slot in the context of the current shuffling
+  # (defined by `block_header.parent_root`/`block_header.slot`).
+  # If the proposer_index cannot immediately be verified against the expected
+  # shuffling, the sidecar MAY be queued for later processing while proposers
+  # for the block's branch are calculated -- in such a case do not
+  # REJECT, instead IGNORE this message.
+  let proposer = getProposer(dag, parent, block_header.slot).valueOr:
+    warn "cannot compute proposer for data column"
+    return errIgnore("DataColumnSidecar: Cannot compute proposer") # internal issue
+
+  if uint64(proposer) != block_header.proposer_index:
+    return dag.checkedReject("DataColumnSidecar: Unexpected proposer")
+
+  # [REJECT] The proposer signature of `data_column_sidecar.signed_block_header`
+  # is valid with respect to the `block_header.proposer_index` pubkey.
+  if not verify_block_signature(
+      dag.forkAtEpoch(block_header.slot.epoch),
+      getStateField(dag.headState, genesis_validators_root),
+      block_header.slot,
+      block_root,
+      dag.validatorKey(proposer).get(),
+      data_column_sidecar.signed_block_header.signature):
+    return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")
+
+  # [REJECT] The sidecar's column data is valid as
+  # verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`
+  block:
+    let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
+    if r.isErr:
+      return dag.checkedReject(r.error)
+
+  # Send notification about new data column sidecar via callback
+  if not(isNil(dataColumnQuarantine.onDataColumnSidecarCallback)):
+    dataColumnQuarantine.onDataColumnSidecarCallback(data_column_sidecar)
+
+  ok()
+
 # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/phase0/p2p-interface.md#beacon_block
 # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/bellatrix/p2p-interface.md#beacon_block
 proc validateBeaconBlock*(
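
For completeness, a hedged sketch of how the new validator could be hooked into a gossip handler and feed the quarantine. The handler itself, its name, the wall-clock accessor and the use of the repo's `newClone` helper are assumptions; this commit only adds the validation function and the quarantine module.

# Hypothetical glue code (illustrative only, not part of this commit);
# field names such as dag, quarantine and dataColumnQuarantine come from
# the BeaconNode type extended in beacon_node.nim above.
proc processDataColumnSidecar(
    node: BeaconNode, sidecar: DataColumnSidecar,
    subnet_id: uint64): Result[void, ValidationError] =
  let v = validateDataColumnSidecar(
    node.dag, node.quarantine, node.dataColumnQuarantine,
    sidecar, node.beaconClock.now(), subnet_id)
  if v.isOk:
    # gossip-valid: keep the column until enough of the block's custody
    # columns have arrived for data availability checks
    node.dataColumnQuarantine[].put(newClone(sidecar))
  v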
