Skip to content

Commit 267c98b

Browse files
authored
cuprated: add pop blocks command (#545)
* update monero oxide * fix doc * update book * add `pop_blocks` command + fix bug * fix rx VM cache on deep pop blocks * fix for last commit * add docs * fmt * fix bug + add proptest * fix imports
1 parent a602f74 commit 267c98b

File tree

10 files changed

+219
-20
lines changed

10 files changed

+219
-20
lines changed

binaries/cuprated/src/blockchain/interface.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,29 @@ pub async fn handle_incoming_block(
170170
.map_err(IncomingBlockError::InvalidBlock)
171171
}
172172

173+
/// Pop blocks from the top of the blockchain.
174+
///
175+
/// # Errors
176+
///
177+
/// Will error if the blockchain manager is not set up yet.
178+
pub async fn pop_blocks(numb_blocks: usize) -> Result<(), anyhow::Error> {
179+
let Some(incoming_block_tx) = COMMAND_TX.get() else {
180+
// We could still be starting up the blockchain manager.
181+
return anyhow::bail!("The blockchain manager is not running yet");
182+
};
183+
184+
let (response_tx, response_rx) = oneshot::channel();
185+
186+
incoming_block_tx
187+
.send(BlockchainManagerCommand::PopBlocks {
188+
numb_blocks,
189+
response_tx,
190+
})
191+
.await?;
192+
193+
Ok(response_rx.await?)
194+
}
195+
173196
/// Check if we have a block with the given hash.
174197
async fn block_exists(
175198
block_hash: [u8; 32],

binaries/cuprated/src/blockchain/manager/commands.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tokio::sync::oneshot;
77
use cuprate_types::TransactionVerificationData;
88

99
/// The blockchain manager commands.
10+
#[expect(clippy::large_enum_variant)]
1011
pub enum BlockchainManagerCommand {
1112
/// Attempt to add a new block to the blockchain.
1213
AddBlock {
@@ -17,6 +18,12 @@ pub enum BlockchainManagerCommand {
1718
/// The channel to send the response down.
1819
response_tx: oneshot::Sender<Result<IncomingBlockOk, anyhow::Error>>,
1920
},
21+
/// Pop blocks from the top of the blockchain.
22+
PopBlocks {
23+
numb_blocks: usize,
24+
/// The channel to send the response down.
25+
response_tx: oneshot::Sender<()>,
26+
},
2027
}
2128

2229
/// The [`Ok`] response for an incoming block.

binaries/cuprated/src/blockchain/manager/handler.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,22 @@ impl super::BlockchainManager {
5555

5656
drop(response_tx.send(res));
5757
}
58+
BlockchainManagerCommand::PopBlocks {
59+
numb_blocks,
60+
response_tx,
61+
} => {
62+
let _guard = REORG_LOCK.write().await;
63+
self.pop_blocks(numb_blocks).await;
64+
self.blockchain_write_handle
65+
.ready()
66+
.await
67+
.expect(PANIC_CRITICAL_SERVICE_ERROR)
68+
.call(BlockchainWriteRequest::FlushAltBlocks)
69+
.await
70+
.expect(PANIC_CRITICAL_SERVICE_ERROR);
71+
#[expect(clippy::let_underscore_must_use)]
72+
let _ = response_tx.send(());
73+
}
5874
}
5975
}
6076

binaries/cuprated/src/commands.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ pub enum Command {
5050

5151
/// Print the height of first block not contained in the fast sync hashes.
5252
FastSyncStopHeight,
53+
54+
/// Pop blocks from the top of the blockchain.
55+
PopBlocks { numb_blocks: usize },
5356
}
5457

5558
/// The log output target.
@@ -131,6 +134,15 @@ pub async fn io_loop(
131134

132135
println!("{stop_height}");
133136
}
137+
Command::PopBlocks { numb_blocks } => {
138+
tracing::info!("Popping {numb_blocks} blocks.");
139+
let res = crate::blockchain::interface::pop_blocks(numb_blocks).await;
140+
141+
match res {
142+
Ok(()) => println!("Popped {numb_blocks} blocks."),
143+
Err(e) => println!("Failed to pop blocks: {e}"),
144+
}
145+
}
134146
}
135147
}
136148
}

consensus/context/src/rx_vms.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,25 @@ impl RandomXVmCache {
214214
}
215215

216216
/// Removes all the RandomX VMs above the `new_height`.
217-
pub fn pop_blocks_main_chain(&mut self, new_height: usize) {
217+
pub async fn pop_blocks_main_chain<D: Database + Clone>(
218+
&mut self,
219+
new_height: usize,
220+
database: D,
221+
) -> Result<(), ContextCacheError> {
218222
self.seeds.retain(|(height, _)| *height < new_height);
219223
self.vms.retain(|height, _| *height < new_height);
224+
225+
if self.seeds.len() < RX_SEEDS_CACHED {
226+
let seed_heights = get_last_rx_seed_heights(new_height, RX_SEEDS_CACHED)
227+
.split_at(self.seeds.len())
228+
.1
229+
.to_vec();
230+
let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
231+
232+
self.seeds.extend(seed_heights.into_iter().zip(seed_hashes));
233+
}
234+
235+
Ok(())
220236
}
221237

222238
/// Add a new block to the VM cache.

consensus/context/src/task.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,11 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
227227
.pop_blocks_main_chain(numb_blocks, self.database.clone())
228228
.await?;
229229
self.rx_vm_cache
230-
.pop_blocks_main_chain(self.chain_height - numb_blocks - 1);
230+
.pop_blocks_main_chain(
231+
self.chain_height - numb_blocks - 1,
232+
self.database.clone(),
233+
)
234+
.await?;
231235
self.hardfork_state
232236
.pop_blocks_main_chain(numb_blocks, self.database.clone())
233237
.await?;

consensus/context/src/weight.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,27 +137,33 @@ impl BlockWeightsCache {
137137

138138
let chain_height = self.tip_height + 1;
139139

140-
let new_long_term_start_height = chain_height
141-
.saturating_sub(self.config.long_term_window)
142-
.saturating_sub(numb_blocks);
140+
let new_long_term_start_height =
141+
chain_height.saturating_sub(self.config.long_term_window + numb_blocks);
143142

144143
let old_long_term_weights = get_long_term_weight_in_range(
145-
new_long_term_start_height
146-
// current_chain_height - self.long_term_weights.len() blocks are already in the cache.
147-
..(chain_height - self.long_term_weights.window_len()),
144+
new_long_term_start_height..
145+
// We don't need to handle the case where this is above the top block like with the
146+
// short term cache as we check at the top of this function and just create a new cache.
147+
(chain_height - self.long_term_weights.window_len()),
148148
database.clone(),
149149
Chain::Main,
150150
)
151151
.await?;
152152

153-
let new_short_term_start_height = chain_height
154-
.saturating_sub(self.config.short_term_window)
155-
.saturating_sub(numb_blocks);
153+
let new_short_term_start_height =
154+
chain_height.saturating_sub(self.config.short_term_window + numb_blocks);
156155

157156
let old_short_term_weights = get_blocks_weight_in_range(
158157
new_short_term_start_height
159-
// current_chain_height - self.long_term_weights.len() blocks are already in the cache.
160-
..(chain_height - self.short_term_block_weights.window_len()),
158+
..(
159+
// the smallest between ...
160+
min(
161+
// the blocks we already have in the cache.
162+
chain_height - self.short_term_block_weights.window_len(),
163+
// the new chain height.
164+
chain_height - numb_blocks,
165+
)
166+
),
161167
database,
162168
Chain::Main,
163169
)

consensus/src/tests/context/rx_vms.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,42 @@ async fn rx_vm_created_on_hf_12() {
5252
assert!(!cache.vms.is_empty());
5353
}
5454

55+
#[tokio::test]
56+
#[expect(unused_qualifications, reason = "false positive in tokio macro")]
57+
async fn rx_vm_pop_blocks() {
58+
let db = DummyDatabaseBuilder::default().finish(Some(2_000_000));
59+
60+
let cache = RandomXVmCache::init_from_chain_height(2_000_000, &HardFork::V16, db.clone())
61+
.await
62+
.unwrap();
63+
64+
let mut cloned_cache = cache.clone();
65+
66+
for i in 0..2_000 {
67+
cloned_cache.new_block(2_000_000 + i, &[0; 32]);
68+
}
69+
70+
cloned_cache
71+
.pop_blocks_main_chain(1_999_999, db.clone())
72+
.await
73+
.unwrap();
74+
75+
assert_eq!(cloned_cache.seeds, cache.seeds);
76+
77+
let mut cloned_cache = cache.clone();
78+
79+
for i in 0..5_000 {
80+
cloned_cache.new_block(2_000_000 + i, &[0; 32]);
81+
}
82+
83+
cloned_cache
84+
.pop_blocks_main_chain(1_999_999, db)
85+
.await
86+
.unwrap();
87+
88+
assert_eq!(cloned_cache.seeds, cache.seeds);
89+
}
90+
5591
proptest! {
5692
// these tests are expensive, so limit cases.
5793
#![proptest_config(ProptestConfig {

consensus/src/tests/context/weight.rs

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use cuprate_consensus_context::{
88
};
99
use cuprate_types::Chain;
1010

11+
use proptest::{collection::vec, prelude::*};
12+
1113
pub(crate) const TEST_WEIGHT_CONFIG: BlockWeightsCacheConfig =
1214
BlockWeightsCacheConfig::new(100, 5000);
1315

@@ -60,12 +62,12 @@ async fn pop_blocks_greater_than_window() -> Result<(), tower::BoxError> {
6062

6163
let old_cache = weight_cache.clone();
6264

63-
weight_cache.new_block(5000, 0, 0);
64-
weight_cache.new_block(5001, 0, 0);
65-
weight_cache.new_block(5002, 0, 0);
65+
for i in 0..4999 {
66+
weight_cache.new_block(5000 + i, 0, 0);
67+
}
6668

6769
weight_cache
68-
.pop_blocks_main_chain(3, database)
70+
.pop_blocks_main_chain(4999, database)
6971
.await
7072
.unwrap();
7173

@@ -108,6 +110,40 @@ async fn pop_blocks_less_than_window() -> Result<(), tower::BoxError> {
108110
Ok(())
109111
}
110112

113+
#[tokio::test]
114+
async fn pop_blocks_on_window() -> Result<(), tower::BoxError> {
115+
let mut db_builder = DummyDatabaseBuilder::default();
116+
for weight in 1..=4999 {
117+
let block = DummyBlockExtendedHeader::default().with_weight_into(weight, weight);
118+
db_builder.add_block(block);
119+
}
120+
121+
let database = db_builder.finish(None);
122+
123+
let mut weight_cache = BlockWeightsCache::init_from_chain_height(
124+
4999,
125+
TEST_WEIGHT_CONFIG,
126+
database.clone(),
127+
Chain::Main,
128+
)
129+
.await?;
130+
131+
let old_cache = weight_cache.clone();
132+
133+
weight_cache.new_block(4999, 0, 0);
134+
weight_cache.new_block(5000, 0, 0);
135+
weight_cache.new_block(5001, 0, 0);
136+
137+
weight_cache
138+
.pop_blocks_main_chain(3, database)
139+
.await
140+
.unwrap();
141+
142+
assert_eq!(weight_cache, old_cache);
143+
144+
Ok(())
145+
}
146+
111147
#[tokio::test]
112148
async fn weight_cache_calculates_correct_median() -> Result<(), tower::BoxError> {
113149
let mut db_builder = DummyDatabaseBuilder::default();
@@ -167,4 +203,41 @@ async fn calc_bw_ltw_2850000_3050000() {
167203
}
168204
}
169205

170-
// TODO: protests
206+
proptest! {
207+
#[test]
208+
fn pop_blocks_proptest(
209+
db_blocks in vec(any::<(usize, usize)>(), 1..10_000),
210+
new_blocks in vec(any::<(usize, usize)>(), 0..5_000),
211+
) {
212+
let mut db_builder = DummyDatabaseBuilder::default();
213+
let db_len = db_blocks.len();
214+
215+
for (weight, ltw) in db_blocks {
216+
let block = DummyBlockExtendedHeader::default().with_weight_into(weight, ltw);
217+
db_builder.add_block(block);
218+
}
219+
220+
tokio_test::block_on(async move {
221+
let db = db_builder.finish(None);
222+
let mut weight_cache = BlockWeightsCache::init_from_chain_height(
223+
db_len,
224+
TEST_WEIGHT_CONFIG,
225+
db.clone(),
226+
Chain::Main,
227+
)
228+
.await.unwrap();
229+
230+
let cloned_cache = weight_cache.clone();
231+
let new_blocks_len = new_blocks.len();
232+
233+
for (i, (weight, ltw)) in new_blocks.into_iter().enumerate() {
234+
weight_cache.new_block(db_len + i, weight, ltw);
235+
}
236+
237+
weight_cache.pop_blocks_main_chain(new_blocks_len, db).await.unwrap();
238+
239+
prop_assert_eq!(weight_cache, cloned_cache);
240+
Ok(())
241+
})?;
242+
}
243+
}

consensus/src/tests/mock_db.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,19 @@ impl Service<BlockchainReadRequest> for DummyDatabase {
173173
let mut end = range.end;
174174
let mut start = range.start;
175175

176+
let block_len = blocks.read().unwrap().len();
176177
if let Some(dummy_height) = dummy_height {
177-
let block_len = blocks.read().unwrap().len();
178-
179178
end -= dummy_height - block_len;
180179
start -= dummy_height - block_len;
181180
}
182181

182+
if block_len < end {
183+
return Err(format!(
184+
"end block not in database! end: {end} len: {block_len}"
185+
)
186+
.into());
187+
}
188+
183189
BlockchainResponse::BlockExtendedHeaderInRange(
184190
blocks
185191
.read()

0 commit comments

Comments
 (0)