23 changes: 12 additions & 11 deletions feos/services/vm-service/src/dispatcher.rs
@@ -3,8 +3,11 @@

use crate::{
dispatcher_handlers::{
handle_create_vm_command, handle_delete_vm_command, handle_get_vm_command,
handle_list_vms_command, handle_stream_vm_events_command, perform_startup_sanity_check,
handle_attach_disk_command, handle_create_vm_command, handle_delete_vm_command,
handle_get_vm_command, handle_list_vms_command, handle_pause_vm_command,
handle_remove_disk_command, handle_resume_vm_command, handle_shutdown_vm_command,
handle_start_vm_command, handle_stream_vm_console_command, handle_stream_vm_events_command,
perform_startup_sanity_check,
},
error::VmServiceError,
persistence::repository::VmRepository,
@@ -65,15 +68,13 @@ impl VmServiceDispatcher {
let hypervisor = self.hypervisor.clone();
let event_bus_tx = self.event_bus_tx.clone();
let status_channel_tx = self.status_channel_tx.clone();
let healthcheck_cancel_bus_tx = self.healthcheck_cancel_bus.clone();

match cmd {
Command::CreateVm(req, responder) => {
handle_create_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
}
Command::StartVm(req, responder) => {
let cancel_bus = healthcheck_cancel_bus_tx.subscribe();
tokio::spawn(worker::handle_start_vm(req, responder, hypervisor, event_bus_tx, cancel_bus));
handle_start_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx, &self.healthcheck_cancel_bus).await;
}
Command::GetVm(req, responder) => {
handle_get_vm_command(&self.repository, req, responder).await;
@@ -85,7 +86,7 @@
handle_delete_vm_command(&self.repository, &self.healthcheck_cancel_bus, req, responder, hypervisor, event_bus_tx).await;
}
Command::StreamVmConsole(input_stream, output_tx) => {
tokio::spawn(worker::handle_stream_vm_console(*input_stream, output_tx, hypervisor));
handle_stream_vm_console_command(&self.repository, *input_stream, output_tx, hypervisor).await;
}
Command::ListVms(req, responder) => {
handle_list_vms_command(&self.repository, req, responder).await;
@@ -94,19 +95,19 @@
tokio::spawn(worker::handle_ping_vm(req, responder, hypervisor));
}
Command::ShutdownVm(req, responder) => {
tokio::spawn(worker::handle_shutdown_vm(req, responder, hypervisor, event_bus_tx));
handle_shutdown_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
}
Command::PauseVm(req, responder) => {
tokio::spawn(worker::handle_pause_vm(req, responder, hypervisor, event_bus_tx));
handle_pause_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
}
Command::ResumeVm(req, responder) => {
tokio::spawn(worker::handle_resume_vm(req, responder, hypervisor, event_bus_tx));
handle_resume_vm_command(&self.repository, req, responder, hypervisor, event_bus_tx).await;
}
Command::AttachDisk(req, responder) => {
tokio::spawn(worker::handle_attach_disk(req, responder, hypervisor));
handle_attach_disk_command(&self.repository, req, responder, hypervisor).await;
}
Command::RemoveDisk(req, responder) => {
tokio::spawn(worker::handle_remove_disk(req, responder, hypervisor));
handle_remove_disk_command(&self.repository, req, responder, hypervisor).await;
}
}
},
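The pattern in this file: each per-command tokio::spawn is replaced by an awaited handle_*_command wrapper that checks persisted state before any work is spawned. A minimal, self-contained sketch of that handler shape, assuming tokio — every name below is illustrative, not from the crate:

use tokio::sync::oneshot;

#[derive(Clone, Copy, Debug, PartialEq)]
enum VmState {
    Created,
    Running,
    Stopped,
}

struct Record {
    state: VmState,
}

// Stand-in for the repository lookup; the real handlers also parse the UUID.
async fn load(_vm_id: &str) -> Option<Record> {
    Some(Record { state: VmState::Running })
}

// Handler shape: validate the precondition first, then hand the slow part to
// a worker task so the dispatch loop stays responsive.
async fn handle_shutdown(vm_id: &str, responder: oneshot::Sender<Result<(), String>>) {
    let Some(record) = load(vm_id).await else {
        let _ = responder.send(Err(format!("{vm_id} not found")));
        return;
    };
    if record.state != VmState::Running {
        let _ = responder.send(Err(format!("cannot shut down in {:?}", record.state)));
        return;
    }
    tokio::spawn(async move {
        // A real worker would drive the hypervisor here before responding.
        let _ = responder.send(Ok(()));
    });
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    handle_shutdown("vm-1", tx).await;
    println!("{:?}", rx.await);
}

Validating on the dispatcher task keeps the state checks serialized with other commands, while the spawn keeps slow hypervisor calls off the dispatch loop.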
281 changes: 277 additions & 4 deletions feos/services/vm-service/src/dispatcher_handlers.rs
@@ -10,9 +10,13 @@ use crate::{
use feos_proto::{
image_service::{image_service_client::ImageServiceClient, PullImageRequest},
vm_service::{
CreateVmRequest, CreateVmResponse, DeleteVmRequest, DeleteVmResponse, GetVmRequest,
ListVmsRequest, ListVmsResponse, StreamVmEventsRequest, VmEvent, VmInfo, VmState,
VmStateChangedEvent,
stream_vm_console_request as console_input, AttachConsoleMessage, AttachDiskRequest,
AttachDiskResponse, CreateVmRequest, CreateVmResponse, DeleteVmRequest, DeleteVmResponse,
GetVmRequest, ListVmsRequest, ListVmsResponse, PauseVmRequest, PauseVmResponse,
RemoveDiskRequest, RemoveDiskResponse, ResumeVmRequest, ResumeVmResponse,
ShutdownVmRequest, ShutdownVmResponse, StartVmRequest, StartVmResponse,
StreamVmConsoleRequest, StreamVmConsoleResponse, StreamVmEventsRequest, VmEvent, VmInfo,
VmState, VmStateChangedEvent,
},
};
use hyper_util::rt::TokioIo;
@@ -23,9 +27,10 @@ use prost::Message;
use prost_types::Any;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::StreamExt;
use tonic::{
transport::{Channel, Endpoint, Error as TonicTransportError, Uri},
Status,
Status, Streaming,
};
use tower::service_fn;
use uuid::Uuid;
@@ -142,6 +147,21 @@ async fn get_vm_info(
}
}

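/// Shared precondition helper: parses the VM ID string and loads the persisted
/// record, mapping an absent row to VmmError::VmNotFound so every handler
/// reports missing VMs consistently.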
async fn parse_vm_id_and_get_record(
vm_id_str: &str,
repository: &VmRepository,
) -> Result<(Uuid, VmRecord), VmServiceError> {
let vm_id = Uuid::parse_str(vm_id_str)
.map_err(|_| VmServiceError::InvalidArgument("Invalid VM ID format.".to_string()))?;

match repository.get_vm(vm_id).await? {
Some(record) => Ok((vm_id, record)),
None => Err(VmServiceError::Vmm(crate::vmm::VmmError::VmNotFound(
vm_id.to_string(),
))),
}
}

pub(crate) async fn handle_create_vm_command(
repository: &VmRepository,
req: CreateVmRequest,
@@ -385,6 +405,71 @@ pub(crate) async fn handle_delete_vm_command(
}
}

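/// The console protocol requires the first client message to be an Attach
/// carrying the target VM ID. Any other first message, or a disconnect before
/// it arrives, is rejected as invalid_argument; stream errors are forwarded.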
async fn get_attach_message(
stream: &mut Streaming<StreamVmConsoleRequest>,
) -> Result<String, Status> {
match stream.next().await {
Some(Ok(msg)) => match msg.payload {
Some(console_input::Payload::Attach(AttachConsoleMessage { vm_id })) => Ok(vm_id),
_ => Err(Status::invalid_argument(
"First message must be an Attach message.",
)),
},
Some(Err(e)) => Err(e),
None => Err(Status::invalid_argument(
"Client disconnected before sending Attach message.",
)),
}
}

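/// Validates the Attach handshake and the Running-state precondition on the
/// dispatcher task, then hands the stream pair to the console bridge worker.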
pub(crate) async fn handle_stream_vm_console_command(
repository: &VmRepository,
mut input_stream: Streaming<StreamVmConsoleRequest>,
output_tx: mpsc::Sender<Result<StreamVmConsoleResponse, Status>>,
hypervisor: Arc<dyn Hypervisor>,
) {
let vm_id_str = match get_attach_message(&mut input_stream).await {
Ok(id) => id,
Err(status) => {
let _ = output_tx.send(Err(status)).await;
return;
}
};

let (_vm_id, record) = match parse_vm_id_and_get_record(&vm_id_str, repository).await {
Ok(result) => result,
Err(e) => {
if output_tx.send(Err(e.into())).await.is_err() {
warn!(
"StreamConsole: Client for {vm_id_str} disconnected before error could be sent."
);
}
return;
}
};

if record.status.state != VmState::Running {
let status = VmServiceError::InvalidState(format!(
"Cannot open console for VM in {:?} state. Must be in Running.",
record.status.state
))
.into();
if output_tx.send(Err(status)).await.is_err() {
warn!(
"StreamConsole: Client for {vm_id_str} disconnected before precondition error could be sent."
);
}
return;
}

tokio::spawn(worker::spawn_console_bridge(
vm_id_str,
input_stream,
output_tx,
hypervisor,
));
}

pub(crate) async fn handle_list_vms_command(
repository: &VmRepository,
_req: ListVmsRequest,
@@ -407,6 +492,194 @@ pub(crate) async fn handle_list_vms_command(
}
}

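/// Start is only legal from Created or Stopped; whether the worker receives a
/// healthcheck cancel-bus subscription depends on which of the two states the
/// VM is in (see the inline note below).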
pub(crate) async fn handle_start_vm_command(
repository: &VmRepository,
req: StartVmRequest,
responder: oneshot::Sender<Result<StartVmResponse, VmServiceError>>,
hypervisor: Arc<dyn Hypervisor>,
event_bus_tx: mpsc::Sender<VmEventWrapper>,
healthcheck_cancel_bus_tx: &broadcast::Sender<Uuid>,
) {
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
Ok(result) => result,
Err(e) => {
let _ = responder.send(Err(e));
return;
}
};

let current_state = record.status.state;
if !matches!(current_state, VmState::Created | VmState::Stopped) {
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
"Cannot start VM in {current_state:?} state. Must be in Created or Stopped."
))));
return;
}

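// Only a first boot (Created) gets a healthcheck cancel-bus subscription;
// a restart from Stopped passes None to the worker.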
let cancel_bus = if current_state == VmState::Stopped {
None
} else {
Some(healthcheck_cancel_bus_tx.subscribe())
};

tokio::spawn(worker::handle_start_vm(
req,
responder,
hypervisor,
event_bus_tx,
cancel_bus,
));
}

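// The lifecycle handlers below share one shape: load the record, enforce the
// single legal source state (Running for shutdown and pause, Paused for
// resume), and only then spawn the worker.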
pub(crate) async fn handle_shutdown_vm_command(
repository: &VmRepository,
req: ShutdownVmRequest,
responder: oneshot::Sender<Result<ShutdownVmResponse, VmServiceError>>,
hypervisor: Arc<dyn Hypervisor>,
event_bus_tx: mpsc::Sender<VmEventWrapper>,
) {
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
Ok(result) => result,
Err(e) => {
let _ = responder.send(Err(e));
return;
}
};

let current_state = record.status.state;
if current_state != VmState::Running {
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
"Cannot shutdown VM in {current_state:?} state. Must be in Running."
))));
return;
}

tokio::spawn(worker::handle_shutdown_vm(
req,
responder,
hypervisor,
event_bus_tx,
));
}

pub(crate) async fn handle_pause_vm_command(
repository: &VmRepository,
req: PauseVmRequest,
responder: oneshot::Sender<Result<PauseVmResponse, VmServiceError>>,
hypervisor: Arc<dyn Hypervisor>,
event_bus_tx: mpsc::Sender<VmEventWrapper>,
) {
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
Ok(result) => result,
Err(e) => {
let _ = responder.send(Err(e));
return;
}
};

let current_state = record.status.state;
if current_state != VmState::Running {
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
"Cannot pause VM in {current_state:?} state. Must be in Running."
))));
return;
}

tokio::spawn(worker::handle_pause_vm(
req,
responder,
hypervisor,
event_bus_tx,
));
}

pub(crate) async fn handle_resume_vm_command(
repository: &VmRepository,
req: ResumeVmRequest,
responder: oneshot::Sender<Result<ResumeVmResponse, VmServiceError>>,
hypervisor: Arc<dyn Hypervisor>,
event_bus_tx: mpsc::Sender<VmEventWrapper>,
) {
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
Ok(result) => result,
Err(e) => {
let _ = responder.send(Err(e));
return;
}
};

let current_state = record.status.state;
if current_state != VmState::Paused {
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
"Cannot resume VM in {current_state:?} state. Must be in Paused."
))));
return;
}

tokio::spawn(worker::handle_resume_vm(
req,
responder,
hypervisor,
event_bus_tx,
));
}

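// Disk attach/remove are permitted in Created, Running, Paused, or Stopped;
// any other state is rejected before the worker is spawned.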
pub(crate) async fn handle_attach_disk_command(
repository: &VmRepository,
req: AttachDiskRequest,
responder: oneshot::Sender<Result<AttachDiskResponse, VmServiceError>>,
hypervisor: Arc<dyn Hypervisor>,
) {
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
Ok(result) => result,
Err(e) => {
let _ = responder.send(Err(e));
return;
}
};

let current_state = record.status.state;
if !matches!(
current_state,
VmState::Created | VmState::Running | VmState::Paused | VmState::Stopped
) {
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
"Cannot attach disk to VM in {current_state:?} state."
))));
return;
}

tokio::spawn(worker::handle_attach_disk(req, responder, hypervisor));
}

pub(crate) async fn handle_remove_disk_command(
repository: &VmRepository,
req: RemoveDiskRequest,
responder: oneshot::Sender<Result<RemoveDiskResponse, VmServiceError>>,
hypervisor: Arc<dyn Hypervisor>,
) {
let (_vm_id, record) = match parse_vm_id_and_get_record(&req.vm_id, repository).await {
Ok(result) => result,
Err(e) => {
let _ = responder.send(Err(e));
return;
}
};

let current_state = record.status.state;
if !matches!(
current_state,
VmState::Created | VmState::Running | VmState::Paused | VmState::Stopped
) {
let _ = responder.send(Err(VmServiceError::InvalidState(format!(
"Cannot remove disk from VM in {current_state:?} state."
))));
return;
}

tokio::spawn(worker::handle_remove_disk(req, responder, hypervisor));
}

pub(crate) async fn check_and_cleanup_vms(
repository: &VmRepository,
hypervisor: Arc<dyn Hypervisor>,