Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,14 @@ debug = "line-tables-only"
inherits = "dev"
strip = "none"
debug = "full"
opt-level = 2

# Don't optimize the binaries in the workspace. Compilation of dependencies can be cached
# effectively on the CI because they rarely change, whereas the workspace code usually changes and
# workspace compilation artifacts are therefore not cached. However, some of our tests don't work if
# we don't have any optimizations in the workspace.
[profile.test]
opt-level = 1
opt-level = 2

[profile.test.package.sequencer]
opt-level = 0
Expand Down
10 changes: 10 additions & 0 deletions sequencer/api/migrations/postgres/V1103__reward_state.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Table to store reward account balances at each height

CREATE TABLE reward_state (
height BIGINT NOT NULL,
account JSONB NOT NULL,
balance JSONB NOT NULL,
PRIMARY KEY (height, account)
);

CREATE INDEX reward_state_height_idx ON reward_state (height);
10 changes: 10 additions & 0 deletions sequencer/api/migrations/sqlite/V903__reward_state.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Table to store reward account balances at each height

CREATE TABLE reward_state (
height BIGINT NOT NULL,
account JSONB NOT NULL,
balance JSONB NOT NULL,
PRIMARY KEY (height, account)
);

CREATE INDEX reward_state_height_idx ON reward_state (height);
102 changes: 73 additions & 29 deletions sequencer/src/api/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use espresso_types::{
RewardAccountProofV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2,
REWARD_MERKLE_TREE_V2_HEIGHT,
},
BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Leaf2,
NodeState, ValidatedState,
BlockMerkleTree, DrbAndHeaderUpgradeVersion, EpochVersion, FeeAccount, FeeMerkleTree, Header,
Leaf2, NodeState, ValidatedState,
};
use hotshot::traits::ValidatedState as _;
use hotshot_query_service::{
Expand All @@ -29,7 +29,7 @@ use hotshot_query_service::{
},
VersionedDataSource,
},
merklized_state::{MerklizedState, Snapshot},
merklized_state::Snapshot,
Resolvable,
};
use hotshot_types::{
Expand Down Expand Up @@ -310,46 +310,39 @@ impl CatchupStorage for SqlStorage {
return Ok(Vec::new());
}

// get the latest balance for each account.
// use DISTINCT ON for Postgres
// use ROW_NUMBER() as DISTINCT ON is not supported for SQLite
// Query reward_state table for the latest balance for each account up to the given height
// Use DISTINCT ON for Postgres, ROW_NUMBER for SQLite
#[cfg(not(feature = "embedded-db"))]
let query = format!(
"SELECT DISTINCT ON (idx) idx, entry
FROM {}
WHERE idx IS NOT NULL AND created <= $1
ORDER BY idx DESC, created DESC
LIMIT $2 OFFSET $3",
RewardMerkleTreeV2::state_type()
);
let query = "SELECT DISTINCT ON (account) account, balance
FROM reward_state
WHERE height <= $1
ORDER BY account, height DESC
LIMIT $2 OFFSET $3";

#[cfg(feature = "embedded-db")]
let query = format!(
"SELECT idx, entry FROM (
SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) as \
rn
FROM {}
WHERE created <= $1 AND idx IS NOT NULL
let query = "SELECT account, balance FROM (
SELECT account, balance, ROW_NUMBER() OVER (PARTITION BY account ORDER BY height \
DESC) as rn
FROM reward_state
WHERE height <= $1
) sub
WHERE rn = 1
ORDER BY idx DESC
LIMIT $2 OFFSET $3",
RewardMerkleTreeV2::state_type()
);
ORDER BY account
LIMIT $2 OFFSET $3";

let rows = query_as::<(Value, Value)>(&query)
let rows = query_as::<(Value, Value)>(query)
.bind(height as i64)
.bind(limit as i64)
.bind(offset as i64)
.fetch_all(tx.as_mut())
.await
.context("loading reward accounts from storage")?;
.context("loading reward accounts from reward_state table")?;

let mut accounts = Vec::new();
for (idx, entry) in rows {
for (account_json, balance_json) in rows {
let account: RewardAccountV2 =
serde_json::from_value(idx).context("deserializing reward account")?;
let balance: RewardAmount = serde_json::from_value(entry).context(format!(
serde_json::from_value(account_json).context("deserializing reward account")?;
let balance: RewardAmount = serde_json::from_value(balance_json).context(format!(
"deserializing reward balance for account {account}"
))?;

Expand Down Expand Up @@ -496,6 +489,16 @@ impl CatchupStorage for SqlStorage {

Ok(chain)
}

async fn get_header(&self, height: u64) -> anyhow::Result<Header> {
let mut tx = self
.read()
.await
.context(format!("opening transaction to fetch header at {height}"))?;
tx.get_header(BlockId::<SeqTypes>::from(height as usize))
.await
.context(format!("header {height} not available"))
}
}

impl RewardAccountProofDataSource for DataSource {
Expand Down Expand Up @@ -586,6 +589,13 @@ impl CatchupStorage for DataSource {
.get_all_reward_accounts(height, offset, limit)
.await
}

async fn get_header(&self, height: u64) -> anyhow::Result<Header> {
self.as_ref()
.get_header(BlockId::<SeqTypes>::from(height as usize))
.await
.context(format!("header {height} not available"))
}
}

#[async_trait]
Expand All @@ -603,6 +613,40 @@ impl ChainConfigPersistence for Transaction<Write> {
}
}

#[async_trait]
impl crate::state::RewardStatePersistence for Transaction<Write> {
async fn store_reward_state(
&mut self,
height: u64,
accounts: Vec<(RewardAccountV2, RewardAmount)>,
) -> anyhow::Result<()> {
if accounts.is_empty() {
return Ok(());
}

// Build batch of (height, account, balance) tuples for upsert
let rows: Vec<_> = accounts
.iter()
.map(|(account, balance)| {
let account_json =
serde_json::to_value(account).expect("failed to serialize account");
let balance_json =
serde_json::to_value(balance).expect("failed to serialize balance");
(height as i64, account_json, balance_json)
})
.collect();

self.upsert(
"reward_state",
["height", "account", "balance"],
["height", "account"],
rows,
)
.await
.context("failed to upsert reward state")
}
}

async fn load_frontier<Mode: TransactionMode>(
tx: &mut Transaction<Mode>,
height: u64,
Expand Down
74 changes: 74 additions & 0 deletions sequencer/src/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,25 @@ impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
.await
}

#[tracing::instrument(skip(self))]
async fn try_fetch_all_reward_accounts(
&self,
retry: usize,
height: u64,
offset: u64,
limit: u64,
) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
self.fetch(retry, |client| async move {
client
.get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
"catchup/{height}/reward-amounts/{limit}/{offset}"
))
.send()
.await
})
.await
}

async fn try_fetch_state_cert(
&self,
retry: usize,
Expand Down Expand Up @@ -892,6 +911,21 @@ where
Ok(proofs)
}

async fn try_fetch_all_reward_accounts(
&self,
_retry: usize,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a typo in the type declaration for height. It should be u64 instead of u664.

Suggested change
_retry: usize,
height: u64,

height: u64,
offset: u64,
limit: u64,
) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
self.db
.get_all_reward_accounts(height, offset, limit)
.await
.with_context(|| {
format!("failed to get all reward accounts at height {height} from DB")
})
}

async fn try_fetch_state_cert(
&self,
_retry: usize,
Expand Down Expand Up @@ -1018,6 +1052,16 @@ impl StateCatchup for NullStateCatchup {
bail!("state catchup is disabled");
}

async fn try_fetch_all_reward_accounts(
&self,
_retry: usize,
_height: u64,
_offset: u64,
_limit: u64,
) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
bail!("state catchup is disabled");
}

async fn try_fetch_state_cert(
&self,
_retry: usize,
Expand Down Expand Up @@ -1447,6 +1491,36 @@ impl StateCatchup for ParallelStateCatchup {
.await
}

async fn try_fetch_all_reward_accounts(
&self,
retry: usize,
height: u64,
offset: u64,
limit: u64,
) -> anyhow::Result<Vec<(RewardAccountV2, RewardAmount)>> {
// Try to get the accounts on local providers first
let local_result = self
.on_local_providers(move |provider| async move {
provider
.try_fetch_all_reward_accounts(retry, height, offset, limit)
.await
})
.await;

// Check if we were successful locally
if local_result.is_ok() {
return local_result;
}

// If that fails, try the remote ones
self.on_remote_providers(move |provider| async move {
provider
.try_fetch_all_reward_accounts(retry, height, offset, limit)
.await
})
.await
}

async fn try_fetch_state_cert(
&self,
retry: usize,
Expand Down
18 changes: 11 additions & 7 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,8 @@ where
&persistence.clone(),
);

let epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));

let instance_state = NodeState {
chain_config: genesis.chain_config,
genesis_chain_config,
Expand All @@ -635,7 +637,7 @@ where
coordinator: coordinator.clone(),
genesis_version: genesis.genesis_version,
epoch_start_block: genesis.epoch_start_block.unwrap_or_default(),
epoch_rewards_calculator: Arc::new(Mutex::new(EpochRewardsCalculator::new())),
epoch_rewards_calculator,
};

// Initialize the Libp2p network
Expand Down Expand Up @@ -1386,12 +1388,14 @@ pub mod testing {
V::Base::VERSION,
coordinator.clone(),
V::Base::VERSION,
)
.with_current_version(V::Base::version())
.with_genesis(state)
.with_epoch_height(config.epoch_height)
.with_upgrades(upgrades)
.with_epoch_start_block(config.epoch_start_block);
);

let node_state = node_state
.with_current_version(V::Base::version())
.with_genesis(state)
.with_epoch_height(config.epoch_height)
.with_upgrades(upgrades)
.with_epoch_start_block(config.epoch_start_block);

tracing::info!(
i,
Expand Down
4 changes: 4 additions & 0 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,10 @@ impl SequencerPersistence for Persistence {
Ok(())
}

async fn backfill_reward_state(&self) -> anyhow::Result<()> {
Ok(())
}

async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
Expand Down
4 changes: 4 additions & 0 deletions sequencer/src/persistence/no_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ impl SequencerPersistence for NoStorage {
Ok(())
}

async fn backfill_reward_state(&self) -> anyhow::Result<()> {
Ok(())
}

async fn store_drb_result(
&self,
_epoch: EpochNumber,
Expand Down
Loading
Loading