Skip to content

Commit 9bdecc7

Browse files
authored
Enhance VM-api (#87)
* Improve vm-service api so that it returns better errors
  Signed-off-by: Guvenc Gulce <[email protected]>
* Enhance automated tests for more vm state transition tests
  Signed-off-by: Guvenc Gulce <[email protected]>
* Check console_vm API call precondition
  Signed-off-by: Guvenc Gulce <[email protected]>
* Start vm health_check monitor only if the vm was in created state
  Signed-off-by: Guvenc Gulce <[email protected]>
* Add test for console_vm precondition check
  Signed-off-by: Guvenc Gulce <[email protected]>
---------
Signed-off-by: Guvenc Gulce <[email protected]>
1 parent a3ca586 commit 9bdecc7

File tree

5 files changed

+477
-87
lines changed

5 files changed

+477
-87
lines changed

feos/services/vm-service/src/dispatcher.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33

44
use crate::{
55
dispatcher_handlers::{
6-
handle_create_vm_command, handle_delete_vm_command, handle_get_vm_command,
7-
handle_list_vms_command, handle_stream_vm_events_command, perform_startup_sanity_check,
6+
handle_attach_disk_command, handle_create_vm_command, handle_delete_vm_command,
7+
handle_get_vm_command, handle_list_vms_command, handle_pause_vm_command,
8+
handle_remove_disk_command, handle_resume_vm_command, handle_shutdown_vm_command,
9+
handle_start_vm_command, handle_stream_vm_console_command, handle_stream_vm_events_command,
10+
perform_startup_sanity_check,
811
},
912
error::VmServiceError,
1013
persistence::repository::VmRepository,
@@ -65,15 +68,13 @@ impl VmServiceDispatcher {
6568
let hypervisor = self.hypervisor.clone();
6669
let event_bus_tx = self.event_bus_tx.clone();
6770
let status_channel_tx = self.status_channel_tx.clone();
68-
let healthcheck_cancel_bus_tx = self.healthcheck_cancel_bus.clone();
6971

7072
match cmd {
7173
Command::CreateVm(req, responder) => {
7274
handle_create_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
7375
}
7476
Command::StartVm(req, responder) => {
75-
let cancel_bus = healthcheck_cancel_bus_tx.subscribe();
76-
tokio::spawn(worker::handle_start_vm(req, responder, hypervisor, event_bus_tx, cancel_bus));
77+
handle_start_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx, &self.healthcheck_cancel_bus).await;
7778
}
7879
Command::GetVm(req, responder) => {
7980
handle_get_vm_command(&self.repository, req, responder).await;
@@ -85,7 +86,7 @@ impl VmServiceDispatcher {
8586
handle_delete_vm_command(&self.repository, &self.healthcheck_cancel_bus, req, responder, hypervisor, event_bus_tx).await;
8687
}
8788
Command::StreamVmConsole(input_stream, output_tx) => {
88-
tokio::spawn(worker::handle_stream_vm_console(*input_stream, output_tx, hypervisor));
89+
handle_stream_vm_console_command(&self.repository, *input_stream, output_tx, hypervisor).await;
8990
}
9091
Command::ListVms(req, responder) => {
9192
handle_list_vms_command(&self.repository, req, responder).await;
@@ -94,19 +95,19 @@ impl VmServiceDispatcher {
9495
tokio::spawn(worker::handle_ping_vm(req, responder, hypervisor));
9596
}
9697
Command::ShutdownVm(req, responder) => {
97-
tokio::spawn(worker::handle_shutdown_vm(req, responder, hypervisor, event_bus_tx));
98+
handle_shutdown_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
9899
}
99100
Command::PauseVm(req, responder) => {
100-
tokio::spawn(worker::handle_pause_vm(req, responder, hypervisor, event_bus_tx));
101+
handle_pause_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
101102
}
102103
Command::ResumeVm(req, responder) => {
103-
tokio::spawn(worker::handle_resume_vm(req, responder, hypervisor, event_bus_tx));
104+
handle_resume_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
104105
}
105106
Command::AttachDisk(req, responder) => {
106-
tokio::spawn(worker::handle_attach_disk(req, responder, hypervisor));
107+
handle_attach_disk_command(&self.repository, req, responder, hypervisor).await;
107108
}
108109
Command::RemoveDisk(req, responder) => {
109-
tokio::spawn(worker::handle_remove_disk(req, responder, hypervisor));
110+
handle_remove_disk_command(&self.repository, req, responder, hypervisor).await;
110111
}
111112
}
112113
},

feos/services/vm-service/src/dispatcher_handlers.rs

Lines changed: 277 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,13 @@ use crate::{
1010
use feos_proto::{
1111
image_service::{image_service_client::ImageServiceClient, PullImageRequest},
1212
vm_service::{
13-
CreateVmRequest, CreateVmResponse, DeleteVmRequest, DeleteVmResponse, GetVmRequest,
14-
ListVmsRequest, ListVmsResponse, StreamVmEventsRequest, VmEvent, VmInfo, VmState,
15-
VmStateChangedEvent,
13+
stream_vm_console_request as console_input, AttachConsoleMessage, AttachDiskRequest,
14+
AttachDiskResponse, CreateVmRequest, CreateVmResponse, DeleteVmRequest, DeleteVmResponse,
15+
GetVmRequest, ListVmsRequest, ListVmsResponse, PauseVmRequest, PauseVmResponse,
16+
RemoveDiskRequest, RemoveDiskResponse, ResumeVmRequest, ResumeVmResponse,
17+
ShutdownVmRequest, ShutdownVmResponse, StartVmRequest, StartVmResponse,
18+
StreamVmConsoleRequest, StreamVmConsoleResponse, StreamVmEventsRequest, VmEvent, VmInfo,
19+
VmState, VmStateChangedEvent,
1620
},
1721
};
1822
use hyper_util::rt::TokioIo;
@@ -23,9 +27,10 @@ use prost::Message;
2327
use prost_types::Any;
2428
use std::{path::PathBuf, sync::Arc};
2529
use tokio::sync::{broadcast, mpsc, oneshot};
30+
use tokio_stream::StreamExt;
2631
use tonic::{
2732
transport::{Channel, Endpoint, Error as TonicTransportError, Uri},
28-
Status,
33+
Status, Streaming,
2934
};
3035
use tower::service_fn;
3136
use uuid::Uuid;
@@ -142,6 +147,21 @@ async fn get_vm_info(
142147
}
143148
}
144149

150+
async fn parse_vm_id_and_get_record(
151+
vm_id_str: &str,
152+
repository: &VmRepository,
153+
) -> Result<(Uuid, VmRecord), VmServiceError> {
154+
let vm_id = Uuid::parse_str(vm_id_str)
155+
.map_err(|_| VmServiceError::InvalidArgument("Invalid VM ID format.".to_string()))?;
156+
157+
match repository.get_vm(vm_id).await? {
158+
Some(record) => Ok((vm_id, record)),
159+
None => Err(VmServiceError::Vmm(crate::vmm::VmmError::VmNotFound(
160+
vm_id.to_string(),
161+
))),
162+
}
163+
}
164+
145165
pub(crate) async fn handle_create_vm_command(
146166
repository: &VmRepository,
147167
req: CreateVmRequest,
@@ -385,6 +405,71 @@ pub(crate) async fn handle_delete_vm_command(
385405
}
386406
}
387407

408+
async fn get_attach_message(
409+
stream: &mut Streaming<StreamVmConsoleRequest>,
410+
) -> Result<String, Status> {
411+
match stream.next().await {
412+
Some(Ok(msg)) => match msg.payload {
413+
Some(console_input::Payload::Attach(AttachConsoleMessage { vm_id })) => Ok(vm_id),
414+
_ => Err(Status::invalid_argument(
415+
"First message must be an Attach message.",
416+
)),
417+
},
418+
Some(Err(e)) => Err(e),
419+
None => Err(Status::invalid_argument(
420+
"Client disconnected before sending Attach message.",
421+
)),
422+
}
423+
}
424+
425+
pub(crate) async fn handle_stream_vm_console_command(
426+
repository: &VmRepository,
427+
mut input_stream: Streaming<StreamVmConsoleRequest>,
428+
output_tx: mpsc::Sender<Result<StreamVmConsoleResponse, Status>>,
429+
hypervisor: Arc<dyn Hypervisor>,
430+
) {
431+
let vm_id_str = match get_attach_message(&mut input_stream).await {
432+
Ok(id) => id,
433+
Err(status) => {
434+
let _ = output_tx.send(Err(status)).await;
435+
return;
436+
}
437+
};
438+
439+
let (_vm_id, record) = match parse_vm_id_and_get_record(&vm_id_str, repository).await {
440+
Ok(result) => result,
441+
Err(e) => {
442+
if output_tx.send(Err(e.into())).await.is_err() {
443+
warn!(
444+
"StreamConsole: Client for {vm_id_str} disconnected before error could be sent."
445+
);
446+
}
447+
return;
448+
}
449+
};
450+
451+
if record.status.state != VmState::Running {
452+
let status = VmServiceError::InvalidState(format!(
453+
"Cannot open console for VM in {:?} state. Must be in Running.",
454+
record.status.state
455+
))
456+
.into();
457+
if output_tx.send(Err(status)).await.is_err() {
458+
warn!(
459+
"StreamConsole: Client for {vm_id_str} disconnected before precondition error could be sent."
460+
);
461+
}
462+
return;
463+
}
464+
465+
tokio::spawn(worker::spawn_console_bridge(
466+
vm_id_str,
467+
input_stream,
468+
output_tx,
469+
hypervisor,
470+
));
471+
}
472+
388473
pub(crate) async fn handle_list_vms_command(
389474
repository: &VmRepository,
390475
_req: ListVmsRequest,
@@ -407,6 +492,194 @@ pub(crate) async fn handle_list_vms_command(
407492
}
408493
}
409494

495+
pub(crate) async fn handle_start_vm_command(
496+
repository: &VmRepository,
497+
req: StartVmRequest,
498+
responder: oneshot::Sender<Result<StartVmResponse, VmServiceError>>,
499+
hypervisor: Arc<dyn Hypervisor>,
500+
event_bus_tx: mpsc::Sender<VmEventWrapper>,
501+
healthcheck_cancel_bus_tx: &broadcast::Sender<Uuid>,
502+
) {
503+
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
504+
Ok(result) => result,
505+
Err(e) => {
506+
let _ = responder.send(Err(e));
507+
return;
508+
}
509+
};
510+
511+
let current_state = record.status.state;
512+
if !matches!(current_state, VmState::Created | VmState::Stopped) {
513+
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
514+
"Cannot start VM in {current_state:?} state. Must be in Created or Stopped."
515+
))));
516+
return;
517+
}
518+
519+
let cancel_bus = if current_state == VmState::Stopped {
520+
None
521+
} else {
522+
Some(healthcheck_cancel_bus_tx.subscribe())
523+
};
524+
525+
tokio::spawn(worker::handle_start_vm(
526+
req,
527+
responder,
528+
hypervisor,
529+
event_bus_tx,
530+
cancel_bus,
531+
));
532+
}
533+
534+
pub(crate) async fn handle_shutdown_vm_command(
535+
repository: &VmRepository,
536+
req: ShutdownVmRequest,
537+
responder: oneshot::Sender<Result<ShutdownVmResponse, VmServiceError>>,
538+
hypervisor: Arc<dyn Hypervisor>,
539+
event_bus_tx: mpsc::Sender<VmEventWrapper>,
540+
) {
541+
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
542+
Ok(result) => result,
543+
Err(e) => {
544+
let _ = responder.send(Err(e));
545+
return;
546+
}
547+
};
548+
549+
let current_state = record.status.state;
550+
if current_state != VmState::Running {
551+
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
552+
"Cannot shutdown VM in {current_state:?} state. Must be in Running."
553+
))));
554+
return;
555+
}
556+
557+
tokio::spawn(worker::handle_shutdown_vm(
558+
req,
559+
responder,
560+
hypervisor,
561+
event_bus_tx,
562+
));
563+
}
564+
565+
pub(crate) async fn handle_pause_vm_command(
566+
repository: &VmRepository,
567+
req: PauseVmRequest,
568+
responder: oneshot::Sender<Result<PauseVmResponse, VmServiceError>>,
569+
hypervisor: Arc<dyn Hypervisor>,
570+
event_bus_tx: mpsc::Sender<VmEventWrapper>,
571+
) {
572+
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
573+
Ok(result) => result,
574+
Err(e) => {
575+
let _ = responder.send(Err(e));
576+
return;
577+
}
578+
};
579+
580+
let current_state = record.status.state;
581+
if current_state != VmState::Running {
582+
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
583+
"Cannot pause VM in {current_state:?} state. Must be in Running."
584+
))));
585+
return;
586+
}
587+
588+
tokio::spawn(worker::handle_pause_vm(
589+
req,
590+
responder,
591+
hypervisor,
592+
event_bus_tx,
593+
));
594+
}
595+
596+
pub(crate) async fn handle_resume_vm_command(
597+
repository: &VmRepository,
598+
req: ResumeVmRequest,
599+
responder: oneshot::Sender<Result<ResumeVmResponse, VmServiceError>>,
600+
hypervisor: Arc<dyn Hypervisor>,
601+
event_bus_tx: mpsc::Sender<VmEventWrapper>,
602+
) {
603+
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
604+
Ok(result) => result,
605+
Err(e) => {
606+
let _ = responder.send(Err(e));
607+
return;
608+
}
609+
};
610+
611+
let current_state = record.status.state;
612+
if current_state != VmState::Paused {
613+
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
614+
"Cannot resume VM in {current_state:?} state. Must be in Paused."
615+
))));
616+
return;
617+
}
618+
619+
tokio::spawn(worker::handle_resume_vm(
620+
req,
621+
responder,
622+
hypervisor,
623+
event_bus_tx,
624+
));
625+
}
626+
627+
pub(crate) async fn handle_attach_disk_command(
628+
repository: &VmRepository,
629+
req: AttachDiskRequest,
630+
responder: oneshot::Sender<Result<AttachDiskResponse, VmServiceError>>,
631+
hypervisor: Arc<dyn Hypervisor>,
632+
) {
633+
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
634+
Ok(result) => result,
635+
Err(e) => {
636+
let _ = responder.send(Err(e));
637+
return;
638+
}
639+
};
640+
641+
let current_state = record.status.state;
642+
if !matches!(
643+
current_state,
644+
VmState::Created | VmState::Running | VmState::Paused | VmState::Stopped
645+
) {
646+
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
647+
"Cannot attach disk to VM in {current_state:?} state."
648+
))));
649+
return;
650+
}
651+
652+
tokio::spawn(worker::handle_attach_disk(req, responder, hypervisor));
653+
}
654+
655+
pub(crate) async fn handle_remove_disk_command(
656+
repository: &VmRepository,
657+
req: RemoveDiskRequest,
658+
responder: oneshot::Sender<Result<RemoveDiskResponse, VmServiceError>>,
659+
hypervisor: Arc<dyn Hypervisor>,
660+
) {
661+
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
662+
Ok(result) => result,
663+
Err(e) => {
664+
let _ = responder.send(Err(e));
665+
return;
666+
}
667+
};
668+
669+
let current_state = record.status.state;
670+
if !matches!(
671+
current_state,
672+
VmState::Created | VmState::Running | VmState::Paused | VmState::Stopped
673+
) {
674+
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
675+
"Cannot remove disk from VM in {current_state:?} state."
676+
))));
677+
return;
678+
}
679+
680+
tokio::spawn(worker::handle_remove_disk(req, responder, hypervisor));
681+
}
682+
410683
pub(crate) async fn check_and_cleanup_vms(
411684
repository: &VmRepository,
412685
hypervisor: Arc<dyn Hypervisor>,

0 commit comments

Comments
 (0)