Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .ethexe.example.local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,26 @@ block-time = 1
# (optional, default: false).
# no-rpc = false

# Flag to enable RocksDB snapshot download RPC API.
# (optional, default: false).
# snapshot = false

# Bearer token used by `snapshot_download`.
# Must be provided when `snapshot = true`.
# snapshot-token = "replace-with-strong-random-token"

# Snapshot stream chunk size in bytes.
# (optional, default: 1048576).
# snapshot-chunk-bytes = 1048576

# Snapshot artifact retention in seconds.
# (optional, default: 600).
# snapshot-retention-secs = 600

# Maximum concurrent snapshot downloads.
# (optional, default: 1).
# snapshot-max-concurrent = 1

##########################################################################################

### Prometheus (metrics) service parameters.
Expand Down
20 changes: 20 additions & 0 deletions .ethexe.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,26 @@
# (optional, default: false).
# no-rpc = false

# Flag to enable RocksDB snapshot download RPC API.
# (optional, default: false).
# snapshot = false

# Bearer token used by `snapshot_download`.
# Must be provided when `snapshot = true`.
# snapshot-token = "replace-with-strong-random-token"

# Snapshot stream chunk size in bytes.
# (optional, default: 1048576).
# snapshot-chunk-bytes = 1048576

# Snapshot artifact retention in seconds.
# (optional, default: 600).
# snapshot-retention-secs = 600

# Maximum concurrent snapshot downloads.
# (optional, default: 1).
# snapshot-max-concurrent = 1

##########################################################################################

### Prometheus (metrics) service parameters.
Expand Down
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ethexe/cli/src/params/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ impl Params {
.transpose()
})
.transpose()?;
let rpc = rpc.and_then(|p| p.into_config(&node));
let rpc = match rpc {
Some(params) => params.into_config(&node)?,
None => None,
};
let prometheus = prometheus.and_then(|p| p.into_config());
Ok(Config {
node,
Expand Down
148 changes: 140 additions & 8 deletions ethexe/cli/src/params/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use super::MergeParams;
use anyhow::{Result, anyhow};
use clap::Parser;
use ethexe_rpc::{DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER, RpcConfig};
use ethexe_rpc::{DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER, RpcConfig, SnapshotRpcConfig};
use ethexe_service::config::NodeConfig;
use serde::Deserialize;
use std::{
Expand Down Expand Up @@ -52,16 +53,41 @@ pub struct RpcParams {

#[arg(long)]
pub gas_limit_multiplier: Option<u64>,

/// Flag to enable snapshot download RPC API.
#[arg(long)]
#[serde(default)]
pub snapshot: bool,

/// Bearer token for snapshot download RPC authorization.
#[arg(long)]
#[serde(rename = "snapshot-token")]
pub snapshot_token: Option<String>,

/// Snapshot chunk size in bytes.
#[arg(long)]
#[serde(rename = "snapshot-chunk-bytes")]
pub snapshot_chunk_bytes: Option<usize>,

/// Snapshot retention period in seconds.
#[arg(long)]
#[serde(rename = "snapshot-retention-secs")]
pub snapshot_retention_secs: Option<u64>,

/// Max amount of concurrent snapshot downloads.
#[arg(long)]
#[serde(rename = "snapshot-max-concurrent")]
pub snapshot_max_concurrent: Option<u32>,
}

impl RpcParams {
/// Default RPC port.
pub const DEFAULT_RPC_PORT: u16 = 9944;

/// Convert self into a proper `RpcConfig` object, if RPC service is enabled.
pub fn into_config(self, node_config: &NodeConfig) -> Option<RpcConfig> {
pub fn into_config(self, node_config: &NodeConfig) -> Result<Option<RpcConfig>> {
if self.no_rpc {
return None;
return Ok(None);
}

let ipv4_addr = if self.rpc_external {
Expand Down Expand Up @@ -91,14 +117,49 @@ impl RpcParams {
.gas_limit_multiplier
.unwrap_or(DEFAULT_BLOCK_GAS_LIMIT_MULTIPLIER);

Some(RpcConfig {
let snapshot = if self.snapshot {
let auth_bearer_token = self.snapshot_token.ok_or_else(|| {
anyhow!("`snapshot-token` must be provided when `snapshot` rpc is enabled")
})?;
if auth_bearer_token.is_empty() {
return Err(anyhow!(
"`snapshot-token` must be non-empty when `snapshot` rpc is enabled"
));
}
Some(SnapshotRpcConfig {
auth_bearer_token,
chunk_size_bytes: self
.snapshot_chunk_bytes
.unwrap_or(SnapshotRpcConfig::DEFAULT_CHUNK_SIZE_BYTES)
.max(1),
retention_secs: self
.snapshot_retention_secs
.unwrap_or(SnapshotRpcConfig::DEFAULT_RETENTION_SECS),
max_concurrent_downloads: self
.snapshot_max_concurrent
.unwrap_or(SnapshotRpcConfig::DEFAULT_MAX_CONCURRENT_DOWNLOADS)
.max(1),
})
} else {
None
};

let gas_allowance = gas_limit_multiplier
.checked_mul(node_config.block_gas_limit)
.ok_or_else(|| {
anyhow!(
"rpc gas allowance overflow: gas_limit_multiplier={gas_limit_multiplier}, block_gas_limit={}",
node_config.block_gas_limit
)
})?;

Ok(Some(RpcConfig {
listen_addr,
cors,
gas_allowance: gas_limit_multiplier
.checked_mul(node_config.block_gas_limit)
.expect("RPC gas allowance overflow"),
gas_allowance,
chunk_size: node_config.chunk_processing_threads,
})
snapshot,
}))
}
}

Expand All @@ -110,6 +171,15 @@ impl MergeParams for RpcParams {
rpc_cors: self.rpc_cors.or(with.rpc_cors),
no_rpc: self.no_rpc || with.no_rpc,
gas_limit_multiplier: self.gas_limit_multiplier.or(with.gas_limit_multiplier),
snapshot: self.snapshot || with.snapshot,
snapshot_token: self.snapshot_token.or(with.snapshot_token),
snapshot_chunk_bytes: self.snapshot_chunk_bytes.or(with.snapshot_chunk_bytes),
snapshot_retention_secs: self
.snapshot_retention_secs
.or(with.snapshot_retention_secs),
snapshot_max_concurrent: self
.snapshot_max_concurrent
.or(with.snapshot_max_concurrent),
}
}
}
Expand Down Expand Up @@ -180,3 +250,65 @@ impl<'de> Deserialize<'de> for Cors {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use ethexe_service::config::ConfigPublicKey;
use tempfile::tempdir;

fn node_config(block_gas_limit: u64) -> NodeConfig {
let database_dir = tempdir().expect("temporary directory should be created");
let key_dir = tempdir().expect("temporary directory should be created");

NodeConfig {
database_path: database_dir.path().to_path_buf(),
key_path: key_dir.path().to_path_buf(),
validator: ConfigPublicKey::Disabled,
validator_session: ConfigPublicKey::Disabled,
eth_max_sync_depth: 0,
worker_threads: None,
blocking_threads: None,
chunk_processing_threads: 2,
block_gas_limit,
canonical_quarantine: 0,
dev: false,
pre_funded_accounts: 0,
fast_sync: false,
chain_deepness_threshold: 0,
}
}

#[test]
fn rejects_empty_snapshot_token() {
let params = RpcParams {
snapshot: true,
snapshot_token: Some(String::new()),
..Default::default()
};

let err = params
.into_config(&node_config(1))
.expect_err("empty snapshot token should be rejected");
assert!(
err.to_string().contains("must be non-empty"),
"unexpected error: {err:#}"
);
}

#[test]
fn rejects_gas_allowance_overflow() {
let params = RpcParams {
gas_limit_multiplier: Some(u64::MAX),
..Default::default()
};

let err = params
.into_config(&node_config(2))
.expect_err("gas allowance overflow should be rejected");
assert!(
err.to_string().contains("rpc gas allowance overflow"),
"unexpected error: {err:#}"
);
}
}
37 changes: 34 additions & 3 deletions ethexe/db/src/rocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{CASDatabase, KVDatabase};
use anyhow::Result;
use anyhow::{Context as _, Result};
use gprimitives::H256;
use rocksdb::{DB, DBIteratorWithThreadMode, Options};
use std::{path::PathBuf, sync::Arc};
use rocksdb::{DB, DBIteratorWithThreadMode, Options, checkpoint::Checkpoint};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

/// Database for storing states and codes in memory.
#[derive(Debug, Clone)]
Expand All @@ -36,6 +39,15 @@ impl RocksDatabase {
inner: Arc::new(db),
})
}

/// Create a physical RocksDB checkpoint at the provided destination path.
pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<()> {
let checkpoint =
Checkpoint::new(self.inner.as_ref()).context("failed to create rocksdb checkpoint")?;
checkpoint
.create_checkpoint(path)
.context("failed to materialize rocksdb checkpoint")
}
}

impl CASDatabase for RocksDatabase {
Expand Down Expand Up @@ -220,4 +232,23 @@ mod tests {
tests::kv_multi_thread(db);
});
}

#[test]
fn create_checkpoint_and_reopen() {
with_database(|db| {
let key = b"key";
let value = b"value".to_vec();
db.put(key, value.clone());

let checkpoint_dir =
tempfile::tempdir().expect("Failed to create temporary directory for checkpoint");
let checkpoint_path = checkpoint_dir.path().join("checkpoint");
db.create_checkpoint(&checkpoint_path)
.expect("Failed to create RocksDB checkpoint");

let checkpoint_db = RocksDatabase::open(checkpoint_path)
.expect("Failed to open RocksDB checkpoint for read");
assert_eq!(checkpoint_db.get(key), Some(value));
});
}
}
Loading