|
1 | 1 | mod ephemeral_bundle;
|
2 | 2 | mod gossiper;
|
| 3 | + |
| 4 | +use std::{ |
| 5 | + collections::{btree_map::Entry, BTreeMap}, |
| 6 | + num::NonZero, |
| 7 | + sync::Arc, |
| 8 | +}; |
| 9 | + |
| 10 | +use alloy::rpc::types::beacon::events::{ |
| 11 | + BeaconNodeEventTopic, FinalizedCheckpointEvent, HeadEvent, LightClientOptimisticUpdateEvent, |
| 12 | +}; |
| 13 | +use anyhow::{bail, ensure}; |
| 14 | +use ephemeral_bundle::EphemeralBundle; |
| 15 | +use ethereum_rpc_client::{ |
| 16 | + consensus::{event::BeaconEvent, first_slot_in_a_period, ConsensusApi}, |
| 17 | + execution::ExecutionApi, |
| 18 | +}; |
| 19 | +use ethportal_api::{ |
| 20 | + consensus::{ |
| 21 | + beacon_block::BeaconBlockElectra, |
| 22 | + constants::{SLOTS_PER_EPOCH, SLOTS_PER_HISTORICAL_ROOT}, |
| 23 | + header::BeaconBlockHeader, |
| 24 | + }, |
| 25 | + Receipts, |
| 26 | +}; |
| 27 | +use futures::StreamExt; |
| 28 | +use gossiper::Gossiper; |
| 29 | +use lru::LruCache; |
| 30 | +use revm_primitives::B256; |
| 31 | +use tokio::task::JoinHandle; |
| 32 | +use tracing::{error, info, warn}; |
| 33 | +use tree_hash::TreeHash; |
| 34 | +use trin_history::network::HistoryNetwork; |
| 35 | +use trin_metrics::bridge::BridgeMetricsReporter; |
| 36 | + |
| 37 | +use crate::census::Census; |
| 38 | + |
| 39 | +/// Total number of blocks we need to buffer in the bridge. |
| 40 | +/// - 1 for the head block |
| 41 | +/// - 4 blocks as wiggle room for re-orgs, re-orgs should never been greater than 7 slots |
| 42 | +/// - 8 blocks as a buffer for network stability, if there are any problems on the network this will |
| 43 | +/// ensure we are good |
| 44 | +const TOTAL_BUFFER_COUNT: u64 = 1 + 4 + 8; |
| 45 | + |
| 46 | +pub struct EphemeralHistoryBridge { |
| 47 | + gossiper: Gossiper, |
| 48 | + consensus_api: ConsensusApi, |
| 49 | + execution_api: ExecutionApi, |
| 50 | + beacon_blocks: BTreeMap<B256, BeaconBlockElectra>, |
| 51 | + receipts: LruCache<u64, Receipts>, |
| 52 | +} |
| 53 | + |
| 54 | +impl EphemeralHistoryBridge { |
| 55 | + pub async fn new( |
| 56 | + history_network: Arc<HistoryNetwork>, |
| 57 | + head_offer_limit: usize, |
| 58 | + non_ephemeral_offer_limit: usize, |
| 59 | + consensus_api: ConsensusApi, |
| 60 | + execution_api: ExecutionApi, |
| 61 | + census: Census, |
| 62 | + ) -> anyhow::Result<Self> { |
| 63 | + let metrics = BridgeMetricsReporter::new("ephemeral".to_string(), "history"); |
| 64 | + let gossiper = Gossiper::new( |
| 65 | + census, |
| 66 | + history_network, |
| 67 | + metrics, |
| 68 | + head_offer_limit, |
| 69 | + non_ephemeral_offer_limit, |
| 70 | + ); |
| 71 | + |
| 72 | + Ok(Self { |
| 73 | + gossiper, |
| 74 | + consensus_api, |
| 75 | + execution_api, |
| 76 | + beacon_blocks: BTreeMap::new(), |
| 77 | + // We use a cache of 2x SLOTS_PER_EPOCH to ensure we have enough space for the receipts |
| 78 | + receipts: LruCache::new( |
| 79 | + NonZero::new(SLOTS_PER_EPOCH as usize * 2).expect("Should be non-zero"), |
| 80 | + ), |
| 81 | + }) |
| 82 | + } |
| 83 | + |
| 84 | + pub async fn launch(mut self) { |
| 85 | + info!("Launching Ephemeral History bridge"); |
| 86 | + |
| 87 | + let mut stream = self |
| 88 | + .consensus_api |
| 89 | + .get_events_stream( |
| 90 | + &[ |
| 91 | + BeaconNodeEventTopic::Head, |
| 92 | + BeaconNodeEventTopic::LightClientOptimisticUpdate, |
| 93 | + BeaconNodeEventTopic::FinalizedCheckpoint, |
| 94 | + ], |
| 95 | + "ephemeral_history_bridge", |
| 96 | + ) |
| 97 | + .expect("Failed to create event stream"); |
| 98 | + |
| 99 | + while let Some(event) = stream.next().await { |
| 100 | + match event { |
| 101 | + BeaconEvent::Head(head_event) => { |
| 102 | + info!(block_root = ?head_event.block, "Received head"); |
| 103 | + |
| 104 | + if let Err(err) = self.process_head(head_event).await { |
| 105 | + error!("Failed to process head: {err:?}"); |
| 106 | + } |
| 107 | + } |
| 108 | + BeaconEvent::LightClientOptimisticUpdate(light_client_optimistic_update_event) => { |
| 109 | + info!( |
| 110 | + slot = ?light_client_optimistic_update_event |
| 111 | + .data |
| 112 | + .attested_header |
| 113 | + .beacon |
| 114 | + .slot, |
| 115 | + "Received light client optimistic update" |
| 116 | + ); |
| 117 | + |
| 118 | + if let Err(err) = self.process_light_client_optimistic_update( |
| 119 | + light_client_optimistic_update_event, |
| 120 | + ) { |
| 121 | + error!("Failed to process light client optimistic update: {err:?}"); |
| 122 | + } |
| 123 | + } |
| 124 | + BeaconEvent::FinalizedCheckpoint(finalized_checkpoint_event) => { |
| 125 | + info!( |
| 126 | + block_root = ?finalized_checkpoint_event.block, |
| 127 | + "Received finalized checkpoint" |
| 128 | + ); |
| 129 | + |
| 130 | + if let Err(err) = self.process_finalized_checkpoint(finalized_checkpoint_event) |
| 131 | + { |
| 132 | + error!("Failed to process finalized checkpoint: {err:?}"); |
| 133 | + } |
| 134 | + } |
| 135 | + _ => warn!("Received unexpected event: {event:?}"), |
| 136 | + } |
| 137 | + } |
| 138 | + } |
| 139 | + |
| 140 | + async fn process_head(&mut self, head_event: HeadEvent) -> anyhow::Result<()> { |
| 141 | + let mut next_root = head_event.block; |
| 142 | + for _ in 0..TOTAL_BUFFER_COUNT { |
| 143 | + let beacon_block = self.get_or_download_beacon_block(next_root).await?; |
| 144 | + next_root = beacon_block.parent_root; |
| 145 | + } |
| 146 | + |
| 147 | + Ok(()) |
| 148 | + } |
| 149 | + |
| 150 | + fn process_light_client_optimistic_update( |
| 151 | + &mut self, |
| 152 | + light_client_event: LightClientOptimisticUpdateEvent, |
| 153 | + ) -> anyhow::Result<JoinHandle<()>> { |
| 154 | + if self.beacon_blocks.is_empty() { |
| 155 | + warn!("Received light client optimistic update with no beacon blocks, skipping processing"); |
| 156 | + return Ok(tokio::spawn(async {})); |
| 157 | + } |
| 158 | + |
| 159 | + let alloy_beacon_block = light_client_event.data.attested_header.beacon; |
| 160 | + let beacon_block = BeaconBlockHeader { |
| 161 | + slot: alloy_beacon_block.slot, |
| 162 | + proposer_index: alloy_beacon_block.proposer_index, |
| 163 | + parent_root: alloy_beacon_block.parent_root, |
| 164 | + state_root: alloy_beacon_block.state_root, |
| 165 | + body_root: alloy_beacon_block.body_root, |
| 166 | + }; |
| 167 | + |
| 168 | + let mut ephemeral_bundle = EphemeralBundle::new(beacon_block.tree_hash_root()); |
| 169 | + for _ in 0..TOTAL_BUFFER_COUNT { |
| 170 | + self.append_next_block_to_ephemeral_bundle(&mut ephemeral_bundle)?; |
| 171 | + } |
| 172 | + |
| 173 | + let gossiper = self.gossiper.clone(); |
| 174 | + Ok(tokio::spawn(async move { |
| 175 | + gossiper.gossip_ephemeral_bundle(ephemeral_bundle).await; |
| 176 | + })) |
| 177 | + } |
| 178 | + |
| 179 | + fn process_finalized_checkpoint( |
| 180 | + &mut self, |
| 181 | + finalized_checkpoint_event: FinalizedCheckpointEvent, |
| 182 | + ) -> anyhow::Result<Option<JoinHandle<()>>> { |
| 183 | + if finalized_checkpoint_event.epoch % (SLOTS_PER_HISTORICAL_ROOT / SLOTS_PER_EPOCH) != 0 { |
| 184 | + return Ok(None); |
| 185 | + } |
| 186 | + |
| 187 | + // The finalized period proves the last 8192 slots, it can't be proven until the next |
| 188 | + // cycle |
| 189 | + let Some(beacon_block) = self.beacon_blocks.get(&finalized_checkpoint_event.block) else { |
| 190 | + bail!( |
| 191 | + "Beacon block not found for finalized checkpoint, this is a critical bug: {:?}", |
| 192 | + finalized_checkpoint_event |
| 193 | + ); |
| 194 | + }; |
| 195 | + |
| 196 | + let mut blocks = vec![]; |
| 197 | + let mut last_block_root = beacon_block.parent_root; |
| 198 | + let first_slot_in_period = |
| 199 | + first_slot_in_a_period((finalized_checkpoint_event.epoch - 1) * SLOTS_PER_EPOCH); |
| 200 | + while let Some(beacon_block) = self.beacon_blocks.remove(&last_block_root) { |
| 201 | + last_block_root = beacon_block.parent_root; |
| 202 | + let slot = first_slot_in_a_period(beacon_block.slot); |
| 203 | + ensure!( |
| 204 | + slot == first_slot_in_period, |
| 205 | + "Beacon block slot does not match the expected period: {slot:?} != {first_slot_in_period:?}", |
| 206 | + ); |
| 207 | + blocks.push(beacon_block); |
| 208 | + } |
| 209 | + |
| 210 | + // Delete all blocks that are older than finalized epoch. |
| 211 | + // This can happen if there was chain reorg (and maybe in some other unexpected situations). |
| 212 | + self.beacon_blocks |
| 213 | + .retain(|_, block| block.slot >= finalized_checkpoint_event.epoch * SLOTS_PER_EPOCH); |
| 214 | + |
| 215 | + let consensus_api = self.consensus_api.clone(); |
| 216 | + let gossiper = self.gossiper.clone(); |
| 217 | + Ok(Some(tokio::spawn(async move { |
| 218 | + gossiper |
| 219 | + .gossiped_non_ephemeral_headers( |
| 220 | + finalized_checkpoint_event.state, |
| 221 | + blocks, |
| 222 | + consensus_api, |
| 223 | + ) |
| 224 | + .await; |
| 225 | + }))) |
| 226 | + } |
| 227 | + |
| 228 | + /// Return the [BeaconBlock] for a given beacon block root. |
| 229 | + /// |
| 230 | + /// If beacon block is not already in `self.beacon_blocks`, it downloads it and puts it there. |
| 231 | + async fn get_or_download_beacon_block( |
| 232 | + &mut self, |
| 233 | + beacon_block_root: B256, |
| 234 | + ) -> anyhow::Result<&BeaconBlockElectra> { |
| 235 | + match self.beacon_blocks.entry(beacon_block_root) { |
| 236 | + Entry::Occupied(occupied_beacon_block) => Ok(occupied_beacon_block.into_mut()), |
| 237 | + Entry::Vacant(vacant_beacon_block) => { |
| 238 | + let beacon_block = self |
| 239 | + .consensus_api |
| 240 | + .get_beacon_block(beacon_block_root.to_string()) |
| 241 | + .await? |
| 242 | + .message; |
| 243 | + |
| 244 | + let receipts = self |
| 245 | + .execution_api |
| 246 | + .get_receipts(beacon_block.body.execution_payload.block_number) |
| 247 | + .await?; |
| 248 | + |
| 249 | + self.receipts |
| 250 | + .push(beacon_block.body.execution_payload.block_number, receipts); |
| 251 | + Ok(vacant_beacon_block.insert(beacon_block)) |
| 252 | + } |
| 253 | + } |
| 254 | + } |
| 255 | + |
| 256 | + fn append_next_block_to_ephemeral_bundle( |
| 257 | + &mut self, |
| 258 | + ephemeral_bundle: &mut EphemeralBundle, |
| 259 | + ) -> anyhow::Result<()> { |
| 260 | + let Some(beacon_block) = self |
| 261 | + .beacon_blocks |
| 262 | + .get(&ephemeral_bundle.next_parent_root()) |
| 263 | + .cloned() |
| 264 | + else { |
| 265 | + bail!( |
| 266 | + "Beacon block not found for next parent root: {:?}", |
| 267 | + ephemeral_bundle.next_parent_root() |
| 268 | + ); |
| 269 | + }; |
| 270 | + |
| 271 | + let Some(receipts) = self |
| 272 | + .receipts |
| 273 | + .get(&beacon_block.body.execution_payload.block_number) |
| 274 | + .cloned() |
| 275 | + else { |
| 276 | + bail!( |
| 277 | + "Receipts not found for block number: {}", |
| 278 | + beacon_block.body.execution_payload.block_number |
| 279 | + ); |
| 280 | + }; |
| 281 | + |
| 282 | + ephemeral_bundle.push_parent(beacon_block, receipts)?; |
| 283 | + |
| 284 | + Ok(()) |
| 285 | + } |
| 286 | +} |
0 commit comments