Skip to content
This repository was archived by the owner on Oct 14, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 70 additions & 5 deletions src/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ impl<B: Block> ChainRef<B> {
}
}

/// Attempts to fetch a given number of blocks in ascending
/// Attempts to fetch a given number of blocks in descending
/// mode by its hashes from the cache and if it doesn't succeed it
/// then attempts to retrieve them from the database.
pub fn query_ascending(&self, from: &Hash, size: u8) -> Option<Vec<Arc<B>>> {
pub fn query_descending(&self, from: &Hash, size: u8) -> Option<Vec<Arc<B>>> {
// Get the first block
let current_block = self.query(from);

Expand Down Expand Up @@ -267,10 +267,10 @@ impl<B: Block> ChainRef<B> {
None
}

/// Attempts to fetch a given number of blocks in descending
/// Attempts to fetch a given number of blocks in ascending
/// mode by its hashes from the cache and if it doesn't succeed it
/// then attempts to retrieve them from the database.
pub fn query_descending(&self, from: &Hash, size: u8) -> Option<Vec<Arc<B>>> {
pub fn query_ascending(&self, from: &Hash, size: u8) -> Option<Vec<Arc<B>>> {
// Get the first block
let current_block = self.query(from);

Expand Down Expand Up @@ -439,6 +439,13 @@ impl<B: Block> ChainRef<B> {
let chain = self.chain.read();
chain.canonical_tip_state.inner_ref().state_root().clone()
}

/// Prunes stored blocks whose height falls below
/// `canonical height - height_offset`, walking the height index
/// downwards. Acquires a write lock on the underlying chain and
/// delegates to `Chain::remove_blocks` for the exact retention
/// semantics.
pub fn remove_blocks(&self, height_offset: u64) {
    let mut chain = self.chain.write();
    chain.remove_blocks(height_offset);
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -762,6 +769,38 @@ impl<B: Block> Chain<B> {
self.db.retrieve(&block_hash.0).is_some()
}

/// Removes blocks in ascending order starting from the current height
/// and the specified offset
pub fn remove_blocks(&mut self, height_offset: u64) {
let current_height = self.height;
let mut height = current_height - height_offset - 1;

// If height is less than 1, no need to remove anything
if (height < 1) {
return;
}

let mut encoded_height = encode_be_u64!(height);

// Get block hashes in ascending order by using the height and remove the
// data for each of them
while let Some(bytes) = self.db.retrieve(&crypto::hash_slice(&encoded_height).0) {
// If block hash was found, proceed with removal
let mut hash_bytes: [u8; 32] = [0; 32];
hash_bytes.copy_from_slice(&bytes);
self.remove_block(&Hash(hash_bytes));

if (height > 0) {
height -= 1;
encoded_height = encode_be_u64!(height);
} else {
break;
}
}

self.db.flush();
}

#[inline]
fn update_max_orphan_height(&mut self, new_height: u64) {
if self.max_orphan_height.is_none() {
Expand All @@ -777,7 +816,7 @@ impl<B: Block> Chain<B> {

#[inline]
fn write_block(&mut self, block: Arc<B>) {
let block_hash = block.block_hash().unwrap();
let block_hash: Hash = block.block_hash().unwrap();
//println!("DEBUG WRITING BLOCK: {:?}", block_hash);
assert!(self.disconnected_heads_mapping.get(&block_hash).is_none());
assert!(self.disconnected_tips_mapping.get(&block_hash).is_none());
Expand Down Expand Up @@ -900,6 +939,27 @@ impl<B: Block> Chain<B> {
}
}

/// Deletes all database records associated with a single block:
/// the block data, its short-hash mapping, its block -> height record
/// and the height -> block-hash index entry.
#[inline]
fn remove_block(&mut self, block_hash: &Hash) {
    // Remove the block data itself from the ledger.
    self.db.delete(&block_hash.0);

    // Remove the short-hash mapping.
    let short = block_hash.to_short();
    self.db.delete(&short.0);

    // Remove the block -> height record, if present. Tolerate a
    // missing record instead of unwrapping so a partially written
    // block cannot panic the prune job.
    let block_height_key = Self::compute_height_key(block_hash);
    if let Some(block_height_encoded) = self.db.retrieve(&block_height_key.0) {
        self.db.delete(&block_height_key.0);

        // Remove the height -> block-hash index entry.
        self.db.delete(&crypto::hash_slice(&block_height_encoded).0);
    }

    // TODO: remove root block?
}

#[inline]
fn write_canonical_height(&mut self, height: u64) {
let encoded_height = encode_be_u64!(height);
Expand Down Expand Up @@ -9470,5 +9530,10 @@ pub mod tests {

true
}

// TODO add test when blocks & transactions generator is done
// fn it_removes_correctly_in_archival_mode() -> bool {

// }
}
}
6 changes: 6 additions & 0 deletions src/constants/src/chain_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,11 @@ pub const MAX_PIECE_SIZE: usize = 262144; // 256kb
/// The maximum allowed size of a sub-piece. `MAX_PIECE_SIZE % MAX_SUB_PIECE_SIZE` must equal to 0
pub const MAX_SUB_PIECE_SIZE: usize = 16384; // 16kb

/// Height offset below the canonical tip under which blocks are pruned
/// when the node is not running in archival mode.
pub const PRUNE_BLOCKS_HEIGHT_OFFSET: u64 = 20;

/// Interval, in milliseconds, between runs of the block prune job when
/// the node is not running in archival mode.
pub const PRUNE_BLOCKS_INTERVAL: u64 = 5000;

static_assertions::const_assert_eq!(crate::MAX_TX_SET_SIZE % crate::MAX_PIECE_SIZE, 0);
static_assertions::const_assert_eq!(crate::MAX_PIECE_SIZE % crate::MAX_SUB_PIECE_SIZE, 0);
1 change: 0 additions & 1 deletion src/network/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,6 @@ async fn account_bytes_write<N: NetworkInterface>(
addr: &SocketAddr,
bytes_write: usize,
) {
let bytes_write = bytes_write;
let acc = {
if let Some(peer) = network.peers().get(addr) {
peer.bytes_write.clone()
Expand Down
2 changes: 1 addition & 1 deletion src/network/src/downloader/download_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub enum DownloadState {
/// The download is queued
Queued,

/// We have are currently downloading this object
/// We are currently downloading this object
Downloading,

/// The download has been downloaded
Expand Down
17 changes: 17 additions & 0 deletions src/network/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use crate::interface::NetworkInterface;
use crate::network::Network;
use constants::*;
use std::sync::atomic::Ordering;
use std::time::Duration;

Expand All @@ -32,6 +33,14 @@ pub async fn start_periodic_jobs(network: Network) {
}
}

/// Periodically runs the chain prune job. Loops forever; intended to be
/// polled alongside the other periodic jobs when the node is not in
/// archival mode.
pub async fn start_chain_prune_job(network: Network) {
    loop {
        debug!("Executing chain prune blocks job...");
        // Spawn so a slow prune cannot delay scheduling of the next tick.
        tokio::spawn(prune_chain_blocks(network.clone()));
        // PRUNE_BLOCKS_INTERVAL is in milliseconds (chain_constants.rs).
        tokio::time::delay_for(Duration::from_millis(PRUNE_BLOCKS_INTERVAL)).await;
    }
}

/// Traverses each peer and sets the amount of bytes read and wrote
/// for the last second
async fn account_bytes_read_write_for_peers(network: Network) {
Expand All @@ -54,3 +63,11 @@ async fn account_bytes_read_write_for_peers(network: Network) {
});
}
}

/// Prunes blocks past the retention window from the PoW chain.
///
/// The caller (`start_chain_prune_job`) already spawns this future as
/// its own task, so the previous inner `tokio::spawn` was redundant —
/// it created a task whose only job was to create another task.
async fn prune_chain_blocks(network: Network) {
    let chain = network.pow_chain_ref();

    // NOTE(review): `remove_blocks` takes a write lock and performs
    // database work; consider `spawn_blocking` if this ever stalls the
    // executor.
    chain.remove_blocks(PRUNE_BLOCKS_HEIGHT_OFFSET);
}
46 changes: 32 additions & 14 deletions src/purple/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,20 +277,38 @@ fn main() {
}
}

tokio::join!(
// Start bootstrap process
bootstrap(
network.clone(),
accept_connections,
node_storage.clone(),
argv.max_peers,
argv.bootnodes.clone(),
argv.port,
true,
),
// Start periodic jobs
network::jobs::start_periodic_jobs(network.clone()),
);
if (argv.archival_mode) {
tokio::join!(
// Start bootstrap process
bootstrap(
network.clone(),
accept_connections,
node_storage.clone(),
argv.max_peers,
argv.bootnodes.clone(),
argv.port,
true,
),
// Start periodic jobs
network::jobs::start_periodic_jobs(network.clone()),
);
} else {
tokio::join!(
// Start bootstrap process
bootstrap(
network.clone(),
accept_connections,
node_storage.clone(),
argv.max_peers,
argv.bootnodes.clone(),
argv.port,
true,
),
// Start periodic jobs
network::jobs::start_periodic_jobs(network.clone()),
network::jobs::start_chain_prune_job(network.clone()),
);
}
});
}

Expand Down