Skip to content

Commit 6ef41c9

Browse files
committed
Add 2nd server test and make Mutex more granular
Signed-off-by: Igor Braga <[email protected]>
1 parent b5b679d commit 6ef41c9

File tree

7 files changed

+122
-70
lines changed

7 files changed

+122
-70
lines changed

crates/validation/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
};
99

1010
/// Counterpart to Go `validator.GoGlobalState`.
11-
#[derive(Clone, Debug, Serialize, Deserialize)]
11+
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
1212
#[serde(rename_all = "PascalCase")]
1313
pub struct GoGlobalState {
1414
#[serde(with = "As::<DisplayFromStr>")]

crates/validator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ tracing-subscriber = { workspace = true, features = [
2525
"json",
2626
"env-filter",
2727
] }
28+
validation = { workspace = true }
2829

2930
[dev-dependencies]
3031
reqwest = "0.13.1"
31-
validation = { workspace = true }

crates/validator/src/config.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use clap::{Args, Parser, ValueEnum};
1313
use std::fs::read_to_string;
1414
use std::net::SocketAddr;
1515
use std::path::PathBuf;
16-
use tokio::sync::Mutex;
1716

1817
use crate::engine::config::JitMachineConfig;
1918
use crate::engine::machine::JitMachine;
@@ -23,7 +22,9 @@ pub struct ServerState {
2322
pub mode: InputMode,
2423
pub binary: PathBuf,
2524
pub module_root: Bytes32,
26-
pub jit_machine: Option<Mutex<JitMachine>>,
25+
/// jit machine responsible for computing next GlobalState. Not wrapped
26+
/// in Arc<> since the caller of ServerState is wrapped in Arc<>
27+
pub jit_machine: Option<JitMachine>,
2728
pub available_workers: usize,
2829
}
2930

@@ -37,7 +38,7 @@ impl ServerState {
3738

3839
let jit_machine = JitMachine::new(&config, Some(module_root))?;
3940

40-
Some(Mutex::new(jit_machine))
41+
Some(jit_machine)
4142
}
4243
InputMode::Native => None,
4344
};

crates/validator/src/engine/execution.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,16 @@ pub async fn validate_continuous(
7070
));
7171
}
7272

73-
let mut locked_jit_machine = server_state.jit_machine.as_ref().unwrap().lock().await;
73+
let jit_machine = server_state.jit_machine.as_ref().unwrap();
7474

75-
if !locked_jit_machine.is_active() {
75+
if !jit_machine.is_active().await {
7676
return Err(format!(
7777
"Jit machine is not active. Maybe it received a shutdown signal? Requested module root: {}",
7878
server_state.module_root
7979
));
8080
}
8181

82-
let new_state = locked_jit_machine
82+
let new_state = jit_machine
8383
.feed_machine(&request)
8484
.await
8585
.map_err(|error| format!("{error:?}"))?;

crates/validator/src/engine/machine.rs

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@ use tokio::{
3131
io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
3232
net::{TcpListener, TcpStream},
3333
process::{Child, ChildStdin, Command},
34+
sync::Mutex,
3435
};
3536
use tracing::{error, warn};
37+
use validation::{GoGlobalState, ValidationInput};
3638

37-
use crate::{
38-
engine::{config::JitMachineConfig, execution::ValidationRequest},
39-
spawner_endpoints::{local_target, GlobalState},
40-
};
39+
use crate::{engine::config::JitMachineConfig, spawner_endpoints::local_target};
4140

4241
const SUCCESS_BYTE: u8 = 0x0;
4342
const FAILURE_BYTE: u8 = 0x1;
@@ -80,10 +79,12 @@ async fn read_bytes_with_len<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Vec
8079

8180
#[derive(Debug)]
8281
pub struct JitMachine {
83-
pub process_stdin: Option<ChildStdin>,
84-
pub process: Child,
82+
/// Handler to jit binary stdin. Instead of using Mutex<> for the entire
83+
/// JitMachine we chose to use a more granular Mutex<> to avoid contention
84+
pub process_stdin: Mutex<Option<ChildStdin>>,
85+
/// Handler to jit binary process. Needs a Mutex<> to force quit on server shutdown
86+
pub process: Mutex<Child>,
8587
pub wasm_memory_usage_limit: u64,
86-
is_active: bool,
8788
}
8889

8990
impl JitMachine {
@@ -136,39 +137,41 @@ impl JitMachine {
136137
.ok_or_else(|| anyhow!("failed to open stdin to jit process"))?;
137138

138139
Ok(Self {
139-
process_stdin: Some(stdin),
140-
process: child,
140+
process_stdin: Mutex::new(Some(stdin)),
141+
process: Mutex::new(child),
141142
wasm_memory_usage_limit: config.wasm_memory_usage_limit,
142-
is_active: true,
143143
})
144144
}
145145

146-
pub fn is_active(&self) -> bool {
147-
self.is_active && self.process_stdin.is_some()
146+
pub async fn is_active(&self) -> bool {
147+
self.process_stdin.lock().await.is_some()
148148
}
149149

150-
pub async fn feed_machine(&mut self, request: &ValidationRequest) -> Result<GlobalState> {
150+
pub async fn feed_machine(&self, request: &ValidationInput) -> Result<GoGlobalState> {
151151
// 1. Create new TCP connection
152152
// Binding with a port number of 0 will request that the OS assigns a port to this listener.
153153
let listener = TcpListener::bind("127.0.0.1:0")
154154
.await
155155
.context("failed to create TCP listener")?;
156156

157-
let mut state = GlobalState::default();
157+
let mut state = GoGlobalState::default();
158158

159159
let addr = listener.local_addr().context("failed to get local addr")?;
160160

161161
// 2. Format the address string (Go: "%v\n")
162162
let address_str = format!("{addr}\n");
163163

164164
// 3. Send TCP connection via stdin pipe
165-
if let Some(stdin) = &mut self.process_stdin {
166-
stdin
167-
.write_all(address_str.as_bytes())
168-
.await
169-
.context("failed to write address to jit stdin")?;
170-
} else {
171-
return Err(anyhow!("JIT machine stdin is not available"));
165+
{
166+
let mut locked_process_stdin = self.process_stdin.lock().await;
167+
if let Some(stdin) = locked_process_stdin.as_mut() {
168+
stdin
169+
.write_all(address_str.as_bytes())
170+
.await
171+
.context("failed to write address to jit stdin")?;
172+
} else {
173+
return Err(anyhow!("JIT machine stdin is not available"));
174+
}
172175
}
173176

174177
// 4. Wait for the child to call us back
@@ -195,7 +198,7 @@ impl JitMachine {
195198
// 7. Send Delayed Inbox
196199
if request.has_delayed_msg {
197200
write_u8(&mut conn, ANOTHER_BYTE).await?;
198-
write_u64(&mut conn, request.delayed_msg_number).await?;
201+
write_u64(&mut conn, request.delayed_msg_nr).await?;
199202
write_bytes(&mut conn, &request.delayed_msg).await?;
200203
}
201204
write_u8(&mut conn, SUCCESS_BYTE).await?;
@@ -232,7 +235,7 @@ impl JitMachine {
232235
write_u32(&mut conn, local_user_wasm.len() as u32).await?;
233236
for (module_hash, program) in local_user_wasm {
234237
write_exact(&mut conn, &module_hash.0).await?;
235-
write_bytes(&mut conn, program).await?;
238+
write_bytes(&mut conn, &program.as_vec()).await?;
236239
}
237240

238241
// 10. Signal that we are done sending global state
@@ -272,20 +275,21 @@ impl JitMachine {
272275
}
273276
}
274277

275-
pub async fn complete_machine(&mut self) -> Result<()> {
278+
pub async fn complete_machine(&self) -> Result<()> {
276279
// Close stdin. This sends EOF to the child process, signaling it to stop.
277280
// We take the Option to ensure it's dropped and cannot be used again.
278-
if let Some(stdin) = self.process_stdin.take() {
281+
282+
let mut locked_process_stdin = self.process_stdin.lock().await;
283+
if let Some(stdin) = locked_process_stdin.take() {
279284
drop(stdin);
280285
}
281286

282-
self.process
287+
let mut locked_process = self.process.lock().await;
288+
locked_process
283289
.kill()
284290
.await
285291
.context("failed to kill jit process")?;
286292

287-
self.is_active = false;
288-
289293
Ok(())
290294
}
291295
}

crates/validator/src/server.rs

Lines changed: 79 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use tracing::info;
88
use crate::config::ServerState;
99
use crate::router::create_router;
1010

11-
pub(crate) async fn run_server(listener: TcpListener, state: Arc<ServerState>) -> Result<()> {
11+
pub async fn run_server(listener: TcpListener, state: Arc<ServerState>) -> Result<()> {
1212
run_server_internal(listener, state, shutdown_signal()).await
1313
}
1414

@@ -27,8 +27,7 @@ where
2727
info!("Shutdown signal received. Running cleanup...");
2828

2929
if let Some(jit_machine) = state.jit_machine.as_ref() {
30-
let mut locked_jit_machine = jit_machine.lock().await;
31-
locked_jit_machine.complete_machine().await?;
30+
jit_machine.complete_machine().await?;
3231
}
3332

3433
Ok(())
@@ -70,25 +69,27 @@ pub(crate) async fn shutdown_signal() {
7069
mod tests {
7170
use anyhow::Result;
7271
use clap::Parser;
73-
use std::sync::Arc;
74-
use tokio::{net::TcpListener, sync::oneshot};
72+
use std::{net::SocketAddr, sync::Arc};
73+
use tokio::{
74+
net::TcpListener,
75+
sync::oneshot::{self, Sender},
76+
task::JoinHandle,
77+
};
7578

7679
use crate::{
7780
config::{ServerConfig, ServerState},
7881
server::run_server_internal,
7982
};
8083

81-
#[tokio::test]
82-
async fn test_server_lifecycle() -> Result<()> {
83-
// 1. Setup Config and State. Use dummy module root is okay.
84-
let config = ServerConfig::try_parse_from([
85-
"server",
86-
"--module-root",
87-
"0x0000000000000000000000000000000000000000000000000000000000000000",
88-
])
89-
.unwrap();
90-
let state = Arc::new(ServerState::new(&config)?);
84+
struct TestServerConfig {
85+
sender: Sender<()>,
86+
server_handle: JoinHandle<Result<()>>,
87+
addr: SocketAddr,
88+
state: Arc<ServerState>,
89+
}
9190

91+
async fn spinup_server(config: &ServerConfig) -> Result<TestServerConfig> {
92+
let state = Arc::new(ServerState::new(config)?);
9293
// 2. Bind to random free port
9394
let listener = TcpListener::bind("127.0.0.1:0").await?;
9495
let addr = listener.local_addr()?;
@@ -107,16 +108,19 @@ mod tests {
107108
run_server_internal(listener, state_for_server, shutdown_signal).await
108109
});
109110

110-
// 5. Check that jit machine is active
111-
if let Some(jit) = state.jit_machine.as_ref() {
112-
let locked_jit_machine = jit.lock().await;
113-
assert!(locked_jit_machine.is_active());
114-
}
111+
Ok(TestServerConfig {
112+
sender: tx,
113+
server_handle,
114+
addr,
115+
state,
116+
})
117+
}
115118

116-
// 6. Make a real request here to prove the server is up
119+
async fn verify_and_shutdown_server(test_config: TestServerConfig) -> Result<()> {
120+
// 5. Make a real request here to prove the server is up
117121
let client = reqwest::Client::new();
118122
let resp = client
119-
.get(format!("http://{}/validation_capacity", addr))
123+
.get(format!("http://{}/validation_capacity", test_config.addr))
120124
.send()
121125
.await;
122126

@@ -126,28 +130,71 @@ mod tests {
126130
);
127131
assert_eq!(resp.unwrap().status(), 200);
128132

129-
// 7. Trigger Shutdown
133+
// 6. Trigger Shutdown
130134
println!("Sending shutdown signal...");
131-
let _ = tx.send(());
135+
let _ = test_config.sender.send(());
132136

133-
// 8. Wait for the server to finish (this ensures cleanup ran)
134-
let result = server_handle.await?;
137+
// 7. Wait for the server to finish (this ensures cleanup ran)
138+
let result = test_config.server_handle.await?;
135139
assert!(result.is_ok(), "Server should exit successfully");
136140

137-
// 9. Verify Cleanup
138-
if let Some(jit) = state.jit_machine.as_ref() {
139-
let locked_jit_machine = jit.lock().await;
140-
assert!(!locked_jit_machine.is_active());
141+
// 8. Verify jit_machine Cleanup
142+
if let Some(jit) = test_config.state.jit_machine.as_ref() {
143+
assert!(!jit.is_active().await);
141144
}
142145

143-
// 10. Verify same request from above fails expectadly
146+
// 9. Verify same request from above fails expectadly
144147
let resp = client
145-
.get(format!("http://{}/validation_capacity", addr))
148+
.get(format!("http://{}/validation_capacity", test_config.addr))
146149
.send()
147150
.await;
148151

149152
assert!(resp.is_err(), "server should not be up");
150153

151154
Ok(())
152155
}
156+
157+
#[tokio::test]
158+
async fn test_server_lifecycle_native_mode() -> Result<()> {
159+
// 1. Setup Config and State. Use dummy module root is okay.
160+
let config = ServerConfig::try_parse_from([
161+
"server",
162+
"--module-root",
163+
"0x0000000000000000000000000000000000000000000000000000000000000000",
164+
])
165+
.unwrap();
166+
let test_config = spinup_server(&config).await?;
167+
168+
// Since we're running in native mode there should not be an active jit_machine
169+
assert!(test_config.state.jit_machine.is_none());
170+
171+
verify_and_shutdown_server(test_config).await.unwrap();
172+
173+
Ok(())
174+
}
175+
176+
#[tokio::test]
177+
async fn test_server_lifecycle_continuous_mode() -> Result<()> {
178+
// 1. Setup Config and State. Use dummy module root is okay.
179+
let config = ServerConfig::try_parse_from([
180+
"server",
181+
"--module-root",
182+
"0x0000000000000000000000000000000000000000000000000000000000000000",
183+
"--mode",
184+
"continuous",
185+
])
186+
.unwrap();
187+
let test_config = spinup_server(&config).await?;
188+
189+
assert!(test_config.state.jit_machine.is_some());
190+
191+
// Check that jit machine is active
192+
if let Some(jit) = test_config.state.jit_machine.as_ref() {
193+
assert!(jit.is_active().await);
194+
}
195+
196+
verify_and_shutdown_server(test_config).await.unwrap();
197+
198+
Ok(())
199+
}
153200
}

crates/validator/src/spawner_endpoints.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! field names).
99
1010
use crate::engine::config::{TARGET_AMD_64, TARGET_ARM_64, TARGET_HOST};
11-
use crate::engine::execution::{validate_continuous, validate_native, ValidationRequest};
11+
use crate::engine::execution::{validate_continuous, validate_native};
1212
use crate::{config::InputMode, ServerState};
1313
use axum::extract::State;
1414
use axum::response::IntoResponse;
@@ -31,7 +31,7 @@ pub async fn validate(
3131
Json(request): Json<ValidationInput>,
3232
) -> Result<Json<GoGlobalState>, String> {
3333
match state.mode {
34-
InputMode::Native => validate_native(request).await,
34+
InputMode::Native => validate_native(&state, request).await,
3535
InputMode::Continuous => validate_continuous(&state, request).await,
3636
}
3737
}

0 commit comments

Comments
 (0)