Skip to content

Commit 44a974d

Browse files
authored
feat: update chain_head (#1836)
1 parent 2ed3618 commit 44a974d

File tree

11 files changed

+302
-83
lines changed

11 files changed

+302
-83
lines changed

bin/trin/src/run.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use trin_state::initialize_state_network;
1919
use trin_storage::{config::StorageCapacityConfig, PortalStorageConfigFactory};
2020
#[cfg(windows)]
2121
use trin_utils::cli::Web3TransportType;
22-
use trin_validation::oracle::HeaderOracle;
22+
use trin_validation::{chain_head::ChainHead, oracle::HeaderOracle};
2323
use utp_rs::socket::UtpSocket;
2424

2525
use crate::{
@@ -121,6 +121,9 @@ async fn run_trin_internal(
121121
// Initialize validation oracle
122122
let header_oracle = Arc::new(RwLock::new(HeaderOracle::default()));
123123

124+
// Initialize head of the chain management.
125+
let chain_head = ChainHead::new_pectra_defaults();
126+
124127
// Initialize and spawn uTP socket
125128
let (utp_talk_reqs_tx, utp_talk_reqs_rx) = mpsc::unbounded_channel();
126129

@@ -170,6 +173,7 @@ async fn run_trin_internal(
170173
portalnet_config.clone(),
171174
storage_config_factory.create(&Subnetwork::Beacon, Distance::MAX)?,
172175
header_oracle.clone(),
176+
chain_head,
173177
)
174178
.await?
175179
} else {

crates/ethportal-api/src/types/consensus/light_client/update.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use ssz_types::{
1010
use superstruct::superstruct;
1111
use tree_hash_derive::TreeHash;
1212

13+
use super::header::LightClientHeader;
1314
use crate::{
1415
light_client::header::{LightClientHeaderDeneb, LightClientHeaderElectra},
1516
types::consensus::{
@@ -108,4 +109,24 @@ impl LightClientUpdate {
108109
ForkName::Electra => LightClientUpdateElectra::from_ssz_bytes(bytes).map(Self::Electra),
109110
}
110111
}
112+
113+
pub fn attested_header(&self) -> LightClientHeader {
114+
match self {
115+
Self::Bellatrix(update) => LightClientHeader::Bellatrix(update.attested_header.clone()),
116+
Self::Capella(update) => LightClientHeader::Capella(update.attested_header.clone()),
117+
Self::Deneb(update) => LightClientHeader::Deneb(update.attested_header.clone()),
118+
Self::Electra(update) => LightClientHeader::Electra(update.attested_header.clone()),
119+
}
120+
}
121+
122+
pub fn finalized_header(&self) -> LightClientHeader {
123+
match self {
124+
Self::Bellatrix(update) => {
125+
LightClientHeader::Bellatrix(update.finalized_header.clone())
126+
}
127+
Self::Capella(update) => LightClientHeader::Capella(update.finalized_header.clone()),
128+
Self::Deneb(update) => LightClientHeader::Deneb(update.finalized_header.clone()),
129+
Self::Electra(update) => LightClientHeader::Electra(update.finalized_header.clone()),
130+
}
131+
}
111132
}

crates/light-client/src/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::{
1919
errors::NodeError,
2020
node::Node,
2121
rpc::Rpc,
22+
watch::LightClientWatchReceivers,
2223
};
2324

2425
#[derive(Default)]
@@ -452,4 +453,8 @@ impl<DB: Database, R: ConsensusRpc + 'static> Client<DB, R> {
452453
pub async fn get_light_client_store(&self) -> Result<LightClientStore> {
453454
self.node.read().await.get_light_client_store()
454455
}
456+
457+
pub async fn get_light_client_watch_receivers(&self) -> LightClientWatchReceivers {
458+
self.node.read().await.consensus.get_watch_receivers()
459+
}
455460
}

crates/light-client/src/consensus/consensus_client.rs

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use ethportal_api::{
1414
finality_update::{LightClientFinalityUpdate, LightClientFinalityUpdateElectra},
1515
optimistic_update::{LightClientOptimisticUpdate, LightClientOptimisticUpdateElectra},
1616
store::LightClientStore,
17-
update::{FinalizedRootProofLenElectra, LightClientUpdateElectra},
17+
update::{FinalizedRootProofLenElectra, LightClientUpdate, LightClientUpdateElectra},
1818
},
1919
};
2020
use milagro_bls::PublicKey;
@@ -28,6 +28,7 @@ use crate::{
2828
consensus::{
2929
constants::MAX_REQUEST_LIGHT_CLIENT_UPDATES, rpc::portal_rpc::expected_current_slot,
3030
},
31+
watch::{light_client_watch_channels, LightClientWatchReceivers, LightClientWatchSenders},
3132
};
3233

3334
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md
@@ -40,6 +41,7 @@ pub struct ConsensusLightClient<R: ConsensusRpc> {
4041
initial_checkpoint: B256,
4142
pub last_checkpoint: Option<B256>,
4243
pub config: Arc<Config>,
44+
watch_senders: LightClientWatchSenders,
4345
}
4446

4547
impl<R: ConsensusRpc> ConsensusLightClient<R> {
@@ -49,23 +51,27 @@ impl<R: ConsensusRpc> ConsensusLightClient<R> {
4951
config: Arc<Config>,
5052
) -> Result<ConsensusLightClient<R>> {
5153
let rpc = R::new(rpc);
54+
let (watch_senders, _watch_receivers) = light_client_watch_channels();
5255

5356
Ok(ConsensusLightClient {
5457
rpc,
5558
store: LightClientStore::default(),
5659
last_checkpoint: None,
5760
config,
5861
initial_checkpoint: checkpoint_block_root,
62+
watch_senders,
5963
})
6064
}
6165

6266
pub fn with_custom_rpc(rpc: R, checkpoint_block_root: B256, config: Arc<Config>) -> Self {
67+
let (watch_senders, _watch_receivers) = light_client_watch_channels();
6368
ConsensusLightClient {
6469
rpc,
6570
store: LightClientStore::default(),
6671
last_checkpoint: None,
6772
config,
6873
initial_checkpoint: checkpoint_block_root,
74+
watch_senders,
6975
}
7076
}
7177

@@ -105,6 +111,10 @@ impl<R: ConsensusRpc> ConsensusLightClient<R> {
105111
&self.store
106112
}
107113

114+
pub fn get_watch_receivers(&self) -> LightClientWatchReceivers {
115+
self.watch_senders.subscribe()
116+
}
117+
108118
pub async fn sync(&mut self) -> Result<()> {
109119
self.bootstrap().await?;
110120

@@ -366,13 +376,35 @@ impl<R: ConsensusRpc> ConsensusLightClient<R> {
366376
}
367377

368378
fn apply_update(&mut self, update: &LightClientUpdateElectra) {
369-
let update = GenericUpdate::from(update);
370-
self.apply_generic_update(&update);
379+
let optimistic_header_slot = self.store.optimistic_header.slot;
380+
let finalized_header_slot = self.store.finalized_header.slot;
381+
382+
let generic_update = GenericUpdate::from(update);
383+
self.apply_generic_update(&generic_update);
384+
385+
if optimistic_header_slot < self.store.optimistic_header.slot
386+
|| finalized_header_slot < self.store.finalized_header.slot
387+
{
388+
self.watch_senders
389+
.update
390+
.send_replace(Some(LightClientUpdate::Electra(update.clone())));
391+
}
371392
}
372393

373394
fn apply_finality_update(&mut self, update: &LightClientFinalityUpdateElectra) {
374-
let update = GenericUpdate::from(update);
375-
self.apply_generic_update(&update);
395+
let optimistic_header_slot = self.store.optimistic_header.slot;
396+
let finalized_header_slot = self.store.finalized_header.slot;
397+
398+
let generic_update = GenericUpdate::from(update);
399+
self.apply_generic_update(&generic_update);
400+
401+
if optimistic_header_slot < self.store.optimistic_header.slot
402+
|| finalized_header_slot < self.store.finalized_header.slot
403+
{
404+
self.watch_senders
405+
.finality_update
406+
.send_replace(Some(LightClientFinalityUpdate::Electra(update.clone())));
407+
}
376408
}
377409

378410
fn log_finality_update(&self, update: &GenericUpdate) {
@@ -393,8 +425,16 @@ impl<R: ConsensusRpc> ConsensusLightClient<R> {
393425
}
394426

395427
fn apply_optimistic_update(&mut self, update: &LightClientOptimisticUpdateElectra) {
396-
let update = GenericUpdate::from(update);
397-
self.apply_generic_update(&update);
428+
let optimistic_header_slot = self.store.optimistic_header.slot;
429+
430+
let generic_update = GenericUpdate::from(update);
431+
self.apply_generic_update(&generic_update);
432+
433+
if optimistic_header_slot < self.store.optimistic_header.slot {
434+
self.watch_senders
435+
.optimistic_update
436+
.send_replace(Some(LightClientOptimisticUpdate::Electra(update.clone())));
437+
}
398438
}
399439

400440
fn log_optimistic_update(&self, update: &GenericUpdate) {

crates/light-client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ pub mod errors;
1010
pub mod node;
1111
pub mod rpc;
1212
pub mod utils;
13+
pub mod watch;

crates/light-client/src/watch.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use ethportal_api::light_client::{
2+
finality_update::LightClientFinalityUpdate, optimistic_update::LightClientOptimisticUpdate,
3+
update::LightClientUpdate,
4+
};
5+
use tokio::sync::watch;
6+
7+
/// The [tokio::sync::watch::channel] Receivers for light client update types.
8+
///
9+
/// Value will be `None` until first update.
10+
#[derive(Clone)]
11+
pub struct LightClientWatchReceivers {
12+
pub update: watch::Receiver<Option<LightClientUpdate>>,
13+
pub optimistic_update: watch::Receiver<Option<LightClientOptimisticUpdate>>,
14+
pub finality_update: watch::Receiver<Option<LightClientFinalityUpdate>>,
15+
}
16+
17+
/// The [tokio::sync::watch::channel] Senders for light client update types.
18+
#[derive(Debug, Clone)]
19+
pub struct LightClientWatchSenders {
20+
pub update: watch::Sender<Option<LightClientUpdate>>,
21+
pub optimistic_update: watch::Sender<Option<LightClientOptimisticUpdate>>,
22+
pub finality_update: watch::Sender<Option<LightClientFinalityUpdate>>,
23+
}
24+
25+
impl LightClientWatchSenders {
26+
pub fn subscribe(&self) -> LightClientWatchReceivers {
27+
LightClientWatchReceivers {
28+
update: self.update.subscribe(),
29+
optimistic_update: self.optimistic_update.subscribe(),
30+
finality_update: self.finality_update.subscribe(),
31+
}
32+
}
33+
}
34+
35+
pub fn light_client_watch_channels() -> (LightClientWatchSenders, LightClientWatchReceivers) {
36+
let (update_sender, update_receiver) = watch::channel(None);
37+
let (optimistic_update_sender, optimistic_update_receiver) = watch::channel(None);
38+
let (finality_update_sender, finality_update_receiver) = watch::channel(None);
39+
let senders = LightClientWatchSenders {
40+
update: update_sender,
41+
optimistic_update: optimistic_update_sender,
42+
finality_update: finality_update_sender,
43+
};
44+
let receivers = LightClientWatchReceivers {
45+
update: update_receiver,
46+
optimistic_update: optimistic_update_receiver,
47+
finality_update: finality_update_receiver,
48+
};
49+
(senders, receivers)
50+
}

crates/subnetworks/beacon/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use tokio::{
2626
};
2727
use tracing::info;
2828
use trin_storage::PortalStorageConfig;
29-
use trin_validation::oracle::HeaderOracle;
29+
use trin_validation::{chain_head::ChainHead, oracle::HeaderOracle};
3030
use utp_rs::socket::UtpSocket;
3131

3232
use crate::{events::BeaconEvents, jsonrpc::BeaconRequestHandler, network::BeaconNetwork};
@@ -46,6 +46,7 @@ pub async fn initialize_beacon_network(
4646
portalnet_config: PortalnetConfig,
4747
storage_config: PortalStorageConfig,
4848
header_oracle: Arc<RwLock<HeaderOracle>>,
49+
chain_head: ChainHead,
4950
) -> anyhow::Result<(
5051
BeaconHandler,
5152
BeaconNetworkTask,
@@ -62,6 +63,7 @@ pub async fn initialize_beacon_network(
6263
storage_config,
6364
portalnet_config.clone(),
6465
header_oracle,
66+
chain_head,
6567
)
6668
.await?;
6769
let beacon_event_stream = beacon_network.overlay.event_stream().await?;

crates/subnetworks/beacon/src/network.rs

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use portalnet::{
1616
use tokio::sync::{Mutex, RwLock};
1717
use tracing::{error, info};
1818
use trin_storage::PortalStorageConfig;
19-
use trin_validation::oracle::HeaderOracle;
19+
use trin_validation::{chain_head::ChainHead, oracle::HeaderOracle};
2020
use utp_rs::socket::UtpSocket;
2121

2222
use crate::{
@@ -51,14 +51,18 @@ impl BeaconNetwork {
5151
storage_config: PortalStorageConfig,
5252
portal_config: PortalnetConfig,
5353
header_oracle: Arc<RwLock<HeaderOracle>>,
54+
chain_head: ChainHead,
5455
) -> anyhow::Result<Self> {
5556
let config = OverlayConfig {
5657
bootnode_enrs: portal_config.bootnodes.clone(),
5758
utp_transfer_limit: portal_config.utp_transfer_limit,
5859
gossip_dropped: GOSSIP_DROPPED,
5960
..Default::default()
6061
};
61-
let storage = Arc::new(PLMutex::new(BeaconStorage::new(storage_config)?));
62+
let storage = Arc::new(PLMutex::new(BeaconStorage::new(
63+
storage_config,
64+
chain_head.clone(),
65+
)?));
6266
storage.lock().spawn_pruning_task(); // Spawn pruning task to clean up expired content.
6367
let storage_clone = Arc::clone(&storage);
6468
let validator = Arc::new(BeaconValidator::new(header_oracle));
@@ -81,17 +85,49 @@ impl BeaconNetwork {
8185
// Get the trusted block root to start syncing from.
8286
let trusted_block_root = get_trusted_block_root(&portal_config, storage_clone)?;
8387

84-
// Spawn the beacon sync task.
88+
// Spawn light client sync and update watch task
8589
tokio::spawn(async move {
86-
let beacon_sync = BeaconSync::new(overlay_tx);
87-
let beacon_sync = beacon_sync.start(trusted_block_root).await;
88-
match beacon_sync {
89-
Ok(client) => {
90-
let mut beacon_client = beacon_client_clone.lock().await;
91-
*beacon_client = Some(client);
92-
}
90+
// Sync LightClient
91+
let client = match BeaconSync::new(overlay_tx).start(trusted_block_root).await {
92+
Ok(client) => client,
9393
Err(err) => {
9494
error!(error = %err, "Failed to start beacon sync.");
95+
return;
96+
}
97+
};
98+
let mut watch_receivers = client.get_light_client_watch_receivers().await;
99+
*beacon_client_clone.lock().await = Some(client);
100+
101+
// Watch for light clients updates and update ChainHead
102+
loop {
103+
tokio::select! {
104+
Ok(()) = watch_receivers.update.changed() => {
105+
let update = watch_receivers
106+
.update
107+
.borrow_and_update()
108+
.as_ref()
109+
.expect("Updated LightClientUpdate must be present")
110+
.clone();
111+
chain_head.process_update(update);
112+
}
113+
Ok(()) = watch_receivers.optimistic_update.changed() => {
114+
let update = watch_receivers
115+
.optimistic_update
116+
.borrow_and_update()
117+
.as_ref()
118+
.expect("Updated LightClientOptimisticUpdate must be present")
119+
.clone();
120+
chain_head.process_optimistic_update(update);
121+
}
122+
Ok(()) = watch_receivers.finality_update.changed() => {
123+
let update = watch_receivers
124+
.finality_update
125+
.borrow_and_update()
126+
.as_ref()
127+
.expect("Updated LightClientFinalityUpdate must be present")
128+
.clone();
129+
chain_head.process_finality_update(update);
130+
}
95131
}
96132
}
97133
});
@@ -176,12 +212,14 @@ mod tests {
176212
temp_dir.path().to_path_buf(),
177213
)
178214
.unwrap();
179-
180215
let storage_config = storage_cfg_factory
181216
.create(&Subnetwork::Beacon, Distance::MAX)
182217
.unwrap();
218+
183219
// A mock storage that always returns None
184-
let storage_clone = Arc::new(PLMutex::new(BeaconStorage::new(storage_config).unwrap()));
220+
let storage_clone = Arc::new(PLMutex::new(
221+
BeaconStorage::new(storage_config, ChainHead::new_pectra_defaults()).unwrap(),
222+
));
185223

186224
let result = get_trusted_block_root(&portal_config, storage_clone)
187225
.expect("Function should not fail with an Err");

0 commit comments

Comments
 (0)