diff --git a/feos/services/vm-service/src/dispatcher.rs b/feos/services/vm-service/src/dispatcher.rs
index b2c690d..5cb5049 100644
--- a/feos/services/vm-service/src/dispatcher.rs
+++ b/feos/services/vm-service/src/dispatcher.rs
@@ -3,8 +3,11 @@ use crate::{
     dispatcher_handlers::{
-        handle_create_vm_command, handle_delete_vm_command, handle_get_vm_command,
-        handle_list_vms_command, handle_stream_vm_events_command, perform_startup_sanity_check,
+        handle_attach_disk_command, handle_create_vm_command, handle_delete_vm_command,
+        handle_get_vm_command, handle_list_vms_command, handle_pause_vm_command,
+        handle_remove_disk_command, handle_resume_vm_command, handle_shutdown_vm_command,
+        handle_start_vm_command, handle_stream_vm_console_command, handle_stream_vm_events_command,
+        perform_startup_sanity_check,
     },
     error::VmServiceError,
     persistence::repository::VmRepository,
@@ -65,15 +68,13 @@ impl VmServiceDispatcher {
                 let hypervisor = self.hypervisor.clone();
                 let event_bus_tx = self.event_bus_tx.clone();
                 let status_channel_tx = self.status_channel_tx.clone();
-                let healthcheck_cancel_bus_tx = self.healthcheck_cancel_bus.clone();

                 match cmd {
                     Command::CreateVm(req, responder) => {
                         handle_create_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
                     }
                     Command::StartVm(req, responder) => {
-                        let cancel_bus = healthcheck_cancel_bus_tx.subscribe();
-                        tokio::spawn(worker::handle_start_vm(req, responder, hypervisor, event_bus_tx, cancel_bus));
+                        handle_start_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx, &self.healthcheck_cancel_bus).await;
                     }
                     Command::GetVm(req, responder) => {
                         handle_get_vm_command(&self.repository, req, responder).await;
@@ -85,7 +86,7 @@ impl VmServiceDispatcher {
                         handle_delete_vm_command(&self.repository, &self.healthcheck_cancel_bus, req, responder, hypervisor, event_bus_tx).await;
                     }
                     Command::StreamVmConsole(input_stream, output_tx) => {
-                        tokio::spawn(worker::handle_stream_vm_console(*input_stream, output_tx, hypervisor));
+                        handle_stream_vm_console_command(&self.repository, *input_stream, output_tx, hypervisor).await;
                     }
                     Command::ListVms(req, responder) => {
                         handle_list_vms_command(&self.repository, req, responder).await;
@@ -94,19 +95,19 @@ impl VmServiceDispatcher {
                         tokio::spawn(worker::handle_ping_vm(req, responder, hypervisor));
                     }
                     Command::ShutdownVm(req, responder) => {
-                        tokio::spawn(worker::handle_shutdown_vm(req, responder, hypervisor, event_bus_tx));
+                        handle_shutdown_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
                     }
                     Command::PauseVm(req, responder) => {
-                        tokio::spawn(worker::handle_pause_vm(req, responder, hypervisor, event_bus_tx));
+                        handle_pause_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
                     }
                     Command::ResumeVm(req, responder) => {
-                        tokio::spawn(worker::handle_resume_vm(req, responder, hypervisor, event_bus_tx));
+                        handle_resume_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
                     }
                     Command::AttachDisk(req, responder) => {
-                        tokio::spawn(worker::handle_attach_disk(req, responder, hypervisor));
+                        handle_attach_disk_command(&self.repository, req, responder, hypervisor).await;
                     }
                     Command::RemoveDisk(req, responder) => {
-                        tokio::spawn(worker::handle_remove_disk(req, responder, hypervisor));
+                        handle_remove_disk_command(&self.repository, req, responder, hypervisor).await;
                     }
                 }
             },
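The dispatcher change above replaces most of the fire-and-forget `tokio::spawn(worker::...)` calls with awaited `handle_*_command` functions that load the VM's persisted record and check its lifecycle state before any long-running worker is spawned. Below is a minimal, self-contained sketch of that guard pattern; the simplified repository, error type, and `check_start_allowed` helper are illustrative stand-ins, not code from this patch:

```rust
// Illustrative sketch only, not code from this patch: it models the guard pattern the
// new handle_*_command functions apply before spawning a worker. The simplified
// repository and error type below are assumptions of the sketch.
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum VmState {
    Created,
    Running,
    Paused,
    Stopped,
    Crashed,
}

#[derive(Debug)]
enum DispatchError {
    NotFound(String),
    InvalidState(String),
}

struct Repository {
    vms: HashMap<String, VmState>,
}

impl Repository {
    fn get(&self, id: &str) -> Result<VmState, DispatchError> {
        self.vms
            .get(id)
            .copied()
            .ok_or_else(|| DispatchError::NotFound(id.to_string()))
    }
}

/// Mirrors the shape of handle_start_vm_command: StartVm is only legal from
/// Created or Stopped, so anything else is rejected before work begins.
fn check_start_allowed(repo: &Repository, vm_id: &str) -> Result<(), DispatchError> {
    let state = repo.get(vm_id)?;
    if !matches!(state, VmState::Created | VmState::Stopped) {
        return Err(DispatchError::InvalidState(format!(
            "Cannot start VM in {state:?} state. Must be in Created or Stopped."
        )));
    }
    Ok(())
}

fn main() {
    let repo = Repository {
        vms: HashMap::from([("vm-a".to_string(), VmState::Paused)]),
    };
    // A paused VM must be resumed, not started, so the guard rejects the request
    // before anything like worker::handle_start_vm would be spawned.
    match check_start_allowed(&repo, "vm-a") {
        Ok(()) => println!("precondition holds, would spawn the start worker here"),
        Err(e) => println!("rejected on the dispatcher side: {e:?}"),
    }
}
```

Doing the check on the dispatcher task keeps invalid requests (for example StartVm on a paused VM) from ever reaching a hypervisor worker.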
diff --git a/feos/services/vm-service/src/dispatcher_handlers.rs b/feos/services/vm-service/src/dispatcher_handlers.rs
index 0f8d565..9fed868 100644
--- a/feos/services/vm-service/src/dispatcher_handlers.rs
+++ b/feos/services/vm-service/src/dispatcher_handlers.rs
@@ -10,9 +10,13 @@ use crate::{
 use feos_proto::{
     image_service::{image_service_client::ImageServiceClient, PullImageRequest},
     vm_service::{
-        CreateVmRequest, CreateVmResponse, DeleteVmRequest, DeleteVmResponse, GetVmRequest,
-        ListVmsRequest, ListVmsResponse, StreamVmEventsRequest, VmEvent, VmInfo, VmState,
-        VmStateChangedEvent,
+        stream_vm_console_request as console_input, AttachConsoleMessage, AttachDiskRequest,
+        AttachDiskResponse, CreateVmRequest, CreateVmResponse, DeleteVmRequest, DeleteVmResponse,
+        GetVmRequest, ListVmsRequest, ListVmsResponse, PauseVmRequest, PauseVmResponse,
+        RemoveDiskRequest, RemoveDiskResponse, ResumeVmRequest, ResumeVmResponse,
+        ShutdownVmRequest, ShutdownVmResponse, StartVmRequest, StartVmResponse,
+        StreamVmConsoleRequest, StreamVmConsoleResponse, StreamVmEventsRequest, VmEvent, VmInfo,
+        VmState, VmStateChangedEvent,
     },
 };
 use hyper_util::rt::TokioIo;
@@ -23,9 +27,10 @@ use prost::Message;
 use prost_types::Any;
 use std::{path::PathBuf, sync::Arc};
 use tokio::sync::{broadcast, mpsc, oneshot};
+use tokio_stream::StreamExt;
 use tonic::{
     transport::{Channel, Endpoint, Error as TonicTransportError, Uri},
-    Status,
+    Status, Streaming,
 };
 use tower::service_fn;
 use uuid::Uuid;
@@ -142,6 +147,21 @@ async fn get_vm_info(
     }
 }

+async fn parse_vm_id_and_get_record(
+    vm_id_str: &str,
+    repository: &VmRepository,
+) -> Result<(Uuid, VmRecord), VmServiceError> {
+    let vm_id = Uuid::parse_str(vm_id_str)
+        .map_err(|_| VmServiceError::InvalidArgument("Invalid VM ID format.".to_string()))?;
+
+    match repository.get_vm(vm_id).await? {
+        Some(record) => Ok((vm_id, record)),
+        None => Err(VmServiceError::Vmm(crate::vmm::VmmError::VmNotFound(
+            vm_id.to_string(),
+        ))),
+    }
+}
+
 pub(crate) async fn handle_create_vm_command(
     repository: &VmRepository,
     req: CreateVmRequest,
@@ -385,6 +405,71 @@ pub(crate) async fn handle_delete_vm_command(
     }
 }

+async fn get_attach_message(
+    stream: &mut Streaming<StreamVmConsoleRequest>,
+) -> Result<String, Status> {
+    match stream.next().await {
+        Some(Ok(msg)) => match msg.payload {
+            Some(console_input::Payload::Attach(AttachConsoleMessage { vm_id })) => Ok(vm_id),
+            _ => Err(Status::invalid_argument(
+                "First message must be an Attach message.",
+            )),
+        },
+        Some(Err(e)) => Err(e),
+        None => Err(Status::invalid_argument(
+            "Client disconnected before sending Attach message.",
+        )),
+    }
+}
+
+pub(crate) async fn handle_stream_vm_console_command(
+    repository: &VmRepository,
+    mut input_stream: Streaming<StreamVmConsoleRequest>,
+    output_tx: mpsc::Sender<Result<StreamVmConsoleResponse, Status>>,
+    hypervisor: Arc,
+) {
+    let vm_id_str = match get_attach_message(&mut input_stream).await {
+        Ok(id) => id,
+        Err(status) => {
+            let _ = output_tx.send(Err(status)).await;
+            return;
+        }
+    };
+
+    let (_vm_id, record) = match parse_vm_id_and_get_record(&vm_id_str, repository).await {
+        Ok(result) => result,
+        Err(e) => {
+            if output_tx.send(Err(e.into())).await.is_err() {
+                warn!(
+                    "StreamConsole: Client for {vm_id_str} disconnected before error could be sent."
+                );
+            }
+            return;
+        }
+    };
+
+    if record.status.state != VmState::Running {
+        let status = VmServiceError::InvalidState(format!(
+            "Cannot open console for VM in {:?} state. Must be in Running.",
+            record.status.state
+        ))
+        .into();
+        if output_tx.send(Err(status)).await.is_err() {
+            warn!(
+                "StreamConsole: Client for {vm_id_str} disconnected before precondition error could be sent."
+            );
+        }
+        return;
+    }
+
+    tokio::spawn(worker::spawn_console_bridge(
+        vm_id_str,
+        input_stream,
+        output_tx,
+        hypervisor,
+    ));
+}
+
 pub(crate) async fn handle_list_vms_command(
     repository: &VmRepository,
     _req: ListVmsRequest,
@@ -407,6 +492,194 @@ pub(crate) async fn handle_list_vms_command(
     }
 }

+pub(crate) async fn handle_start_vm_command(
+    repository: &VmRepository,
+    req: StartVmRequest,
+    responder: oneshot::Sender<Result<StartVmResponse, VmServiceError>>,
+    hypervisor: Arc,
+    event_bus_tx: mpsc::Sender<VmEvent>,
+    healthcheck_cancel_bus_tx: &broadcast::Sender,
+) {
+    let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
+        Ok(result) => result,
+        Err(e) => {
+            let _ = responder.send(Err(e));
+            return;
+        }
+    };
+
+    let current_state = record.status.state;
+    if !matches!(current_state, VmState::Created | VmState::Stopped) {
+        let _ = responder.send(Err(VmServiceError::InvalidState(format!(
+            "Cannot start VM in {current_state:?} state. Must be in Created or Stopped."
+        ))));
+        return;
+    }
+
+    let cancel_bus = if current_state == VmState::Stopped {
+        None
+    } else {
+        Some(healthcheck_cancel_bus_tx.subscribe())
+    };
+
+    tokio::spawn(worker::handle_start_vm(
+        req,
+        responder,
+        hypervisor,
+        event_bus_tx,
+        cancel_bus,
+    ));
+}
+
+pub(crate) async fn handle_shutdown_vm_command(
+    repository: &VmRepository,
+    req: ShutdownVmRequest,
+    responder: oneshot::Sender<Result<ShutdownVmResponse, VmServiceError>>,
+    hypervisor: Arc,
+    event_bus_tx: mpsc::Sender<VmEvent>,
+) {
+    let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
+        Ok(result) => result,
+        Err(e) => {
+            let _ = responder.send(Err(e));
+            return;
+        }
+    };
+
+    let current_state = record.status.state;
+    if current_state != VmState::Running {
+        let _ = responder.send(Err(VmServiceError::InvalidState(format!(
+            "Cannot shutdown VM in {current_state:?} state. Must be in Running."
+        ))));
+        return;
+    }
+
+    tokio::spawn(worker::handle_shutdown_vm(
+        req,
+        responder,
+        hypervisor,
+        event_bus_tx,
+    ));
+}
+
+pub(crate) async fn handle_pause_vm_command(
+    repository: &VmRepository,
+    req: PauseVmRequest,
+    responder: oneshot::Sender<Result<PauseVmResponse, VmServiceError>>,
+    hypervisor: Arc,
+    event_bus_tx: mpsc::Sender<VmEvent>,
+) {
+    let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
+        Ok(result) => result,
+        Err(e) => {
+            let _ = responder.send(Err(e));
+            return;
+        }
+    };
+
+    let current_state = record.status.state;
+    if current_state != VmState::Running {
+        let _ = responder.send(Err(VmServiceError::InvalidState(format!(
+            "Cannot pause VM in {current_state:?} state. Must be in Running."
+        ))));
+        return;
+    }
+
+    tokio::spawn(worker::handle_pause_vm(
+        req,
+        responder,
+        hypervisor,
+        event_bus_tx,
+    ));
+}
+
+pub(crate) async fn handle_resume_vm_command(
+    repository: &VmRepository,
+    req: ResumeVmRequest,
+    responder: oneshot::Sender<Result<ResumeVmResponse, VmServiceError>>,
+    hypervisor: Arc,
+    event_bus_tx: mpsc::Sender<VmEvent>,
+) {
+    let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
+        Ok(result) => result,
+        Err(e) => {
+            let _ = responder.send(Err(e));
+            return;
+        }
+    };
+
+    let current_state = record.status.state;
+    if current_state != VmState::Paused {
+        let _ = responder.send(Err(VmServiceError::InvalidState(format!(
+            "Cannot resume VM in {current_state:?} state. Must be in Paused."
+        ))));
+        return;
+    }
+
+    tokio::spawn(worker::handle_resume_vm(
+        req,
+        responder,
+        hypervisor,
+        event_bus_tx,
+    ));
+}
+
+pub(crate) async fn handle_attach_disk_command(
+    repository: &VmRepository,
+    req: AttachDiskRequest,
+    responder: oneshot::Sender<Result<AttachDiskResponse, VmServiceError>>,
+    hypervisor: Arc,
+) {
+    let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
+        Ok(result) => result,
+        Err(e) => {
+            let _ = responder.send(Err(e));
+            return;
+        }
+    };
+
+    let current_state = record.status.state;
+    if !matches!(
+        current_state,
+        VmState::Created | VmState::Running | VmState::Paused | VmState::Stopped
+    ) {
+        let _ = responder.send(Err(VmServiceError::InvalidState(format!(
+            "Cannot attach disk to VM in {current_state:?} state."
+        ))));
+        return;
+    }
+
+    tokio::spawn(worker::handle_attach_disk(req, responder, hypervisor));
+}
+
+pub(crate) async fn handle_remove_disk_command(
+    repository: &VmRepository,
+    req: RemoveDiskRequest,
+    responder: oneshot::Sender<Result<RemoveDiskResponse, VmServiceError>>,
+    hypervisor: Arc,
+) {
+    let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
+        Ok(result) => result,
+        Err(e) => {
+            let _ = responder.send(Err(e));
+            return;
+        }
+    };
+
+    let current_state = record.status.state;
+    if !matches!(
+        current_state,
+        VmState::Created | VmState::Running | VmState::Paused | VmState::Stopped
+    ) {
+        let _ = responder.send(Err(VmServiceError::InvalidState(format!(
+            "Cannot remove disk from VM in {current_state:?} state."
+        ))));
+        return;
+    }
+
+    tokio::spawn(worker::handle_remove_disk(req, responder, hypervisor));
+}
+
 pub(crate) async fn check_and_cleanup_vms(
     repository: &VmRepository,
     hypervisor: Arc,
diff --git a/feos/services/vm-service/src/error.rs b/feos/services/vm-service/src/error.rs
index b09121c..cd0659f 100644
--- a/feos/services/vm-service/src/error.rs
+++ b/feos/services/vm-service/src/error.rs
@@ -20,6 +20,9 @@ pub enum VmServiceError {

     #[error("VM with ID {0} already exists")]
     AlreadyExists(String),
+
+    #[error("Invalid VM state for operation: {0}")]
+    InvalidState(String),
 }

 impl From<VmServiceError> for Status {
@@ -38,6 +41,7 @@ impl From<VmServiceError> for Status {
             }
             VmServiceError::InvalidArgument(msg) => Status::invalid_argument(msg),
             VmServiceError::AlreadyExists(msg) => Status::already_exists(msg),
+            VmServiceError::InvalidState(msg) => Status::failed_precondition(msg),
         }
     }
 }
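The error.rs hunk above maps the new `InvalidState` variant to gRPC `FAILED_PRECONDITION`, which lets callers distinguish "well-formed request, wrong lifecycle state" from a malformed request. A hedged sketch of what that looks like from the caller's side, using a stand-in error enum (only the tonic status-code mapping mirrors the patch):

```rust
// Stand-in error enum to show the client-visible effect of the error.rs change:
// InvalidState surfaces as gRPC FAILED_PRECONDITION, while InvalidArgument stays
// INVALID_ARGUMENT. Only the tonic status-code mapping mirrors the patch.
use tonic::{Code, Status};

#[allow(dead_code)]
#[derive(Debug)]
enum VmServiceError {
    InvalidArgument(String),
    InvalidState(String),
}

impl From<VmServiceError> for Status {
    fn from(err: VmServiceError) -> Self {
        match err {
            VmServiceError::InvalidArgument(msg) => Status::invalid_argument(msg),
            VmServiceError::InvalidState(msg) => Status::failed_precondition(msg),
        }
    }
}

fn main() {
    let status: Status = VmServiceError::InvalidState(
        "Cannot start VM in Paused state. Must be in Created or Stopped.".to_string(),
    )
    .into();

    // FAILED_PRECONDITION tells the caller the request was well-formed but the VM is
    // in the wrong lifecycle state; retrying after e.g. ResumeVm can succeed.
    assert_eq!(status.code(), Code::FailedPrecondition);
    println!("{:?}: {}", status.code(), status.message());
}
```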
diff --git a/feos/services/vm-service/src/worker.rs b/feos/services/vm-service/src/worker.rs
index ade2e3b..6d1f4e6 100644
--- a/feos/services/vm-service/src/worker.rs
+++ b/feos/services/vm-service/src/worker.rs
@@ -8,10 +8,10 @@ use crate::{
 use feos_proto::{
     image_service::{ImageState as OciImageState, WatchImageStatusRequest},
     vm_service::{
-        stream_vm_console_request as console_input, AttachConsoleMessage, AttachDiskRequest,
-        AttachDiskResponse, ConsoleData, CreateVmRequest, CreateVmResponse, DeleteVmRequest,
-        DeleteVmResponse, GetVmRequest, PauseVmRequest, PauseVmResponse, PingVmRequest,
-        PingVmResponse, RemoveDiskRequest, RemoveDiskResponse, ResumeVmRequest, ResumeVmResponse,
+        stream_vm_console_request as console_input, AttachDiskRequest, AttachDiskResponse,
+        ConsoleData, CreateVmRequest, CreateVmResponse, DeleteVmRequest, DeleteVmResponse,
+        GetVmRequest, PauseVmRequest, PauseVmResponse, PingVmRequest, PingVmResponse,
+        RemoveDiskRequest, RemoveDiskResponse, ResumeVmRequest, ResumeVmResponse,
         ShutdownVmRequest, ShutdownVmResponse, StartVmRequest, StartVmResponse,
         StreamVmConsoleRequest, StreamVmConsoleResponse, StreamVmEventsRequest, VmEvent, VmInfo,
         VmState, VmStateChangedEvent,
@@ -179,7 +179,7 @@ pub async fn handle_start_vm(
     responder: oneshot::Sender<Result<StartVmResponse, VmServiceError>>,
     hypervisor: Arc,
     broadcast_tx: mpsc::Sender<VmEvent>,
-    cancel_bus: broadcast::Receiver,
+    cancel_bus: Option<broadcast::Receiver>,
 ) {
     let vm_id = req.vm_id.clone();
     let result = hypervisor.start_vm(req).await;
@@ -197,7 +197,9 @@
         )
         .await;

-        start_healthcheck_monitor(vm_id, hypervisor, broadcast_tx, cancel_bus);
+        if let Some(cancel_bus) = cancel_bus {
+            start_healthcheck_monitor(vm_id, hypervisor, broadcast_tx, cancel_bus);
+        }
     }

     if responder.send(result.map_err(Into::into)).is_err() {
@@ -295,19 +297,12 @@
     }
 }

-pub async fn handle_stream_vm_console(
-    mut input_stream: Streaming<StreamVmConsoleRequest>,
+pub async fn spawn_console_bridge(
+    vm_id: String,
+    input_stream: Streaming<StreamVmConsoleRequest>,
     output_tx: mpsc::Sender<Result<StreamVmConsoleResponse, Status>>,
     hypervisor: Arc,
 ) {
-    let vm_id = match get_attach_message(&mut input_stream).await {
-        Ok(id) => id,
-        Err(status) => {
-            let _ = output_tx.send(Err(status)).await;
-            return;
-        }
-    };
-
     let socket_path = match hypervisor.get_console_socket_path(&vm_id).await {
         Ok(path) => path,
         Err(e) => {
@@ -436,23 +431,6 @@ pub async fn handle_remove_disk(
     }
 }

-async fn get_attach_message(
-    stream: &mut Streaming<StreamVmConsoleRequest>,
-) -> Result<String, Status> {
-    match stream.next().await {
-        Some(Ok(msg)) => match msg.payload {
-            Some(console_input::Payload::Attach(AttachConsoleMessage { vm_id })) => Ok(vm_id),
-            _ => Err(Status::invalid_argument(
-                "First message must be an Attach message.",
-            )),
-        },
-        Some(Err(e)) => Err(e),
-        None => Err(Status::invalid_argument(
-            "Client disconnected before sending Attach message.",
-        )),
-    }
-}
-
 async fn bridge_console_streams(
     socket_path: PathBuf,
     mut grpc_input: Streaming<StreamVmConsoleRequest>,
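In the worker.rs hunk above, `handle_start_vm` now receives the health-check cancel-bus receiver as an `Option` and only calls `start_healthcheck_monitor` when one is present; per `handle_start_vm_command`, the dispatcher passes `None` when a VM is started again from the Stopped state, so a second monitor is not spawned. The sketch below models that gating with a plain tokio broadcast channel; the `String` payload and the helper names are assumptions for illustration only:

```rust
// Minimal sketch, not the service's code: handle_start_vm only spawns its
// health-check monitor when the dispatcher handed over a cancel-bus receiver.
// The String payload and the names used here are assumptions for illustration.
use std::time::Duration;
use tokio::sync::broadcast;

async fn start_vm_sketch(vm_id: &str, cancel_bus: Option<broadcast::Receiver<String>>) {
    println!("{vm_id}: VM started");

    match cancel_bus {
        Some(mut cancel_rx) => {
            let vm = vm_id.to_string();
            // Stand-in for start_healthcheck_monitor: run until a cancel message arrives.
            tokio::spawn(async move {
                let _ = cancel_rx.recv().await;
                println!("{vm}: health-check monitor cancelled");
            });
        }
        None => println!("{vm_id}: restart from Stopped, no second monitor spawned"),
    }
}

#[tokio::main]
async fn main() {
    let (cancel_tx, _keep_open) = broadcast::channel::<String>(8);

    // First boot from Created: the dispatcher subscribes and passes the receiver in.
    start_vm_sketch("vm-a", Some(cancel_tx.subscribe())).await;

    // Start again after Stopped: per handle_start_vm_command, no receiver is passed.
    start_vm_sketch("vm-a", None).await;

    let _ = cancel_tx.send("vm-a".to_string());
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```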
diff --git a/feos/tests/integration_tests.rs b/feos/tests/integration_tests.rs
index 34d6bc6..8e05636 100644
--- a/feos/tests/integration_tests.rs
+++ b/feos/tests/integration_tests.rs
@@ -12,9 +12,11 @@ use feos_proto::{
     image_service::{
         ListImagesRequest, PullImageRequest, WatchImageStatusRequest,
     },
     vm_service::{
-        vm_service_client::VmServiceClient, CpuConfig, CreateVmRequest, DeleteVmRequest,
-        GetVmRequest, MemoryConfig, PingVmRequest, StartVmRequest, StreamVmEventsRequest, VmConfig,
-        VmEvent, VmState, VmStateChangedEvent,
+        stream_vm_console_request as console_input, vm_service_client::VmServiceClient,
+        AttachConsoleMessage, CpuConfig, CreateVmRequest, DeleteVmRequest, GetVmRequest,
+        MemoryConfig, PauseVmRequest, PingVmRequest, ResumeVmRequest, ShutdownVmRequest,
+        StartVmRequest, StreamVmConsoleRequest, StreamVmEventsRequest, VmConfig, VmEvent, VmState,
+        VmStateChangedEvent,
     },
 };
 use hyper_util::rt::TokioIo;
@@ -158,7 +160,7 @@ impl Drop for VmGuard {
     }
 }

-async fn wait_for_vm_state(
+async fn wait_for_target_state(
     stream: &mut tonic::Streaming<VmEvent>,
     target_state: VmState,
 ) -> Result<()> {
@@ -192,34 +194,6 @@
     ))
 }

-async fn wait_for_target_state(
-    stream: &mut tonic::Streaming<VmEvent>,
-    target_state: VmState,
-) -> Result<()> {
-    while let Some(event_res) = stream.next().await {
-        let event = event_res?;
-        let any_data = event.data.expect("Event should have data payload");
-        if any_data.type_url == "type.googleapis.com/feos.vm.vmm.api.v1.VmStateChangedEvent" {
-            let state_change = VmStateChangedEvent::decode(&*any_data.value)?;
-            let new_state =
-                VmState::try_from(state_change.new_state).unwrap_or(VmState::Unspecified);
-
-            info!(
-                "Received VM state change event: new_state={:?}, reason='{}'",
-                new_state, state_change.reason
-            );
-
-            if new_state == target_state {
-                return Ok(());
-            }
-        }
-    }
-    Err(anyhow::anyhow!(
-        "Event stream ended before VM reached {:?} state.",
-        target_state
-    ))
-}
-
 #[tokio::test]
 async fn test_create_and_start_vm() -> Result<()> {
     if skip_if_ch_binary_missing() {
@@ -260,6 +234,16 @@
         cleanup_disabled: false,
     };

+    info!(
+        "Immediately calling StartVm for vm_id: {}, expecting error",
+        &vm_id
+    );
+    let start_req = StartVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    let result = vm_client.start_vm(start_req.clone()).await;
+    assert!(result.is_err(), "StartVm should fail when VM is Creating");
+
     info!("Connecting to StreamVmEvents stream for vm_id: {}", &vm_id);
     let events_req = StreamVmEventsRequest {
         vm_id: Some(vm_id.clone()),
@@ -269,25 +253,139 @@
     timeout(
         Duration::from_secs(180),
-        wait_for_vm_state(&mut stream, VmState::Created),
+        wait_for_target_state(&mut stream, VmState::Created),
     )
     .await
     .expect("Timed out waiting for VM to become created")?;
     info!("VM is in CREATED state");

     info!("Sending StartVm request for vm_id: {}", &vm_id);
-    let start_req = StartVmRequest {
+    vm_client.start_vm(start_req.clone()).await?;
+
+    timeout(
+        Duration::from_secs(30),
+        wait_for_target_state(&mut stream, VmState::Running),
+    )
+    .await
+    .expect("Timed out waiting for VM to become running")?;
+    info!("VM is in RUNNING state");
+
+    info!("Sending ShutdownVm request for vm_id: {}", &vm_id);
+    let shutdown_req = ShutdownVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    vm_client.shutdown_vm(shutdown_req).await?;
+
+    timeout(
+        Duration::from_secs(30),
+        wait_for_target_state(&mut stream, VmState::Stopped),
+    )
+    .await
+    .expect("Timed out waiting for VM to become stopped")?;
+    info!("VM is in STOPPED state");
+
+    info!(
+        "Calling ResumeVm in Stopped state for vm_id: {}, expecting error",
+        &vm_id
+    );
+    let resume_req = ResumeVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    let result = vm_client.resume_vm(resume_req.clone()).await;
+    assert!(result.is_err(), "ResumeVm should fail when VM is Stopped");
+
+    info!(
+        "Calling StreamVmConsole in Stopped state for vm_id: {}, expecting error",
+        &vm_id
+    );
+    let (console_tx, console_rx) = tokio::sync::mpsc::channel(1);
+    let console_stream = tokio_stream::wrappers::ReceiverStream::new(console_rx);
+
+    let attach_payload = console_input::Payload::Attach(AttachConsoleMessage {
         vm_id: vm_id.clone(),
+    });
+    let attach_input = StreamVmConsoleRequest {
+        payload: Some(attach_payload),
     };
+
+    console_tx
+        .send(attach_input)
+        .await
+        .expect("Failed to send attach message");
+
+    let response = vm_client.stream_vm_console(console_stream).await;
+    assert!(
+        response.is_ok(),
+        "StreamVmConsole should establish stream successfully"
+    );
+
+    let mut output_stream = response.unwrap().into_inner();
+
+    let stream_result = output_stream.next().await;
+    match stream_result {
+        Some(Err(status)) => {
+            info!(
+                "Received expected error from console stream: {}",
+                status.message()
+            );
+            assert!(
+                status.message().contains("Invalid VM state")
+                    || status.message().contains("Stopped"),
+                "Error should be about invalid VM state, got: {}",
+                status.message()
+            );
+        }
+        Some(Ok(_)) => {
+            panic!("StreamVmConsole should fail when VM is Stopped, but got success response")
+        }
+        None => panic!("StreamVmConsole stream ended unexpectedly without error"),
+    }
+
+    info!("Sending StartVm request again for vm_id: {}", &vm_id);
     vm_client.start_vm(start_req).await?;

     timeout(
         Duration::from_secs(30),
-        wait_for_vm_state(&mut stream, VmState::Running),
+        wait_for_target_state(&mut stream, VmState::Running),
     )
     .await
-    .expect("Timed out waiting for VM to become running")?;
-    info!("VM is in RUNNING state");
+    .expect("Timed out waiting for VM to become running again")?;
+    info!("VM is in RUNNING state again");
+
+    info!("Sending PauseVm request for vm_id: {}", &vm_id);
+    let pause_req = PauseVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    vm_client.pause_vm(pause_req).await?;
+
+    timeout(
+        Duration::from_secs(30),
+        wait_for_target_state(&mut stream, VmState::Paused),
+    )
+    .await
+    .expect("Timed out waiting for VM to become paused")?;
+    info!("VM is in PAUSED state");
+
+    info!(
+        "Calling StartVm in Paused state for vm_id: {}, expecting error",
+        &vm_id
+    );
+    let start_req_paused = StartVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    let result = vm_client.start_vm(start_req_paused).await;
+    assert!(result.is_err(), "StartVm should fail when VM is Paused");
+
+    info!("Sending ResumeVm request for vm_id: {}", &vm_id);
+    vm_client.resume_vm(resume_req).await?;
+
+    timeout(
+        Duration::from_secs(30),
+        wait_for_target_state(&mut stream, VmState::Running),
+    )
+    .await
+    .expect("Timed out waiting for VM to become running after resume")?;
+    info!("VM is in RUNNING state after resume");

     let get_req = GetVmRequest {
         vm_id: vm_id.clone(),
@@ -375,7 +473,7 @@ async fn test_vm_healthcheck_and_crash_recovery() -> Result<()> {
     timeout(
         Duration::from_secs(180),
-        wait_for_vm_state(&mut stream, VmState::Created),
+        wait_for_target_state(&mut stream, VmState::Created),
     )
     .await
     .expect("Timed out waiting for VM to become created")?;
@@ -389,7 +487,7 @@
     timeout(
         Duration::from_secs(30),
-        wait_for_vm_state(&mut stream, VmState::Running),
+        wait_for_target_state(&mut stream, VmState::Running),
     )
     .await
     .expect("Timed out waiting for VM to become running")?;
@@ -419,6 +517,42 @@
     .expect("Timed out waiting for VM to enter Crashed state")?;
     info!("VM is in CRASHED state as expected");

+    info!("Verifying API calls fail for crashed VM: {}", &vm_id);
+
+    let start_req = StartVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    assert!(
+        vm_client.start_vm(start_req).await.is_err(),
+        "StartVm should fail for a crashed VM"
+    );
+
+    let pause_req = PauseVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    assert!(
+        vm_client.pause_vm(pause_req).await.is_err(),
+        "PauseVm should fail for a crashed VM"
+    );
+
+    let shutdown_req = ShutdownVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    assert!(
+        vm_client.shutdown_vm(shutdown_req).await.is_err(),
+        "ShutdownVm should fail for a crashed VM"
+    );
+
+    let resume_req = ResumeVmRequest {
+        vm_id: vm_id.clone(),
+    };
+    assert!(
+        vm_client.resume_vm(resume_req).await.is_err(),
+        "ResumeVm should fail for a crashed VM"
+    );
+
+    info!("API call failure checks passed for crashed VM");
+
     info!("Deleting crashed VM: {}", &vm_id);
     let delete_req = DeleteVmRequest {
         vm_id: vm_id.clone(),