diff --git a/Cargo.lock b/Cargo.lock index 839069e899..c85de32d24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -908,6 +908,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "error-chain" version = "0.12.4" @@ -2318,6 +2328,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "simdutf8" version = "0.1.4" @@ -2587,9 +2607,12 @@ version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ + "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index b613c8d4df..357279f153 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ static_assertions = { version = "1.1.0" } structopt = { version = "0.3.26" } thiserror = { version = "1.0.33" } tiny-keccak = { version = "2.0.2" } -tokio = { version = "1.48.0" } +tokio = { version = "1.48.0", features = ["full"] } tower-http = { version = "0.6.8" } tracing = { version = "0.1.44" } tracing-subscriber = { version = "0.3.22" } diff --git a/crates/validator/src/config.rs b/crates/validator/src/config.rs index 6a5b15d5a7..cf1a1ec6b8 100644 --- a/crates/validator/src/config.rs +++ b/crates/validator/src/config.rs @@ -4,10 +4,15 @@ use arbutil::Bytes32; use clap::{Args, Parser, ValueEnum}; use serde::{Deserialize, Serialize}; -use std::fs::read_to_string; use std::net::SocketAddr; use std::path::PathBuf; +#[derive(Clone, Debug, ValueEnum)] +pub enum InputMode { + Native, + Continuous, +} + #[derive(Clone, Debug, Parser)] pub struct ServerConfig { /// Socket address where the server should be run. @@ -18,42 +23,23 @@ pub struct ServerConfig { #[clap(long, value_enum, default_value_t = LoggingFormat::Text)] pub logging_format: LoggingFormat, + #[clap(long, value_enum, default_value_t = InputMode::Native)] + pub mode: InputMode, + #[clap(flatten)] - module_root_config: ModuleRootConfig, + pub module_root_config: ModuleRootConfig, } #[derive(Clone, Debug, Args)] #[group(required = true, multiple = false)] -struct ModuleRootConfig { +pub struct ModuleRootConfig { /// Supported module root. #[clap(long)] - module_root: Option, + pub module_root: Option, /// Path to the file containing the module root. #[clap(long)] - module_root_path: Option, -} - -impl ServerConfig { - pub fn get_module_root(&self) -> anyhow::Result { - match ( - self.module_root_config.module_root, - &self.module_root_config.module_root_path, - ) { - (Some(root), None) => Ok(root), - (None, Some(ref path)) => { - let content = read_to_string(path)?; - let root = content - .trim() - .parse::() - .map_err(|e| anyhow::anyhow!(e))?; - Ok(root) - } - _ => Err(anyhow::anyhow!( - "Either module_root or module_root_path must be specified" - )), - } - } + pub module_root_path: Option, } #[derive(Copy, Clone, Eq, PartialEq, Debug, Default, ValueEnum, Deserialize, Serialize)] diff --git a/crates/validator/src/endpoints/mod.rs b/crates/validator/src/endpoints/mod.rs new file mode 100644 index 0000000000..81c3f42ed5 --- /dev/null +++ b/crates/validator/src/endpoints/mod.rs @@ -0,0 +1,2 @@ +pub mod spawner_endpoints; +pub mod validate; diff --git a/crates/validator/src/spawner_endpoints.rs b/crates/validator/src/endpoints/spawner_endpoints.rs similarity index 52% rename from crates/validator/src/spawner_endpoints.rs rename to crates/validator/src/endpoints/spawner_endpoints.rs index 20ecc67521..e4c6ff617b 100644 --- a/crates/validator/src/spawner_endpoints.rs +++ b/crates/validator/src/endpoints/spawner_endpoints.rs @@ -7,14 +7,15 @@ //! package. Their serialization is configured to match the Go side (by using `PascalCase` for //! field names). +use crate::endpoints::validate::{validate_contiguous, validate_native, ValidationRequest}; use crate::ServerState; -use arbutil::{Bytes32, PreimageType}; +use arbutil::Bytes32; use axum::extract::State; use axum::response::IntoResponse; use axum::Json; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::sync::Arc; +use tracing::info; pub async fn capacity() -> impl IntoResponse { "1" // TODO: Figure out max number of workers (optionally, make it configurable) @@ -35,65 +36,26 @@ pub async fn stylus_archs() -> &'static str { "host" } -pub async fn validate(Json(request): Json) -> Result, String> { - let delayed_inbox = match request.has_delayed_msg { - true => vec![jit::SequencerMessage { - number: request.delayed_msg_number, - data: request.delayed_msg, - }], - false => vec![], - }; - - let opts = jit::Opts { - validator: jit::ValidatorOpts { - binary: Default::default(), - cranelift: true, // The default for JIT binary, no need for LLVM right now - debug: false, // JIT's debug messages are using printlns, which would clutter the server logs - require_success: false, // Relevant for JIT binary only. - }, - input_mode: jit::InputMode::Native(jit::NativeInput { - old_state: request.start_state.into(), - inbox: request.batch_info.into_iter().map(Into::into).collect(), - delayed_inbox, - preimages: request.preimages, - programs: request.user_wasms[stylus_archs().await].clone(), - }), - }; - - let result = jit::run(&opts).map_err(|error| format!("{error}"))?; - if let Some(err) = result.error { - Err(format!("{err}")) - } else { - Ok(Json(GlobalState::from(result.new_state))) +pub async fn validate( + State(state): State>, + Json(request): Json, +) -> Result, String> { + match state.mode { + crate::config::InputMode::Native => validate_native(request, &state.locator).await, + crate::config::InputMode::Continuous => validate_contiguous(request, &state.locator).await, } } pub async fn wasm_module_roots(State(state): State>) -> impl IntoResponse { - format!("[{:?}]", state.module_root) -} - -/// Counterpart for Go struct `validator.ValidationInput`. -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename_all = "PascalCase")] -pub struct ValidationRequest { - id: u64, - has_delayed_msg: bool, - #[serde(rename = "DelayedMsgNr")] - delayed_msg_number: u64, - preimages: HashMap>>, - user_wasms: HashMap>>, - batch_info: Vec, - delayed_msg: Vec, - start_state: GlobalState, - debug_chain: bool, + format!("{:?}", state.locator.module_roots()) } /// Counterpart for Go struct `validator.BatchInfo`. #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "PascalCase")] pub struct BatchInfo { - number: u64, - data: Vec, + pub number: u64, + pub data: Vec, } impl From for jit::SequencerMessage { @@ -106,13 +68,13 @@ impl From for jit::SequencerMessage { } /// Counterpart for Go struct `validator.GoGlobalState`. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, Default)] #[serde(rename_all = "PascalCase")] pub struct GlobalState { - block_hash: Bytes32, - send_root: Bytes32, - batch: u64, - pos_in_batch: u64, + pub block_hash: Bytes32, + pub send_root: Bytes32, + pub batch: u64, + pub pos_in_batch: u64, } impl From for jit::GlobalState { diff --git a/crates/validator/src/endpoints/validate.rs b/crates/validator/src/endpoints/validate.rs new file mode 100644 index 0000000000..2575f71ae8 --- /dev/null +++ b/crates/validator/src/endpoints/validate.rs @@ -0,0 +1,96 @@ +use std::collections::HashMap; + +use arbutil::{Bytes32, PreimageType}; +use axum::Json; +use serde::{Deserialize, Serialize}; + +use crate::{ + endpoints::spawner_endpoints::{stylus_archs, BatchInfo, GlobalState}, + server_jit::{ + config::JitMachineConfig, jit_machine::JitMachine, machine_locator::MachineLocator, + }, +}; + +/// Counterpart for Go struct `validator.ValidationInput`. +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct ValidationRequest { + id: u64, + pub has_delayed_msg: bool, + #[serde(rename = "DelayedMsgNr")] + pub delayed_msg_number: u64, + pub preimages: HashMap>>, + pub user_wasms: HashMap>>, + pub batch_info: Vec, + pub delayed_msg: Vec, + pub start_state: GlobalState, + pub module_root: Bytes32, + debug_chain: bool, +} + +pub async fn validate_native( + request: ValidationRequest, + locator: &MachineLocator, +) -> Result, String> { + let delayed_inbox = match request.has_delayed_msg { + true => vec![jit::SequencerMessage { + number: request.delayed_msg_number, + data: request.delayed_msg, + }], + false => vec![], + }; + let config = JitMachineConfig::default(); + + let opts = jit::Opts { + validator: jit::ValidatorOpts { + binary: locator + .get_machine_path(request.module_root) + .join(&config.prover_bin_path), + cranelift: true, // The default for JIT binary, no need for LLVM right now + debug: false, // JIT's debug messages are using printlns, which would clutter the server logs + require_success: false, // Relevant for JIT binary only. + }, + input_mode: jit::InputMode::Native(jit::NativeInput { + old_state: request.start_state.into(), + inbox: request.batch_info.into_iter().map(Into::into).collect(), + delayed_inbox, + preimages: request.preimages, + programs: request.user_wasms[stylus_archs().await].clone(), + }), + }; + + let result = jit::run(&opts).map_err(|error| format!("{error:?}"))?; + if let Some(err) = result.error { + Err(format!("{err}")) + } else { + Ok(Json(GlobalState::from(result.new_state))) + } +} + +pub async fn validate_contiguous( + request: ValidationRequest, + locator: &MachineLocator, +) -> Result, String> { + let config = JitMachineConfig::default(); + let module_root = if request.module_root == Bytes32::default() { + None + } else { + Some(request.module_root) + }; + + let mut jit_machine = + JitMachine::new(&config, locator, module_root).map_err(|error| format!("{error:?}"))?; + + let new_state = jit_machine + .feed_machine(&request) + .await + .map_err(|error| format!("{error:?}"))?; + + // Make sure JIT validator binary is done + jit_machine + .complete_machine() + .await + .map_err(|error| format!("{error:?}"))?; + + Ok(Json(new_state)) +} diff --git a/crates/validator/src/main.rs b/crates/validator/src/main.rs index 2da80586b0..d70b8b6a1e 100644 --- a/crates/validator/src/main.rs +++ b/crates/validator/src/main.rs @@ -2,7 +2,6 @@ // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md use anyhow::Result; -use arbutil::Bytes32; use clap::Parser; use logging::init_logging; use router::create_router; @@ -10,14 +9,18 @@ use std::sync::Arc; use tokio::net::TcpListener; use tracing::info; +use crate::{config::InputMode, server_jit::machine_locator::MachineLocator}; + mod config; +mod endpoints; mod logging; mod router; -mod spawner_endpoints; +mod server_jit; #[derive(Clone, Debug)] pub struct ServerState { - module_root: Bytes32, + mode: InputMode, + locator: MachineLocator, } #[tokio::main] @@ -26,8 +29,11 @@ async fn main() -> Result<()> { init_logging(config.logging_format)?; info!("Starting validator server with config: {:#?}", config); + let locator = MachineLocator::new(&config.module_root_config)?; + let state = Arc::new(ServerState { - module_root: config.get_module_root()?, + mode: config.mode, + locator, }); let listener = TcpListener::bind(config.address).await?; diff --git a/crates/validator/src/router.rs b/crates/validator/src/router.rs index 4a451a283a..7ee659ac2d 100644 --- a/crates/validator/src/router.rs +++ b/crates/validator/src/router.rs @@ -1,7 +1,7 @@ // Copyright 2025-2026, Offchain Labs, Inc. // For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md -use crate::{spawner_endpoints, ServerState}; +use crate::{endpoints::spawner_endpoints, ServerState}; use axum::routing::{get, post}; use axum::Router; use std::sync::Arc; diff --git a/crates/validator/src/server_jit/config.rs b/crates/validator/src/server_jit/config.rs new file mode 100644 index 0000000000..28e233fc06 --- /dev/null +++ b/crates/validator/src/server_jit/config.rs @@ -0,0 +1,106 @@ +use std::{ + env, + path::{Path, PathBuf}, +}; + +use anyhow::{anyhow, Context, Result}; + +#[derive(Clone, Debug)] +pub struct JitMachineConfig { + pub prover_bin_path: String, + pub jit_cranelift: bool, + pub wasm_memory_usage_limit: u64, + pub jit_path: Option, +} + +impl Default for JitMachineConfig { + fn default() -> Self { + Self { + prover_bin_path: "replay.wasm".to_owned(), + jit_cranelift: true, + wasm_memory_usage_limit: 4_294_967_296, + jit_path: None, + } + } +} + +pub fn get_jit_path(config_path: &Option) -> Result { + // 1. If a custom path is provided, use it directly + if let Some(jit_path) = config_path { + let path = Path::new(&jit_path); + if path.exists() { + return Ok(path.to_path_buf()); + } + return Err(anyhow!( + "Custom JIT path provided but not found: {jit_path}", + )); + } + + // 2. Fall back to auto-detection + let current_exe = env::current_exe().context("failed to get path of current executable")?; + + let exe_string = current_exe.to_string_lossy(); + let is_test_env = exe_string.contains("test") + || exe_string.contains("deps") + || exe_string.contains("system_tests"); + + let candidate = if is_test_env { + // CARGO_MANIFEST_DIR points to crates/validator, therefore we need to look for the grandparent + let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR")); + if let Some(grandparent) = manifest_dir.parent().and_then(|p| p.parent()) { + grandparent.join("target").join("bin").join("jit") + } else { + return Err(anyhow!( + "Custom JIT path not found for test env: {manifest_dir:?}", + )); + } + } else { + current_exe + .parent() + .ok_or_else(|| anyhow!("failed to resolve parent directory of executable"))? + .join("jit") + }; + + if candidate.exists() { + return Ok(candidate); + } + + // 3. Fallback: Search system PATH + // We treat a missing PATH var as "just continue" rather than a hard error + if let Ok(path_var) = env::var("PATH") { + for split_path in env::split_paths(&path_var) { + let joined = split_path.join("jit"); + if joined.exists() { + return Ok(joined); + } + } + } + + Err(anyhow!( + "jit binary not found in local paths or system PATH" + )) +} + +#[cfg(test)] +mod tests { + use crate::server_jit::config::get_jit_path; + + #[test] + fn test_get_jit_path() { + let jit_path = get_jit_path(&None).unwrap(); + + assert!(jit_path.exists(), "JIT binary does not exist"); + assert!( + jit_path.is_file(), + "JIT path points to a directory, expected a file" + ); + + let path_str = jit_path.to_str().expect("path contains invalid utf-8"); + + assert!( + path_str.contains("nitro/target/bin/jit"), + "Path {:?} did not contain expected substring 'nitro/target/bin/jit'", + jit_path + ); + } +} diff --git a/crates/validator/src/server_jit/jit_machine.rs b/crates/validator/src/server_jit/jit_machine.rs new file mode 100644 index 0000000000..bcb0461a31 --- /dev/null +++ b/crates/validator/src/server_jit/jit_machine.rs @@ -0,0 +1,286 @@ +use anyhow::{anyhow, Context, Result}; +use arbutil::Bytes32; +use std::{collections::HashMap, env::consts, process::Stdio}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + process::{Child, ChildStdin, Command}, +}; +use tracing::{debug, error, info, warn}; + +use crate::{ + endpoints::{spawner_endpoints::GlobalState, validate::ValidationRequest}, + server_jit::{ + config::{get_jit_path, JitMachineConfig}, + machine_locator::MachineLocator, + }, +}; + +const SUCCESS_BYTE: u8 = 0x0; +const FAILURE_BYTE: u8 = 0x1; +const ANOTHER_BYTE: u8 = 0x3; +const READY_BYTE: u8 = 0x4; + +async fn write_exact(conn: &mut TcpStream, data: &[u8]) -> Result<()> { + conn.write_all(data).await.map_err(|e| anyhow!(e)) +} + +async fn write_u8(conn: &mut TcpStream, data: u8) -> Result<()> { + write_exact(conn, &[data]).await +} + +async fn write_u32(conn: &mut TcpStream, data: u32) -> Result<()> { + write_exact(conn, &data.to_be_bytes()).await +} + +async fn write_u64(conn: &mut TcpStream, data: u64) -> Result<()> { + write_exact(conn, &data.to_be_bytes()).await +} + +async fn write_bytes(conn: &mut TcpStream, data: &[u8]) -> Result<()> { + write_u64(conn, data.len() as u64).await?; + write_exact(conn, data).await +} + +async fn read_bytes32(reader: &mut R) -> Result<[u8; 32]> { + let mut buf = [0u8; 32]; + reader.read_exact(&mut buf).await?; + Ok(buf) +} + +async fn read_bytes_with_len(reader: &mut R) -> Result> { + let len = reader.read_u64().await?; // Tokio reads BigEndian by default + let mut buf = vec![0u8; len as usize]; + reader.read_exact(&mut buf).await?; + Ok(buf) +} + +const TARGET_ARM_64: &str = "arm64"; +const TARGET_AMD_64: &str = "amd64"; +const TARGET_HOST: &str = "host"; + +fn local_target() -> String { + if consts::OS == "linux" { + match consts::ARCH { + "aarch64" => return TARGET_ARM_64.to_owned(), + "x86_64" => return TARGET_AMD_64.to_owned(), + arch => { + debug!("Unsupported architecture {arch} detected. Using host as target arch"); + } + } + } + TARGET_HOST.to_owned() +} + +pub struct JitMachine { + pub process_stdin: Option, + pub process: Child, + pub wasm_memory_usage_limit: u64, // default: WasmMemoryUsageLimit: 4_294_967_296, +} + +impl JitMachine { + pub fn new( + config: &JitMachineConfig, + locator: &MachineLocator, + module_root: Option, + ) -> Result { + let jit_path = get_jit_path(&config.jit_path)?; + let mut cmd = Command::new(jit_path); + + let module_root = if let Some(module_root) = module_root { + module_root + } else { + locator.latest_wasm_module_root() + }; + + // info!("Joining prover_bin_path: {} to ") + let bin_path = locator + .get_machine_path(module_root) + .join(&config.prover_bin_path); + + if config.jit_cranelift { + cmd.arg("--cranelift"); + } + + cmd.arg("--binary") + .arg(bin_path) + .arg("continuous") + .stdin(Stdio::piped()) // We must pipe stdin so we can write to it. + .stdout(Stdio::inherit()) // Inherit stdout/stderr so logs show up in your main console. + .stderr(Stdio::inherit()); + + let mut child = cmd.spawn().context("failed to spawn jit binary")?; + + let stdin = child + .stdin + .take() + .ok_or_else(|| anyhow!("failed to open stdin to jit process"))?; + + Ok(Self { + process_stdin: Some(stdin), + process: child, + wasm_memory_usage_limit: config.wasm_memory_usage_limit, + }) + } + + pub async fn feed_machine(&mut self, request: &ValidationRequest) -> Result { + // 1. Create new TCP connection + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("failed to create TCP listener")?; + + let mut state = GlobalState::default(); + + let addr = listener.local_addr().context("failed to get local addr")?; + + // 2. Format the address string (Go: "%v\n") + let address_str = format!("{}\n", addr); + + // 3. Send TCP connection via stdin pipe + if let Some(stdin) = &mut self.process_stdin { + stdin + .write_all(address_str.as_bytes()) + .await + .context("failed to write address to jit stdin")?; + } else { + return Err(anyhow!("JIT machine stdin is not available")); + } + + // 4. Wait for the child to call us back + let (mut conn, _) = listener + .accept() + .await + .context("timeout waiting for jit to connect")?; + + // 5. Sending Global State + write_u64(&mut conn, request.start_state.batch).await?; + write_u64(&mut conn, request.start_state.pos_in_batch).await?; + write_exact(&mut conn, &request.start_state.block_hash.0).await?; + write_exact(&mut conn, &request.start_state.send_root.0).await?; + + // 6. Send batch info + for batch in request.batch_info.iter() { + write_u8(&mut conn, ANOTHER_BYTE).await?; + write_u64(&mut conn, batch.number).await?; + write_bytes(&mut conn, &batch.data).await?; + } + write_u8(&mut conn, SUCCESS_BYTE).await?; + + // 7. Send Delayed Inbox + if request.has_delayed_msg { + write_u8(&mut conn, ANOTHER_BYTE).await?; + write_u64(&mut conn, request.delayed_msg_number).await?; + write_bytes(&mut conn, &request.delayed_msg).await?; + } + write_u8(&mut conn, SUCCESS_BYTE).await?; + + // 8. Send Known Preimages + write_u32(&mut conn, request.preimages.len() as u32).await?; + + for (ty, preimages) in request.preimages.iter() { + write_u8(&mut conn, *ty as u8).await?; + write_u32(&mut conn, preimages.len() as u32).await?; + for (hash, preimage) in preimages { + write_exact(&mut conn, &hash.0).await?; + write_bytes(&mut conn, preimage).await?; + } + } + + // 9. Send User Wasms + let local_target = local_target(); + let user_wasms = request.user_wasms.get(&local_target); + + if user_wasms.map_or(true, |m| m.is_empty()) { + for (arch, wasms) in &request.user_wasms { + if !wasms.is_empty() { + return Err(anyhow!( + "bad stylus arch. got {}, expected {}", + arch, + local_target + )); + } + } + } + + let empty_map = HashMap::new(); + let user_wasms = user_wasms.unwrap_or(&empty_map); + write_u32(&mut conn, user_wasms.len() as u32).await?; + for (module_hash, program) in user_wasms { + write_exact(&mut conn, &module_hash.0).await?; + write_bytes(&mut conn, program).await?; + } + + // 10. Signal done sending state + write_u8(&mut conn, READY_BYTE).await?; + + // 11. Read Response and return new state + loop { + let mut kind_buf = [0u8; 1]; + conn.read_exact(&mut kind_buf).await?; + + match kind_buf[0] { + FAILURE_BYTE => { + let msg_bytes = read_bytes_with_len(&mut conn).await?; + let msg = String::from_utf8_lossy(&msg_bytes); + error!("Jit Machine Failure message: {msg}"); + return Err(anyhow!("Jit Machine Failure: {}", msg)); + } + SUCCESS_BYTE => { + // We write the values to socket in BigEndian so we can use + // read_u64() directly from AsyncReadExt which handles + // BigEndian by default in most net implementations. + state.batch = conn.read_u64().await?; + state.pos_in_batch = conn.read_u64().await?; + state.block_hash.0 = read_bytes32(&mut conn).await?; + state.send_root.0 = read_bytes32(&mut conn).await?; + + let memory_used = conn.read_u64().await?; + if memory_used > self.wasm_memory_usage_limit { + warn!( + "WARN: memory used {} exceeds limit {}", + memory_used, self.wasm_memory_usage_limit + ); + } + + return Ok(state); + } + _ => { + return Err(anyhow!("inter-process communication failure: unknown byte")); + } + } + } + } + + pub async fn complete_machine(&mut self) -> Result<()> { + // Close stdin. This sends EOF to the child process, signaling it to stop. + // We take the Option to ensure it's dropped and cannot be used again. + if let Some(stdin) = self.process_stdin.take() { + drop(stdin); + } + + let status = self + .process + .wait() + .await + .context("failed to wait for JIT process to exit")?; + + if status.success() { + tracing::debug!("JIT machine exited successfully"); + Ok(()) + } else { + // Determine if it was a code (error) or a signal (killed) + match status.code() { + Some(code) => { + let msg = format!("JIT machine exited with error code: {}", code); + error!("{}", msg); + Err(anyhow!(msg)) + } + None => { + let msg = "JIT machine terminated by signal"; + error!("{}", msg); + Err(anyhow!(msg)) + } + } + } + } +} diff --git a/crates/validator/src/server_jit/machine_locator.rs b/crates/validator/src/server_jit/machine_locator.rs new file mode 100644 index 0000000000..a40c410afb --- /dev/null +++ b/crates/validator/src/server_jit/machine_locator.rs @@ -0,0 +1,227 @@ +use anyhow::Result; +use arbutil::Bytes32; +use std::collections::HashSet; +use std::env; +use std::fs; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use tracing::debug; +use tracing::warn; + +use crate::config::ModuleRootConfig; + +#[derive(Debug, Clone)] +pub struct MachineLocator { + root_path: PathBuf, + latest: Bytes32, + module_roots: Vec, +} + +impl MachineLocator { + pub fn new(module_root_config: &ModuleRootConfig) -> Result { + let mut dirs = Vec::new(); + let mut module_roots_set = HashSet::new(); + let mut latest_module_root = Bytes32::default(); + let mut final_root_path = PathBuf::new(); + + // Use CARGO_MANIFEST_DIR to find the crate root. + let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR")); + + if let Some(module_root) = module_root_config.module_root { + module_roots_set.insert(module_root); + latest_module_root = module_root; + + final_root_path = + if let Some(grandparent) = manifest_dir.parent().and_then(|p| p.parent()) { + grandparent.join("target").join("machines") + } else { + final_root_path + }; + } else { + if let Some(rp) = module_root_config + .module_root_path + .as_ref() + .filter(|s| s.exists()) + { + dirs.push(PathBuf::from(rp)); + } else { + // Try to find the workspace root by looking for "target" in common locations + // /../../target/machines + if let Some(grandparent) = manifest_dir.parent().and_then(|p| p.parent()) { + dirs.push(grandparent.join("target").join("machines")); + } + // /target/machines + dirs.push(manifest_dir.join("target").join("machines")); + + // Check working directory + if let Ok(work_dir) = env::current_dir() { + dirs.push(work_dir.join("machines")); + dirs.push(work_dir.join("target").join("machines")); + } + + // Check relative to executable + if let Ok(exec_path) = env::current_exe() { + if let Some(grandparent_of_exec) = exec_path.parent().and_then(|p| p.parent()) { + dirs.push(grandparent_of_exec.join("machines")); + } + } + } + } + + for dir in dirs { + if !dir.exists() || !dir.is_dir() { + debug!("{dir:?} does not exist!!!"); + continue; + } + + let entries = match fs::read_dir(&dir) { + Ok(e) => e, + Err(e) => { + warn!("Reading directory {:?} error: {}", dir, e); + continue; + } + }; + + for entry in entries.flatten() { + let mr_file = entry.path().join("module-root.txt"); + + if !mr_file.exists() { + continue; + } + + let mr_content = match fs::read_to_string(&mr_file) { + Ok(c) => c, + Err(e) => { + warn!("Reading module roots file {:?} error: {}", mr_file, e); + continue; + } + }; + + let module_root = match Bytes32::from_str(mr_content.trim()) { + Ok(h) => h, + Err(_) => { + warn!("Error converting module root file {mr_file:?} into hash"); + continue; + } + }; + + let dir_name = entry.file_name().to_string_lossy().to_string(); + + // IMPORTANT: + // Go's moduleRoot.Hex() returns "0x" + hex. + // Rust Bytes32 Display impl returns raw hex. + // We must format it manually to match Go's directory naming convention. + let module_root_hex = format!("0x{}", module_root); + + if dir_name != "latest" && dir_name != module_root_hex { + continue; + } + + module_roots_set.insert(module_root); + + if dir_name == "latest" { + latest_module_root = module_root; + } + + final_root_path = dir.canonicalize().unwrap_or(dir.clone()); + } + + if !final_root_path.as_os_str().is_empty() { + break; + } + } + + let module_roots: Vec = module_roots_set.into_iter().collect(); + + Ok(MachineLocator { + root_path: final_root_path, + latest: latest_module_root, + module_roots, + }) + } + + pub fn get_machine_path(&self, module_root: Bytes32) -> PathBuf { + if module_root == Bytes32::default() || module_root == self.latest { + self.root_path.join("latest") + } else { + self.root_path.join(format!("0x{}", module_root)) + } + } + + pub fn latest_wasm_module_root(&self) -> Bytes32 { + self.latest + } + + pub fn module_roots(&self) -> &[Bytes32] { + &self.module_roots + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use arbutil::Bytes32; + + use crate::{config::ModuleRootConfig, server_jit::machine_locator::MachineLocator}; + + #[test] + fn test_new_machine_locator_with_path() { + let locator_path = "testdata".to_owned(); + let config = ModuleRootConfig { + module_root: None, + module_root_path: Some(locator_path.into()), + }; + let machine_locator = MachineLocator::new(&config).unwrap(); + + let expected_latest = + Bytes32::from_str("0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a") + .unwrap(); + assert_eq!(expected_latest, machine_locator.latest); + + let expected_module_roots = [ + Bytes32::from_str("0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4") + .unwrap(), + Bytes32::from_str("0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4") + .unwrap(), + Bytes32::from_str("0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a") + .unwrap(), + ]; + + let mut module_roots = machine_locator.module_roots().to_vec(); + module_roots.sort(); + let module_roots: [Bytes32; 3] = module_roots.try_into().unwrap(); + + assert_eq!(expected_module_roots, module_roots); + } + + #[test] + fn test_new_machine_locator_with_module_root() { + let config = ModuleRootConfig { + module_root: Some( + Bytes32::from_str( + "0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4", + ) + .unwrap(), + ), + module_root_path: None, + }; + let machine_locator = MachineLocator::new(&config).unwrap(); + + let expected_latest = + Bytes32::from_str("0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4") + .unwrap(); + assert_eq!(expected_latest, machine_locator.latest); + + let expected_module_roots = [Bytes32::from_str( + "0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4", + ) + .unwrap()]; + + let mut module_roots = machine_locator.module_roots().to_vec(); + module_roots.sort(); + let module_roots: [Bytes32; 1] = module_roots.try_into().unwrap(); + + assert_eq!(expected_module_roots, module_roots); + } +} diff --git a/crates/validator/src/server_jit/mod.rs b/crates/validator/src/server_jit/mod.rs new file mode 100644 index 0000000000..78fd19ed99 --- /dev/null +++ b/crates/validator/src/server_jit/mod.rs @@ -0,0 +1,3 @@ +pub mod config; +pub mod jit_machine; +pub mod machine_locator; diff --git a/crates/validator/testdata/0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4/module-root.txt b/crates/validator/testdata/0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4/module-root.txt new file mode 100644 index 0000000000..067f2db9f5 --- /dev/null +++ b/crates/validator/testdata/0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4/module-root.txt @@ -0,0 +1 @@ +0x68e4fe5023f792d4ef584796c84d710303a5e12ea02d6e37e2b5e9c4332507c4 diff --git a/crates/validator/testdata/0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4/module-root.txt b/crates/validator/testdata/0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4/module-root.txt new file mode 100644 index 0000000000..ad3a905ab7 --- /dev/null +++ b/crates/validator/testdata/0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4/module-root.txt @@ -0,0 +1 @@ +0x8b104a2e80ac6165dc58b9048de12f301d70b02a0ab51396c22b4b4b802a16a4 diff --git a/crates/validator/testdata/0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a/module-root.txt b/crates/validator/testdata/0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a/module-root.txt new file mode 100644 index 0000000000..1a359ae1cd --- /dev/null +++ b/crates/validator/testdata/0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a/module-root.txt @@ -0,0 +1 @@ +0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a diff --git a/crates/validator/testdata/latest b/crates/validator/testdata/latest new file mode 120000 index 0000000000..42d98792a0 --- /dev/null +++ b/crates/validator/testdata/latest @@ -0,0 +1 @@ +0xf4389b835497a910d7ba3ebfb77aa93da985634f3c052de1290360635be40c4a \ No newline at end of file