From 308a41b60ceeede4177e99e305e026aca6bce4f7 Mon Sep 17 00:00:00 2001 From: lightsing Date: Wed, 7 Jan 2026 13:00:04 +0800 Subject: [PATCH 1/3] fix atomic --- src/db.rs | 110 +++++++++++++++++++++++++++------------------- src/prover/mod.rs | 14 +++--- 2 files changed, 70 insertions(+), 54 deletions(-) diff --git a/src/db.rs b/src/db.rs index 019b921..0a29ce1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,5 +1,6 @@ use crate::coordinator_handler::GetTaskResponse; -use rocksdb::DB; +use rocksdb::{DB, WriteBatch}; +use std::path::Path; use tracing::Level; pub struct Db { @@ -7,79 +8,98 @@ pub struct Db { } impl Db { - pub fn new(path: &str) -> eyre::Result { + pub fn new(path: impl AsRef) -> eyre::Result { let db = DB::open_default(path)?; Ok(Self { db }) } #[instrument(skip(self), level = Level::DEBUG)] - pub fn get_task(&self, public_key: String) -> (Option, Option) { - ( - self.get_coordinator_task_by_public_key(public_key.clone()), - self.get_proving_task_id_by_public_key(public_key), - ) + pub fn get_task(&self, public_key: &str) -> Option<(GetTaskResponse, String)> { + self.get_task_inner(public_key) + .inspect_err(|e| { + tracing::error!("Failed to get task for public_key {public_key}: {e:?}"); + }) + .unwrap_or_default() } - #[instrument(skip_all, fields(public_key = %public_key), level = Level::DEBUG)] - pub fn set_task( - &self, - public_key: String, - coordinator_task: &GetTaskResponse, - proving_task_id: String, - ) { - self.set_coordinator_task_by_public_key(public_key.clone(), coordinator_task); - self.set_proving_task_id_by_public_key(public_key, proving_task_id); - } + fn get_task_inner(&self, public_key: &str) -> eyre::Result> { + let coordinator_task_key = fmt_coordinator_task_key(public_key); + let proving_task_id_key = fmt_proving_task_id_key(public_key); - pub fn delete_task(&self, public_key: String) { - self.delete_coordinator_task_by_public_key(public_key.clone()); - self.delete_proving_task_id_by_public_key(public_key); - } + let mut results = self + .db + .multi_get([coordinator_task_key, proving_task_id_key]) + .into_iter(); - fn get_coordinator_task_by_public_key(&self, public_key: String) -> Option { - self.db - .get(fmt_coordinator_task_key(public_key)) - .ok()? - .as_ref() - .and_then(|v| serde_json::from_slice(v).ok()) - } + let Some(coordinator_task_bytes) = results.next().unwrap()? else { + return Ok(None); + }; + let Some(proving_task_id_bytes) = results.next().unwrap()? else { + return Ok(None); + }; + + let coordinator_task: GetTaskResponse = serde_json::from_slice(&coordinator_task_bytes)?; + let proving_task_id = String::from_utf8(proving_task_id_bytes)?; - fn get_proving_task_id_by_public_key(&self, public_key: String) -> Option { - self.db - .get(fmt_proving_task_id_key(public_key)) - .ok()? - .and_then(|v| String::from_utf8(v).ok()) + Ok(Some((coordinator_task, proving_task_id))) } - fn set_coordinator_task_by_public_key( + #[instrument(skip_all, fields(public_key = %public_key), level = Level::DEBUG)] + pub fn set_task( &self, - public_key: String, + public_key: &str, coordinator_task: &GetTaskResponse, + proving_task_id: &str, ) { - let _ = serde_json::to_vec(coordinator_task) - .map(|bytes| self.db.put(fmt_coordinator_task_key(public_key), bytes)); + if let Err(e) = self.set_task_inner(public_key, coordinator_task, proving_task_id) { + tracing::error!("Failed to set task for public_key {public_key}: {e:?}"); + } } - fn set_proving_task_id_by_public_key(&self, public_key: String, proving_task_id: String) { - let _ = self.db.put( + fn set_task_inner( + &self, + public_key: &str, + coordinator_task: &GetTaskResponse, + proving_task_id: &str, + ) -> eyre::Result<()> { + let task = serde_json::to_vec(coordinator_task)?; + + let mut write_batch = WriteBatch::default(); + // set_coordinator_task_by_public_key + write_batch.put(fmt_coordinator_task_key(public_key), task); + // set_proving_task_id_by_public_key + write_batch.put( fmt_proving_task_id_key(public_key), proving_task_id.as_bytes(), ); + self.db.write(write_batch)?; + + Ok(()) } - fn delete_coordinator_task_by_public_key(&self, public_key: String) { - let _ = self.db.delete(fmt_coordinator_task_key(public_key)); + pub fn delete_task(&self, public_key: &str) { + if let Err(e) = self.delete_task_inner(public_key) { + tracing::error!("Failed to delete task for public_key {public_key}: {e:?}"); + } } - fn delete_proving_task_id_by_public_key(&self, public_key: String) { - let _ = self.db.delete(fmt_proving_task_id_key(public_key)); + fn delete_task_inner(&self, public_key: &str) -> eyre::Result<()> { + let mut write_batch = WriteBatch::default(); + + // delete_coordinator_task_by_public_key + write_batch.delete(fmt_coordinator_task_key(public_key)); + // delete_proving_task_id_by_public_key + write_batch.delete(fmt_proving_task_id_key(public_key)); + + self.db.write(write_batch)?; + Ok(()) } } -fn fmt_coordinator_task_key(public_key: String) -> String { +pub fn fmt_coordinator_task_key(public_key: &str) -> String { format!("last_coordinator_task_{}", public_key) } -fn fmt_proving_task_id_key(public_key: String) -> String { +pub fn fmt_proving_task_id_key(public_key: &str) -> String { format!("last_proving_task_id_{}", public_key) } diff --git a/src/prover/mod.rs b/src/prover/mod.rs index 87ea54a..62c6728 100644 --- a/src/prover/mod.rs +++ b/src/prover/mod.rs @@ -152,10 +152,10 @@ where coordinator_client: &CoordinatorClient, task_spec: Option<(ProofType, &str)>, ) -> eyre::Result<()> { - if let (Some(coordinator_task), Some(mut proving_task_id)) = self + if let Some((coordinator_task, mut proving_task_id)) = self .db .as_ref() - .map(|db| db.get_task(coordinator_client.key_signer.get_public_key())) + .map(|db| db.get_task(&coordinator_client.key_signer.get_public_key())) .unwrap_or_default() { let task_id = coordinator_task.clone().task_id; @@ -291,11 +291,7 @@ where } last_status.replace(current_status); if let Some(db) = &self.db { - db.set_task( - public_key.clone(), - coordinator_task, - proving_service_task_id.clone(), - ); + db.set_task(&public_key, coordinator_task, &proving_service_task_id); } sleep(self.poll_delay()).await; } @@ -317,7 +313,7 @@ where ) .await?; if let Some(db) = &self.db { - db.delete_task(public_key.clone()); + db.delete_task(&public_key); } break; } @@ -341,7 +337,7 @@ where ) .await?; if let Some(db) = &self.db { - db.delete_task(public_key.clone()); + db.delete_task(&public_key); } break; } From 93ed45e6ed0e3ae3be5baec51a03a19364c5e82c Mon Sep 17 00:00:00 2001 From: lightsing Date: Wed, 7 Jan 2026 13:54:52 +0800 Subject: [PATCH 2/3] add tool --- Cargo.lock | 89 ++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 12 ++++++ src/bin/pover_db_tool.rs | 81 ++++++++++++++++++++++++++++++++++++ src/db.rs | 17 +++++++- src/utils.rs | 5 +++ 5 files changed, 202 insertions(+), 2 deletions(-) create mode 100644 src/bin/pover_db_tool.rs diff --git a/Cargo.lock b/Cargo.lock index feb4996..c285048 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -194,6 +203,21 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-link", +] + [[package]] name = "base16ct" version = "0.2.0" @@ -387,6 +411,33 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +[[package]] +name = "color-eyre" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5920befb47832a6d61ee3a3a846565cfa39b331331e68a3b1d1116630f2f26d" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8b88ea9df13354b55bc7234ebcce36e6ef896aca2e42a15de9e10edce01b427" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -910,6 +961,12 @@ dependencies = [ "wasip2", ] +[[package]] +name = "gimli" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" + [[package]] name = "glob" version = "0.3.3" @@ -1557,6 +1614,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1638,6 +1704,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owo-colors" +version = "4.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c6901729fa79e91a0913333229e9ca5dc725089d1c363b2f4b4760709dc4a52" + [[package]] name = "parity-scale-codec" version = "3.7.5" @@ -2048,6 +2120,12 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "rustc-demangle" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -2165,6 +2243,7 @@ dependencies = [ "async-trait", "axum", "clap", + "color-eyre", "dotenvy", "ethers-core", "eyre", @@ -2753,6 +2832,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-error" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b1581020d7a273442f5b45074a6a57d5757ad0a47dac0e9f0bd57b81936f3db" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-log" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 1fef419..297d655 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,8 +3,13 @@ name = "scroll-proving-sdk" version = "0.2.0" edition = "2024" +[[bin]] +name = "scroll-pover-db-tool" +path = "src/bin/pover_db_tool.rs" + [dependencies] eyre = "0.6" +color-eyre = "0.6" serde = { version = "1", features = ["derive"] } serde_json = "1.0" ethers-core = { git = "https://github.com/scroll-tech/ethers-rs.git", branch = "v2.0.7" } @@ -27,3 +32,10 @@ dotenvy = "0.15" rocksdb = "0.24" strum = { version = "0.27", features = ["derive"] } serde_repr = "0.1" + +[profile.release] +debug = 1 + +# ref: https://docs.rs/color-eyre/0.6.5/color_eyre/#improving-perf-on-debug-builds +[profile.dev.package.backtrace] +opt-level = 3 diff --git a/src/bin/pover_db_tool.rs b/src/bin/pover_db_tool.rs new file mode 100644 index 0000000..dda082d --- /dev/null +++ b/src/bin/pover_db_tool.rs @@ -0,0 +1,81 @@ +use clap::{Parser, Subcommand}; +use eyre::WrapErr; +use scroll_proving_sdk::db::{Db, PROVING_TASK_ID_KEY_PREFIX}; +use scroll_proving_sdk::utils::init_color_eyre_hook; +use std::collections::HashMap; +use std::path::PathBuf; + +/// Tool to interact with the prover database. +#[derive(Parser)] +struct Cli { + /// Path to the database + #[clap(long, default_value = ".work/db")] + db: PathBuf, + + #[command(subcommand)] + commands: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// List all coordinator and prover task ID pairs + List, + /// Get the corresponding task ID + Get { + #[command(subcommand)] + commands: GetCommands, + }, +} + +#[derive(Subcommand)] +enum GetCommands { + /// Get coordinator task ID by prover task ID + Coordinator { task_id: String }, + /// Get prover task ID by coordinator task ID + Prover { task_id: String }, +} + +fn main() -> eyre::Result<()> { + init_color_eyre_hook(); + + let cli = Cli::parse(); + let db = Db::new(&cli.db).context("open database")?; + + let mut c2p = HashMap::new(); + let mut p2c = HashMap::new(); + + for result in db.inner().prefix_iterator(PROVING_TASK_ID_KEY_PREFIX) { + let (proving_task_key_bytes, _) = result.context("iter entry")?; + let public_key = + std::str::from_utf8(&proving_task_key_bytes[PROVING_TASK_ID_KEY_PREFIX.len()..])?; + + let Some((coordinator_task, prover_task_id)) = db.get_task(public_key) else { + continue; + }; + + c2p.insert(coordinator_task.task_id.clone(), prover_task_id.clone()); + p2c.insert(prover_task_id, coordinator_task.task_id); + } + + match cli.commands { + Commands::List => { + for (c_task_id, p_task_id) in c2p.iter() { + println!("{}\t{}", c_task_id, p_task_id); + } + } + Commands::Get { commands } => match commands { + GetCommands::Coordinator { task_id } => { + if let Some(coordinator_task_id) = p2c.get(&task_id) { + println!("{coordinator_task_id}"); + } + } + GetCommands::Prover { task_id } => { + if let Some(prover_task_id) = c2p.get(&task_id) { + println!("{prover_task_id}"); + } + } + }, + } + + Ok(()) +} diff --git a/src/db.rs b/src/db.rs index 0a29ce1..7af821e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -94,12 +94,25 @@ impl Db { self.db.write(write_batch)?; Ok(()) } + + /// Expose the inner RocksDB instance for advanced operations. + pub fn inner(&self) -> &DB { + &self.db + } } +/// Format keys for storing and retrieving tasks in the database. +pub static COORDINATOR_TASK_KEY_PREFIX: &str = "last_coordinator_task_"; + +/// Format keys for storing and retrieving proving task IDs in the database. +pub static PROVING_TASK_ID_KEY_PREFIX: &str = "last_proving_task_id_"; + +#[inline] pub fn fmt_coordinator_task_key(public_key: &str) -> String { - format!("last_coordinator_task_{}", public_key) + format!("{COORDINATOR_TASK_KEY_PREFIX}{public_key}") } +#[inline] pub fn fmt_proving_task_id_key(public_key: &str) -> String { - format!("last_proving_task_id_{}", public_key) + format!("{PROVING_TASK_ID_KEY_PREFIX}{public_key}") } diff --git a/src/utils.rs b/src/utils.rs index defd998..fecbcf5 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -9,6 +9,11 @@ pub static VERSION: &str = concat!( env!("ZK_VERSION", "`zkvm-prover` version and commit is required"), ); +/// Initialize the color_eyre error reporting hook. +pub fn init_color_eyre_hook() { + color_eyre::install().expect("Failed to initialize color_eyre"); +} + pub fn init_tracing() { tracing_subscriber::fmt() .with_env_filter( From e75bae2913fd692e6fb9d249957f9efb8f737643 Mon Sep 17 00:00:00 2001 From: lightsing Date: Wed, 7 Jan 2026 14:11:22 +0800 Subject: [PATCH 3/3] too large --- Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 297d655..040a43b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,9 +33,6 @@ rocksdb = "0.24" strum = { version = "0.27", features = ["derive"] } serde_repr = "0.1" -[profile.release] -debug = 1 - # ref: https://docs.rs/color-eyre/0.6.5/color_eyre/#improving-perf-on-debug-builds [profile.dev.package.backtrace] opt-level = 3