diff --git a/Cargo.toml b/Cargo.toml index 0ce8a324bd6..16947c5e302 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -294,13 +294,14 @@ debug = "line-tables-only" inherits = "dev" strip = "none" debug = "full" +opt-level = 2 # Don't optimize the binaries in the workspace. Compilation of dependencies can be cached # effectively on the CI because they rarely change, whereas the workspace code usually changes and # workspace compilation artifacts are therefore not cached. However, some of our tests don't work if # we don't have any optimizations in the workspace. [profile.test] -opt-level = 1 +opt-level = 2 [profile.test.package.sequencer] opt-level = 0 diff --git a/sequencer/api/migrations/postgres/V1103__reward_state.sql b/sequencer/api/migrations/postgres/V1103__reward_state.sql new file mode 100644 index 00000000000..5e5d3b8a3eb --- /dev/null +++ b/sequencer/api/migrations/postgres/V1103__reward_state.sql @@ -0,0 +1,10 @@ +-- Table to store reward account balances at each height + +CREATE TABLE reward_state ( + height BIGINT NOT NULL, + account JSONB NOT NULL, + balance JSONB NOT NULL, + PRIMARY KEY (height, account) +); + +CREATE INDEX reward_state_height_idx ON reward_state (height); diff --git a/sequencer/api/migrations/sqlite/V903__reward_state.sql b/sequencer/api/migrations/sqlite/V903__reward_state.sql new file mode 100644 index 00000000000..5e5d3b8a3eb --- /dev/null +++ b/sequencer/api/migrations/sqlite/V903__reward_state.sql @@ -0,0 +1,10 @@ +-- Table to store reward account balances at each height + +CREATE TABLE reward_state ( + height BIGINT NOT NULL, + account JSONB NOT NULL, + balance JSONB NOT NULL, + PRIMARY KEY (height, account) +); + +CREATE INDEX reward_state_height_idx ON reward_state (height); diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs index cea4c28adef..35b4efde0ae 100644 --- a/sequencer/src/api/sql.rs +++ b/sequencer/src/api/sql.rs @@ -14,8 +14,8 @@ use espresso_types::{ RewardAccountProofV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2, REWARD_MERKLE_TREE_V2_HEIGHT, }, - BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2, - NodeState, ValidatedState, + BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Header, + Leaf2, NodeState, ValidatedState, }; use hotshot::traits::ValidatedState as _; use hotshot_query_service::{ @@ -29,7 +29,7 @@ use hotshot_query_service::{ }, VersionedDataSource, }, - merklized_state::{MerklizedState, Snapshot}, + merklized_state::Snapshot, Resolvable, }; use hotshot_types::{ @@ -310,46 +310,39 @@ impl CatchupStorage for SqlStorage { return Ok(Vec::new()); } - // get the latest balance for each account. - // use DISTINCT ON for Postgres - // use ROW_NUMBER() as DISTINCT ON is not supported for SQLite + // Query reward_state table for the latest balance for each account up to the given height + // Use DISTINCT ON for Postgres, ROW_NUMBER for SQLite #[cfg(not(feature = "embedded-db"))] - let query = format!( - "SELECT DISTINCT ON (idx) idx, entry - FROM {} - WHERE idx IS NOT NULL AND created <= $1 - ORDER BY idx DESC, created DESC - LIMIT $2 OFFSET $3", - RewardMerkleTreeV2::state_type() - ); + let query = "SELECT DISTINCT ON (account) account, balance + FROM reward_state + WHERE height <= $1 + ORDER BY account, height DESC + LIMIT $2 OFFSET $3"; #[cfg(feature = "embedded-db")] - let query = format!( - "SELECT idx, entry FROM ( - SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) as \ - rn - FROM {} - WHERE created <= $1 AND idx IS NOT NULL + let query = "SELECT account, balance FROM ( + SELECT account, balance, ROW_NUMBER() OVER (PARTITION BY account ORDER BY height \ + DESC) as rn + FROM reward_state + WHERE height <= $1 ) sub WHERE rn = 1 - ORDER BY idx DESC - LIMIT $2 OFFSET $3", - RewardMerkleTreeV2::state_type() - ); + ORDER BY account + LIMIT $2 OFFSET $3"; - let rows = query_as::<(Value, Value)>(&query) + let rows = query_as::<(Value, Value)>(query) .bind(height as i64) .bind(limit as i64) .bind(offset as i64) .fetch_all(tx.as_mut()) .await - .context("loading reward accounts from storage")?; + .context("loading reward accounts from reward_state table")?; let mut accounts = Vec::new(); - for (idx, entry) in rows { + for (account_json, balance_json) in rows { let account: RewardAccountV2 = - serde_json::from_value(idx).context("deserializing reward account")?; - let balance: RewardAmount = serde_json::from_value(entry).context(format!( + serde_json::from_value(account_json).context("deserializing reward account")?; + let balance: RewardAmount = serde_json::from_value(balance_json).context(format!( "deserializing reward balance for account {account}" ))?; @@ -496,6 +489,16 @@ impl CatchupStorage for SqlStorage { Ok(chain) } + + async fn get_header(&self, height: u64) -> anyhow::Result
{ + let mut tx = self + .read() + .await + .context(format!("opening transaction to fetch header at {height}"))?; + tx.get_header(BlockId::::from(height as usize)) + .await + .context(format!("header {height} not available")) + } } impl RewardAccountProofDataSource for DataSource { @@ -586,6 +589,13 @@ impl CatchupStorage for DataSource { .get_all_reward_accounts(height, offset, limit) .await } + + async fn get_header(&self, height: u64) -> anyhow::Result
{ + self.as_ref() + .get_header(BlockId::::from(height as usize)) + .await + .context(format!("header {height} not available")) + } } #[async_trait] @@ -603,6 +613,40 @@ impl ChainConfigPersistence for Transaction { } } +#[async_trait] +impl crate::state::RewardStatePersistence for Transaction { + async fn store_reward_state( + &mut self, + height: u64, + accounts: Vec<(RewardAccountV2, RewardAmount)>, + ) -> anyhow::Result<()> { + if accounts.is_empty() { + return Ok(()); + } + + // Build batch of (height, account, balance) tuples for upsert + let rows: Vec<_> = accounts + .iter() + .map(|(account, balance)| { + let account_json = + serde_json::to_value(account).expect("failed to serialize account"); + let balance_json = + serde_json::to_value(balance).expect("failed to serialize balance"); + (height as i64, account_json, balance_json) + }) + .collect(); + + self.upsert( + "reward_state", + ["height", "account", "balance"], + ["height", "account"], + rows, + ) + .await + .context("failed to upsert reward state") + } +} + async fn load_frontier( tx: &mut Transaction, height: u64, diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index b297f8786d2..48454113e63 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -476,6 +476,25 @@ impl StateCatchup for StatePeers { .await } + #[tracing::instrument(skip(self))] + async fn try_fetch_all_reward_accounts( + &self, + retry: usize, + height: u64, + offset: u64, + limit: u64, + ) -> anyhow::Result> { + self.fetch(retry, |client| async move { + client + .get::>(&format!( + "catchup/{height}/reward-amounts/{limit}/{offset}" + )) + .send() + .await + }) + .await + } + async fn try_fetch_state_cert( &self, retry: usize, @@ -892,6 +911,21 @@ where Ok(proofs) } + async fn try_fetch_all_reward_accounts( + &self, + _retry: usize, + height: u64, + offset: u64, + limit: u64, + ) -> anyhow::Result> { + self.db + .get_all_reward_accounts(height, offset, limit) + .await + .with_context(|| { + format!("failed to get all reward accounts at height {height} from DB") + }) + } + async fn try_fetch_state_cert( &self, _retry: usize, @@ -1018,6 +1052,16 @@ impl StateCatchup for NullStateCatchup { bail!("state catchup is disabled"); } + async fn try_fetch_all_reward_accounts( + &self, + _retry: usize, + _height: u64, + _offset: u64, + _limit: u64, + ) -> anyhow::Result> { + bail!("state catchup is disabled"); + } + async fn try_fetch_state_cert( &self, _retry: usize, @@ -1447,6 +1491,36 @@ impl StateCatchup for ParallelStateCatchup { .await } + async fn try_fetch_all_reward_accounts( + &self, + retry: usize, + height: u64, + offset: u64, + limit: u64, + ) -> anyhow::Result> { + // Try to get the accounts on local providers first + let local_result = self + .on_local_providers(move |provider| async move { + provider + .try_fetch_all_reward_accounts(retry, height, offset, limit) + .await + }) + .await; + + // Check if we were successful locally + if local_result.is_ok() { + return local_result; + } + + // If that fails, try the remote ones + self.on_remote_providers(move |provider| async move { + provider + .try_fetch_all_reward_accounts(retry, height, offset, limit) + .await + }) + .await + } + async fn try_fetch_state_cert( &self, retry: usize, diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 99eef9f0130..616c88bc6fb 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -620,6 +620,8 @@ where &persistence.clone(), ); + let epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new())); + let instance_state = NodeState { chain_config: genesis.chain_config, genesis_chain_config, @@ -635,7 +637,7 @@ where coordinator: coordinator.clone(), genesis_version: genesis.genesis_version, epoch_start_block: genesis.epoch_start_block.unwrap_or_default(), - epoch_rewards_calculator: Arc::new(Mutex::new(EpochRewardsCalculator::new())), + epoch_rewards_calculator, }; // Initialize the Libp2p network @@ -1386,12 +1388,14 @@ pub mod testing { V::Base::VERSION, coordinator.clone(), V::Base::VERSION, - ) - .with_current_version(V::Base::version()) - .with_genesis(state) - .with_epoch_height(config.epoch_height) - .with_upgrades(upgrades) - .with_epoch_start_block(config.epoch_start_block); + ); + + let node_state = node_state + .with_current_version(V::Base::version()) + .with_genesis(state) + .with_epoch_height(config.epoch_height) + .with_upgrades(upgrades) + .with_epoch_start_block(config.epoch_start_block); tracing::info!( i, diff --git a/sequencer/src/persistence/fs.rs b/sequencer/src/persistence/fs.rs index 142c69a374a..acb7519b0ac 100644 --- a/sequencer/src/persistence/fs.rs +++ b/sequencer/src/persistence/fs.rs @@ -1422,6 +1422,10 @@ impl SequencerPersistence for Persistence { Ok(()) } + async fn backfill_reward_state(&self) -> anyhow::Result<()> { + Ok(()) + } + async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> { if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await { if loaded_drb_input.difficulty_level != drb_input.difficulty_level { diff --git a/sequencer/src/persistence/no_storage.rs b/sequencer/src/persistence/no_storage.rs index 0dc43816ba7..68f1eaaaca4 100644 --- a/sequencer/src/persistence/no_storage.rs +++ b/sequencer/src/persistence/no_storage.rs @@ -242,6 +242,10 @@ impl SequencerPersistence for NoStorage { Ok(()) } + async fn backfill_reward_state(&self) -> anyhow::Result<()> { + Ok(()) + } + async fn store_drb_result( &self, _epoch: EpochNumber, diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index f607600dc80..0cf82ad5544 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -2037,6 +2037,133 @@ impl SequencerPersistence for Persistence { Ok(()) } + async fn backfill_reward_state(&self) -> anyhow::Result<()> { + let batch_size: i64 = 10000; + let mut tx = self.db.read().await?; + + // Check migration progress - migrated_rows tracks the number of accounts processed + let (is_completed, mut offset) = query_as::<(bool, i64)>( + "SELECT completed, migrated_rows FROM epoch_migration WHERE table_name = \ + 'reward_state'", + ) + .fetch_one(tx.as_mut()) + .await?; + + if is_completed { + tracing::info!("reward state backfill already done"); + return Ok(()); + } + + drop(tx); + + tracing::warn!("backfilling reward state from reward_merkle_tree_v2..."); + + loop { + let mut tx = self.db.read().await?; + + // Get latest balance for each account using DISTINCT ON (Postgres) or ROW_NUMBER (SQLite) + // Process in batches by account + #[cfg(not(feature = "embedded-db"))] + let rows = query( + "SELECT DISTINCT ON (idx) created, idx, entry FROM reward_merkle_tree_v2 + WHERE idx IS NOT NULL AND entry IS NOT NULL + ORDER BY idx, created DESC + LIMIT $1 OFFSET $2", + ) + .bind(batch_size) + .bind(offset) + .fetch_all(tx.as_mut()) + .await?; + + #[cfg(feature = "embedded-db")] + let rows = query( + "SELECT created, idx, entry FROM ( + SELECT created, idx, entry, + ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) as rn + FROM reward_merkle_tree_v2 + WHERE idx IS NOT NULL AND entry IS NOT NULL + ) sub + WHERE rn = 1 + LIMIT $1 OFFSET $2", + ) + .bind(batch_size) + .bind(offset) + .fetch_all(tx.as_mut()) + .await?; + + drop(tx); + + if rows.is_empty() { + break; + } + + let mut values: Vec<(i64, serde_json::Value, serde_json::Value)> = Vec::new(); + + for row in rows.iter() { + let height: i64 = row.try_get("created")?; + let account: serde_json::Value = row.try_get("idx")?; + let balance: serde_json::Value = row.try_get("entry")?; + + values.push((height, account, balance)); + } + + let rows_count = values.len(); + + let mut query_builder: sqlx::QueryBuilder = + sqlx::QueryBuilder::new("INSERT INTO reward_state (height, account, balance) "); + + query_builder.push_values(&values, |mut b, (height, account, balance)| { + b.push_bind(height).push_bind(account).push_bind(balance); + }); + + // For latest balances, we update if there's a newer entry + query_builder + .push(" ON CONFLICT (height, account) DO UPDATE SET balance = EXCLUDED.balance"); + + let query = query_builder.build(); + + let mut tx = self.db.write().await?; + query.execute(tx.as_mut()).await?; + + offset += rows_count as i64; + + tx.upsert( + "epoch_migration", + ["table_name", "completed", "migrated_rows"], + ["table_name"], + [("reward_state".to_string(), false, offset)], + ) + .await?; + tx.commit().await?; + + tracing::info!( + "reward state backfill progress: rows={} offset={}", + rows_count, + offset + ); + + if rows_count < batch_size as usize { + break; + } + } + + tracing::warn!("reward state backfill completed"); + + let mut tx = self.db.write().await?; + tx.upsert( + "epoch_migration", + ["table_name", "completed", "migrated_rows"], + ["table_name"], + [("reward_state".to_string(), true, offset)], + ) + .await?; + tx.commit().await?; + + tracing::info!("updated epoch_migration table for reward_state"); + + Ok(()) + } + async fn store_next_epoch_quorum_certificate( &self, high_qc: NextEpochQuorumCertificate2, diff --git a/sequencer/src/request_response/catchup/state.rs b/sequencer/src/request_response/catchup/state.rs index 24509d58d7d..0ae547f3024 100644 --- a/sequencer/src/request_response/catchup/state.rs +++ b/sequencer/src/request_response/catchup/state.rs @@ -4,7 +4,9 @@ use async_trait::async_trait; use committable::{Commitment, Committable}; use espresso_types::{ traits::{SequencerPersistence, StateCatchup}, - v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1}, + v0_3::{ + ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardAmount, RewardMerkleCommitmentV1, + }, v0_4::{RewardAccountProofV2, RewardAccountV2, RewardMerkleCommitmentV2}, BackoffParams, BlockMerkleTree, EpochVersion, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Header, Leaf2, NodeState, PubKey, SeqTypes, SequencerVersions, @@ -173,6 +175,17 @@ impl< .with_context(|| "timed out while fetching reward accounts")? } + async fn try_fetch_all_reward_accounts( + &self, + _retry: usize, + _height: u64, + _offset: u64, + _limit: u64, + ) -> anyhow::Result> { + // Not supported via request-response protocol - use HTTP catchup + anyhow::bail!("fetching all reward accounts not supported via request-response protocol") + } + async fn try_fetch_state_cert( &self, _retry: usize, diff --git a/sequencer/src/state.rs b/sequencer/src/state.rs index 0d929ccdee1..f4376f49668 100644 --- a/sequencer/src/state.rs +++ b/sequencer/src/state.rs @@ -5,7 +5,7 @@ use anyhow::{bail, ensure, Context}; use either::Either; use espresso_types::{ traits::StateCatchup, - v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1}, + v0_3::{ChainConfig, RewardAccountV1, RewardAmount, RewardMerkleTreeV1}, v0_4::{Delta, RewardAccountV2, RewardMerkleTreeV2}, BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState, @@ -215,37 +215,59 @@ async fn store_state_update( .await .context("failed to store reward merkle nodes")?; } else { - // Collect reward merkle tree v2 proofs for batch insertion - let reward_proofs: Vec<_> = rewards_delta - .iter() - .map(|delta| { - let proof = match reward_merkle_tree_v2.universal_lookup(*delta) { - LookupResult::Ok(_, proof) => proof, - LookupResult::NotFound(proof) => proof, - LookupResult::NotInMemory => { - bail!("missing merkle path for reward account {delta}") - }, - }; - let path = EpochVersion, store V2 proofs only up to DrbAndHeaderUpgradeVersion + if version <= DrbAndHeaderUpgradeVersion::version() { + let reward_proofs: Vec<_> = rewards_delta + .iter() + .map(|delta| { + let proof = match reward_merkle_tree_v2.universal_lookup(*delta) { + LookupResult::Ok(_, proof) => proof, + LookupResult::NotFound(proof) => proof, + LookupResult::NotInMemory => { + bail!("missing merkle path for reward account {delta}") + }, + }; + let path = >::to_traversal_path( delta, reward_merkle_tree_v2.height() ); - Ok((proof, path)) - }) - .collect::>>()?; + Ok((proof, path)) + }) + .collect::>>()?; - tracing::debug!( - count = reward_proofs.len(), - "inserting v2 reward accounts in batch" - ); - UpdateStateData::::insert_merkle_nodes_batch( - tx, - reward_proofs, - block_number, - ) - .await - .context("failed to store reward merkle nodes")?; + tracing::debug!( + count = reward_proofs.len(), + "inserting v2 reward accounts in batch" + ); + UpdateStateData::::insert_merkle_nodes_batch( + tx, + reward_proofs, + block_number, + ) + .await + .context("failed to store reward merkle nodes")?; + } + + if !rewards_delta.is_empty() { + let account_balances: Vec<(RewardAccountV2, RewardAmount)> = rewards_delta + .iter() + .filter_map(|account| match reward_merkle_tree_v2.lookup(*account) { + LookupResult::Ok(balance, _) => Some((*account, *balance)), + _ => None, + }) + .collect(); + + if !account_balances.is_empty() { + tracing::debug!( + count = account_balances.len(), + "storing reward state to reward_state table" + ); + tx.store_reward_state(block_number, account_balances) + .await + .context("failed to store reward state")?; + } + } } tracing::debug!(block_number, "updating state height"); @@ -476,6 +498,16 @@ impl SequencerStateDataSource for T where { } +#[async_trait::async_trait] +pub(crate) trait RewardStatePersistence { + /// Store reward account balances at a specific height. + async fn store_reward_state( + &mut self, + height: u64, + accounts: Vec<(RewardAccountV2, RewardAmount)>, + ) -> anyhow::Result<()>; +} + pub(crate) trait SequencerStateUpdate: Transaction + UpdateStateData @@ -483,6 +515,7 @@ pub(crate) trait SequencerStateUpdate: + UpdateStateData + UpdateStateData + ChainConfigPersistence + + RewardStatePersistence { } @@ -493,5 +526,6 @@ impl SequencerStateUpdate for T where + UpdateStateData + UpdateStateData + ChainConfigPersistence + + RewardStatePersistence { } diff --git a/types/src/v0/impls/header.rs b/types/src/v0/impls/header.rs index 39d56f5ec93..edadb78a2f4 100644 --- a/types/src/v0/impls/header.rs +++ b/types/src/v0/impls/header.rs @@ -865,40 +865,76 @@ impl Header { // At epoch boundary: apply prev epoch rewards tracing::info!(%height, %epoch, %prev_epoch, "epoch boundary: applying rewards"); - let (epoch_rewards_applied, changed_accounts) = - if let Some(result) = reward_calculator.get_result(prev_epoch).await { + let (epoch_rewards_applied, changed_accounts) = if let Some(result) = + reward_calculator.get_result(prev_epoch).await + { + tracing::info!( + %epoch, + prev_epoch = %result.epoch, + total = %result.total_distributed.0, + "applying epoch rewards" + ); + validated_state.reward_merkle_tree_v2 = result.reward_tree.clone(); + (result.total_distributed, result.changed_accounts) + } else if prev_epoch <= first_epoch + 1 { + (RewardAmount::default(), HashSet::new()) + } else { + // Missing prev_epoch calculation - need to compute it now + let prev_epoch_last_block = *prev_epoch * epoch_height; + let prev_epoch_header = instance_state + .state_catchup + .as_ref() + .fetch_header(prev_epoch_last_block) + .await + .with_context(|| { + format!( + "failed to fetch header at height {prev_epoch_last_block} for prev_epoch \ + {prev_epoch}" + ) + })?; + + if prev_epoch_header.version() >= EpochRewardVersion::version() { + // V6+ epoch needs rewards - spawn and wait for calculation + tracing::warn!( + %epoch, + %prev_epoch, + "missing V6 epoch rewards at boundary, spawning calculation now" + ); + + if !reward_calculator.is_calculating(prev_epoch) { + reward_calculator.spawn_background_task( + prev_epoch, + epoch_height, + validated_state.reward_merkle_tree_v2.clone(), + instance_state.clone(), + coordinator.clone(), + None, // Will fetch header inside task + ); + } + + // Wait for the calculation to complete + let result = reward_calculator + .get_result(prev_epoch) + .await + .context(format!( + "failed to calculate missing rewards for epoch {prev_epoch}" + ))?; + tracing::info!( %epoch, - prev_epoch = %result.epoch, + %prev_epoch, total = %result.total_distributed.0, - "applying epoch rewards" + "applied delayed epoch rewards" ); + validated_state.reward_merkle_tree_v2 = result.reward_tree.clone(); (result.total_distributed, result.changed_accounts) - } else if prev_epoch <= first_epoch + 1 { - (RewardAmount::default(), HashSet::new()) } else { - let prev_epoch_last_block = *prev_epoch * epoch_height; - let prev_epoch_header = instance_state - .state_catchup - .as_ref() - .fetch_header(prev_epoch_last_block) - .await - .with_context(|| { - format!( - "failed to fetch header at height {prev_epoch_last_block} for \ - prev_epoch {prev_epoch}" - ) - })?; - - if prev_epoch_header.version() >= EpochRewardVersion::version() { - anyhow::bail!( - "rewards missing for V6 epoch {prev_epoch} at epoch {epoch} boundary" - ); - } - tracing::info!(%epoch, %prev_epoch, "no rewards for V5 epoch"); + // Pre-V6 epoch has no rewards + tracing::info!(%epoch, %prev_epoch, "no rewards for pre-V6 epoch"); (RewardAmount::default(), HashSet::new()) - }; + } + }; // Start calculation for current epoch reward_calculator.spawn_background_task( @@ -910,10 +946,11 @@ impl Header { Some(*leader_counts), ); - // // keep last 3 epochs - // if *epoch > *first_epoch + 3 { - // reward_calculator.results.retain(|&e, _| e >= epoch); - // } + // Keep last 3 epochs in cache + if *epoch > *first_epoch + 3 { + let cutoff_epoch = EpochNumber::new(*epoch - 3); + reward_calculator.results.retain(|&e, _| e > cutoff_epoch); + } Ok((epoch_rewards_applied, changed_accounts)) } diff --git a/types/src/v0/impls/instance_state.rs b/types/src/v0/impls/instance_state.rs index 8db29d40fbc..237ba23ea12 100644 --- a/types/src/v0/impls/instance_state.rs +++ b/types/src/v0/impls/instance_state.rs @@ -177,7 +177,7 @@ impl NodeState { coordinator, genesis_version, epoch_start_block: 0, - epoch_rewards_calculator: Arc::new(Mutex::new(EpochRewardsCalculator::new())), + epoch_rewards_calculator: Arc::new(Mutex::new(EpochRewardsCalculator::default())), } } @@ -594,6 +594,16 @@ pub mod mock { anyhow::bail!("unimplemented") } + async fn try_fetch_all_reward_accounts( + &self, + _retry: usize, + _height: u64, + _offset: u64, + _limit: u64, + ) -> anyhow::Result> { + anyhow::bail!("unimplemented") + } + fn backoff(&self) -> &BackoffParams { &self.backoff } diff --git a/types/src/v0/impls/reward.rs b/types/src/v0/impls/reward.rs index ece5a6820e0..962caa2148e 100644 --- a/types/src/v0/impls/reward.rs +++ b/types/src/v0/impls/reward.rs @@ -1039,7 +1039,10 @@ pub struct EpochRewardsCalculator { impl EpochRewardsCalculator { pub fn new() -> Self { - Self::default() + Self { + results: HashMap::new(), + pending: HashMap::new(), + } } /// Check if we have a cached result for epoch. @@ -1078,7 +1081,7 @@ impl EpochRewardsCalculator { None } - /// Start a background task that fetches data and calculates epoch rewards. + /// Start a background task that calculates epoch rewards. /// Does nothing if calculation is already done or in progress for this epoch. /// pub fn spawn_background_task( @@ -1099,7 +1102,11 @@ impl EpochRewardsCalculator { return; } - tracing::info!(%epoch, has_leader_counts = leader_counts.is_some(), "starting background epoch rewards task"); + tracing::info!( + %epoch, + has_leader_counts = leader_counts.is_some(), + "starting background epoch rewards task" + ); let handle = tokio::spawn(async move { Self::fetch_and_calculate( @@ -1124,17 +1131,15 @@ impl EpochRewardsCalculator { leader_counts: Option, ) -> anyhow::Result { let epoch_last_block_height = (*epoch) * epoch_height; - // For fetching missing accounts, we need the tree state from epoch - 1 - let catchup_height = (*epoch - 1) * epoch_height; tracing::info!( %epoch, epoch_last_block_height, - catchup_height, has_leader_counts = leader_counts.is_some(), "fetch_and_calculate: starting" ); + // Get leader_counts for this epoch if not provided let leader_counts = if let Some(lc) = leader_counts { lc } else { @@ -1169,6 +1174,7 @@ impl EpochRewardsCalculator { .expect("V6+ header must have leader_counts") }; + // Ensure stake table is available for this epoch if let Err(err) = coordinator.stake_table_for_epoch(Some(epoch)).await { tracing::info!(%epoch, "stake table missing for epoch, triggering catchup: {err:#}"); coordinator @@ -1199,6 +1205,7 @@ impl EpochRewardsCalculator { "fetch_and_calculate: got validators and block_reward" ); + // Check if we're missing accounts that need to be in the tree let accounts_to_update: Vec<_> = leader_counts .iter() .enumerate() @@ -1217,66 +1224,73 @@ impl EpochRewardsCalculator { .cloned() .collect(); + // If we have missing accounts, fetch all reward accounts from peers and rebuild the tree if !missing_accounts.is_empty() { - let reward_merkle_tree_root = reward_tree.commitment(); - tracing::info!( %epoch, num_missing = missing_accounts.len(), - %reward_merkle_tree_root, + "missing accounts detected, fetching all reward accounts from peers" + ); + + // Fetch all reward accounts at the height just before this epoch + // This fetches from the reward_state table which stores all account balances + let catchup_height = epoch_last_block_height.saturating_sub(epoch_height); + + tracing::info!( + %epoch, catchup_height, - "fetch_and_calculate: fetching missing reward accounts" + "fetching all reward accounts from peers to rebuild tree" ); - const MAX_RETRIES: u32 = 10; - for attempt in 1..=MAX_RETRIES { - match instance_state + // Fetch all reward accounts from peers (paginated) + let mut all_accounts = Vec::new(); + let mut offset = 0u64; + let limit = 10_000u64; + + loop { + let accounts = instance_state .state_catchup - .fetch_reward_accounts_v2( - &instance_state, - catchup_height, - ViewNumber::new(catchup_height), - reward_merkle_tree_root, - missing_accounts.clone(), - ) + .as_ref() + .fetch_all_reward_accounts(catchup_height, offset, limit) .await - { - Ok(proofs) => { - for proof in proofs { - proof - .remember(&mut reward_tree) - .expect("proof previously verified"); - } - tracing::info!( - %epoch, - "fetch_and_calculate: remembered missing accounts" - ); - break; - }, - Err(e) => { - if attempt == MAX_RETRIES { - anyhow::bail!( - "failed to fetch missing reward accounts after {MAX_RETRIES} \ - retries: {e}" - ); - } - tracing::warn!( - %epoch, - attempt, - MAX_RETRIES, - error = %e, - "fetch_and_calculate: failed to fetch accounts, retrying" - ); - tokio::time::sleep(std::time::Duration::from_secs(3)).await; - }, + .with_context(|| { + format!( + "failed to fetch reward accounts at height {catchup_height}, offset \ + {offset}" + ) + })?; + + let count = accounts.len(); + all_accounts.extend(accounts); + + if (count as u64) < limit { + break; } + offset += limit; } + + tracing::info!( + %epoch, + num_accounts = all_accounts.len(), + "fetched all reward accounts, rebuilding tree" + ); + + // Rebuild the tree from scratch with all the accounts + let kv_pairs: Vec<(RewardAccountV2, RewardAmount)> = all_accounts; + reward_tree = RewardMerkleTreeV2::from_kv_set(REWARD_MERKLE_TREE_V2_HEIGHT, kv_pairs) + .context("failed to rebuild reward merkle tree from accounts")?; + + tracing::info!( + %epoch, + reward_tree_commitment = %reward_tree.commitment(), + "reward tree rebuilt successfully" + ); } tracing::info!( %epoch, reward_tree_commitment = %reward_tree.commitment(), - "fetch_and_calculate: starting calculation" + "starting final epoch calculation" ); Self::calculate_all_rewards(epoch, leader_counts, reward_tree, block_reward, validators) diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs index ab84aa789b4..07c3e85a452 100644 --- a/types/src/v0/traits.rs +++ b/types/src/v0/traits.rs @@ -284,6 +284,42 @@ pub trait StateCatchup: Send + Sync { .await } + /// Fetch all reward accounts at a given height without retrying on transient errors. + /// This is used to rebuild the reward merkle tree when catching up. + async fn try_fetch_all_reward_accounts( + &self, + retry: usize, + height: u64, + offset: u64, + limit: u64, + ) -> anyhow::Result>; + + /// Fetch all reward accounts at a given height, retrying on transient errors. + /// This is used to rebuild the reward merkle tree when catching up. + async fn fetch_all_reward_accounts( + &self, + height: u64, + offset: u64, + limit: u64, + ) -> anyhow::Result> { + self.backoff() + .retry(self, |provider, retry| { + async move { + provider + .try_fetch_all_reward_accounts(retry, height, offset, limit) + .await + .map_err(|err| { + err.context(format!( + "fetching all reward accounts at height {height}, offset \ + {offset}, limit {limit}" + )) + }) + } + .boxed() + }) + .await + } + /// Fetch the state certificate for a given epoch without retrying on transient errors. async fn try_fetch_state_cert( &self, @@ -491,6 +527,29 @@ impl StateCatchup for Arc { .await } + async fn try_fetch_all_reward_accounts( + &self, + retry: usize, + height: u64, + offset: u64, + limit: u64, + ) -> anyhow::Result> { + (**self) + .try_fetch_all_reward_accounts(retry, height, offset, limit) + .await + } + + async fn fetch_all_reward_accounts( + &self, + height: u64, + offset: u64, + limit: u64, + ) -> anyhow::Result> { + (**self) + .fetch_all_reward_accounts(height, offset, limit) + .await + } + async fn try_fetch_state_cert( &self, retry: usize, @@ -934,6 +993,7 @@ pub trait SequencerPersistence: self.migrate_vid_shares().await?; self.migrate_quorum_proposals().await?; self.migrate_quorum_certificates().await?; + self.backfill_reward_state().await?; tracing::warn!("consensus storage has been migrated to new types"); @@ -946,6 +1006,11 @@ pub trait SequencerPersistence: async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>; async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>; + /// Backfill reward state from reward_merkle_tree_v2 to reward_state table. + /// This populates the reward_state table with all (height, account, balance) entries. + /// Continues adding entries like other migrations until all historical data is processed. + async fn backfill_reward_state(&self) -> anyhow::Result<()>; + async fn load_anchor_view(&self) -> anyhow::Result { match self.load_anchor_leaf().await? { Some((leaf, _)) => Ok(leaf.view_number()),