Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 15 additions & 166 deletions crates/icp-cli/src/commands/network/run.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,12 @@
use std::{
io::{BufRead, BufReader},
process::{Child, Command, Stdio},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::Duration,
};

use clap::Args;
use ic_agent::{Agent, AgentError};
use ic_agent::AgentError;
use icp::{
fs::lock::LockError,
identity::manifest::{IdentityList, LoadIdentityManifestError},
manifest,
network::{Configuration, NetworkDirectory, RunNetworkError, run_network},
network::{Configuration, RunNetworkError, run_network},
project::DEFAULT_LOCAL_NETWORK_NAME,
};
use sysinfo::Pid;
use tracing::debug;

use icp::context::Context;
Expand Down Expand Up @@ -56,6 +44,12 @@ pub(crate) enum CommandError {
#[error("failed to create network directory")]
CreateNetworkDir { source: icp::fs::Error },

#[error(transparent)]
LoadNetworkDescriptor(#[from] icp::network::directory::LoadNetworkFileError),

#[error("network '{name}' is already running")]
AlreadyRunning { name: String },

#[error("failed to cleanup canister ID store for environment '{env}'")]
CleanupCanisterIdStore {
source: icp::store_id::CleanupError,
Expand All @@ -65,9 +59,6 @@ pub(crate) enum CommandError {
#[error(transparent)]
NetworkAccess(#[from] icp::network::AccessError),

#[error("timed out waiting for network to start: {err}")]
Timeout { err: String },

#[error(transparent)]
Identities(#[from] LoadIdentityManifestError),

Expand Down Expand Up @@ -109,6 +100,12 @@ pub(crate) async fn exec(ctx: &Context, args: &RunArgs) -> Result<(), CommandErr
nd.ensure_exists()
.map_err(|e| CommandError::CreateNetworkDir { source: e })?;

if nd.load_network_descriptor().await?.is_some() {
return Err(CommandError::AlreadyRunning {
name: args.name.to_owned(),
});
}

// Clean up any existing canister ID mappings of which environment is on this network
for env in p.environments.values() {
if env.network == *network {
Expand All @@ -135,154 +132,6 @@ pub(crate) async fn exec(ctx: &Context, args: &RunArgs) -> Result<(), CommandErr
debug!("Project root: {pdir}");
debug!("Network root: {}", nd.network_root);

if args.background {
let mut child = run_in_background()?;
nd.save_background_network_runner_pid(Pid::from(child.id() as usize))
.await?;
relay_child_output_until_healthy(ctx, &mut child, &nd).await?;
} else {
run_network(
cfg, // config
nd, // nd
pdir, // project_root
seed_accounts, // seed_accounts
)
.await?;
}
run_network(cfg, nd, pdir, seed_accounts, args.background).await?;
Ok(())
}

async fn relay_child_output_until_healthy(
ctx: &Context,
child: &mut Child,
nd: &NetworkDirectory,
) -> Result<(), CommandError> {
let stdout = child.stdout.take().expect("Failed to take child stdout");
let stderr = child.stderr.take().expect("Failed to take child stderr");

let stop_printing_child_output = Arc::new(AtomicBool::new(false));

// Spawn threads to relay output
let term = ctx.term.clone();
let should_stop_clone = Arc::clone(&stop_printing_child_output);
let stdout_thread = thread::spawn(move || {
let reader = BufReader::new(stdout);
for line in reader.lines() {
if should_stop_clone.load(Ordering::Relaxed) {
break;
}
if let Ok(line) = line {
let _ = term.write_line(&line);
}
}
});

let term = ctx.term.clone();
let should_stop_clone = Arc::clone(&stop_printing_child_output);
let stderr_thread = thread::spawn(move || {
let reader = BufReader::new(stderr);
for line in reader.lines() {
if should_stop_clone.load(Ordering::Relaxed) {
break;
}
if let Ok(line) = line {
let _ = term.write_line(&line);
}
}
});

wait_for_healthy_network(nd).await?;

// Signal threads to stop
stop_printing_child_output.store(true, Ordering::Relaxed);

// Don't join the threads - they're likely blocked on I/O waiting for the next line.
// They'll terminate naturally when the pipes close, or when the next line arrives.
drop(stdout_thread);
drop(stderr_thread);

Ok(())
}

#[allow(clippy::result_large_err)]
fn run_in_background() -> Result<Child, CommandError> {
let exe = std::env::current_exe().expect("Failed to get current executable.");
let mut cmd = Command::new(exe);
// Skip 1 because arg0 is this executable's path.
cmd.args(std::env::args().skip(1).filter(|a| !a.eq("--background")))
.stdin(Stdio::null())
.stdout(Stdio::piped()) // Capture stdout so we can relay it
.stderr(Stdio::piped()); // Capture stderr so we can relay it

// On Unix, create a new process group so the child can continue running
// independently after the run command exits
#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
cmd.process_group(0);
}

let child = cmd.spawn().expect("Failed to spawn child process.");
Ok(child)
}

async fn retry_with_timeout<F, Fut, T>(mut f: F, max_retries: usize, delay_ms: u64) -> Option<T>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Option<T>> + Send,
{
let mut retries = 0;
loop {
if let Some(result) = f().await {
return Some(result);
}
if retries > max_retries {
return None;
}
retries += 1;
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
}

async fn wait_for_healthy_network(nd: &NetworkDirectory) -> Result<(), CommandError> {
let max_retries = 45;
let delay_ms = 1000;

// Wait for network descriptor to be written
let network = retry_with_timeout(
|| async move { nd.load_network_descriptor().await.unwrap_or(None) },
max_retries,
delay_ms,
)
.await
.ok_or(CommandError::Timeout {
err: "timed out waiting for network descriptor".to_string(),
})?;

// Wait for network to report itself healthy
let port = network.gateway.port;
let agent = Agent::builder()
.with_url(format!("http://127.0.0.1:{port}"))
.build()?;
retry_with_timeout(
|| {
let agent = agent.clone();
async move {
let status = agent.status().await;
if let Ok(status) = status
&& matches!(&status.replica_health_status, Some(status) if status == "healthy")
{
return Some(());
}

None
}
},
max_retries,
delay_ms,
)
.await
.ok_or(CommandError::Timeout {
err: "timed out waiting for network to start".to_string(),
})
}
4 changes: 4 additions & 0 deletions crates/icp-cli/src/commands/network/stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub async fn exec(ctx: &Context, cmd: &Cmd) -> Result<(), CommandError> {
.with_write(async |root| {
let pid_file = root.background_network_runner_pid_file();
let _ = remove_file(&pid_file); // Cleanup is nice, but optional
let descriptor_file = root.network_descriptor_path();
// Desciptor file must be deleted to allow the network to be restarted, but if it doesn't exist, that's fine too
let _ = remove_file(&descriptor_file);

Ok::<_, CommandError>(())
})
.await??;
Expand Down
12 changes: 6 additions & 6 deletions crates/icp-cli/tests/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ async fn network_run_and_stop_background() {
.args(["network", "run", "my-network", "--background"])
.assert()
.success()
.stdout(contains("Created instance with id")); // part of network start output
.stderr(contains("Created instance with id")); // part of network start output

let network = ctx.wait_for_network_descriptor(&project_dir, "my-network");

Expand All @@ -332,7 +332,7 @@ async fn network_run_and_stop_background() {
);

let pid_contents = read_to_string(&pid_file_path).expect("Failed to read PID file");
let background_controller_pid: Pid = pid_contents
let background_pocketic_pid: Pid = pid_contents
.trim()
.parse()
.expect("PID file should contain a valid process ID");
Expand All @@ -357,7 +357,7 @@ async fn network_run_and_stop_background() {
.success()
.stdout(contains(format!(
"Stopping background network (PID: {})",
background_controller_pid
background_pocketic_pid
)))
.stdout(contains("Network stopped successfully"));

Expand All @@ -367,11 +367,11 @@ async fn network_run_and_stop_background() {
"PID file should be removed after stopping"
);

// Verify controller process is no longer running
// Verify pocketic process is no longer running
let mut system = System::new();
system.refresh_processes(ProcessesToUpdate::Some(&[background_controller_pid]), true);
system.refresh_processes(ProcessesToUpdate::Some(&[background_pocketic_pid]), true);
assert!(
system.process(background_controller_pid).is_none(),
system.process(background_pocketic_pid).is_none(),
"Process should no longer be running"
);

Expand Down
10 changes: 10 additions & 0 deletions crates/icp/src/network/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ impl NetworkRootPaths {
pub fn pocketic_port_file(&self) -> PathBuf {
self.pocketic_dir().join("port")
}

/// PocketIC writes its stdout to this file.
pub fn pocketic_stdout_file(&self) -> PathBuf {
self.pocketic_dir().join("stdout.log")
}

/// PocketIC writes its stderr to this file.
pub fn pocketic_stderr_file(&self) -> PathBuf {
self.pocketic_dir().join("stderr.log")
}
}

impl PathsAccess for NetworkRootPaths {
Expand Down
24 changes: 21 additions & 3 deletions crates/icp/src/network/managed/pocketic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use pocket_ic::common::rest::{
};
use reqwest::Url;
use snafu::prelude::*;
use std::process::Stdio;
use time::OffsetDateTime;

use crate::prelude::*;
Expand Down Expand Up @@ -69,14 +70,31 @@ pub struct PocketIcInstance {
pub root_key: String,
}

pub fn spawn_pocketic(pocketic_path: &Path, port_file: &Path) -> tokio::process::Child {
pub fn spawn_pocketic(
pocketic_path: &Path,
port_file: &Path,
stdout_file: &Path,
stderr_file: &Path,
background: bool,
) -> tokio::process::Child {
let mut cmd = tokio::process::Command::new(pocketic_path);
cmd.arg("--port-file");
cmd.arg(port_file.as_os_str());
cmd.args(["--ttl", "2592000", "--log-levels", "error"]);

cmd.stdout(std::process::Stdio::inherit());
cmd.stderr(std::process::Stdio::inherit());
if background {
eprintln!("For background mode, PocketIC output will be redirected:");
eprintln!(" stdout: {}", stdout_file);
eprintln!(" stderr: {}", stderr_file);
let stdout = std::fs::File::create(stdout_file).expect("Failed to create stdout file.");
let stderr = std::fs::File::create(stderr_file).expect("Failed to create stderr file.");
cmd.stdout(Stdio::from(stdout));
cmd.stderr(Stdio::from(stderr));
} else {
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::inherit());
}

#[cfg(unix)]
{
cmd.process_group(0);
Expand Down
Loading