Skip to content

Commit 8e7a39d

Browse files
committed
VM healthcheck async routine should be cancellable
Signed-off-by: Guvenc Gulce <[email protected]>
1 parent b557e9e commit 8e7a39d

File tree

4 files changed

+77
-31
lines changed

4 files changed

+77
-31
lines changed

feos/services/vm-service/src/dispatcher.rs

Lines changed: 19 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,12 +27,14 @@ pub struct VmServiceDispatcher {
2727
status_channel_tx: broadcast::Sender<VmEventWrapper>,
2828
hypervisor: Arc<dyn Hypervisor>,
2929
repository: VmRepository,
30+
healthcheck_cancel_bus: broadcast::Sender<Uuid>,
3031
}
3132

3233
impl VmServiceDispatcher {
3334
pub async fn new(rx: mpsc::Receiver<Command>, db_url: &str) -> Result<Self> {
3435
let (event_bus_tx, event_bus_rx_for_dispatcher) = mpsc::channel(32);
3536
let (status_channel_tx, _) = broadcast::channel(32);
37+
let (healthcheck_cancel_bus, _) = broadcast::channel::<Uuid>(32);
3638
let hypervisor = Arc::from(factory(VmmType::CloudHypervisor));
3739
info!("VM_DISPATCHER: Connecting to persistence layer at {db_url}...");
3840
let repository = VmRepository::connect(db_url).await?;
@@ -44,6 +46,7 @@ impl VmServiceDispatcher {
4446
status_channel_tx,
4547
hypervisor,
4648
repository,
49+
healthcheck_cancel_bus,
4750
})
4851
}
4952

@@ -170,13 +173,15 @@ impl VmServiceDispatcher {
170173
let hypervisor = self.hypervisor.clone();
171174
let event_bus_tx = self.event_bus_tx.clone();
172175
let status_channel_tx = self.status_channel_tx.clone();
176+
let healthcheck_cancel_bus_tx = self.healthcheck_cancel_bus.clone();
173177

174178
match cmd {
175179
Command::CreateVm(req, responder) => {
176180
self.handle_create_vm_command(req, responder, hypervisor, event_bus_tx).await;
177181
}
178182
Command::StartVm(req, responder) => {
179-
tokio::spawn(worker::handle_start_vm(req, responder, hypervisor, event_bus_tx));
183+
let cancel_bus = healthcheck_cancel_bus_tx.subscribe();
184+
tokio::spawn(worker::handle_start_vm(req, responder, hypervisor, event_bus_tx, cancel_bus));
180185
}
181186
Command::GetVm(req, responder) => {
182187
self.handle_get_vm_command(req, responder).await;
@@ -536,6 +541,12 @@ impl VmServiceDispatcher {
536541
}
537542
info!("VM_DISPATCHER: Deleted record for VM {vm_id} from database.");
538543

544+
if let Err(e) = self.healthcheck_cancel_bus.send(vm_id) {
545+
warn!(
546+
"VM_DISPATCHER: Failed to send healthcheck cancellation for {vm_id}: {e}"
547+
);
548+
}
549+
539550
tokio::spawn(worker::handle_delete_vm(
540551
req,
541552
image_uuid_to_delete,
@@ -548,6 +559,13 @@ impl VmServiceDispatcher {
548559
Ok(None) => {
549560
let msg = format!("VM with ID {vm_id} not found in database for deletion");
550561
warn!("VM_DISPATCHER: {msg}. Still attempting hypervisor cleanup.");
562+
563+
if let Err(e) = self.healthcheck_cancel_bus.send(vm_id) {
564+
warn!(
565+
"VM_DISPATCHER: Failed to send healthcheck cancellation for {vm_id}: {e}"
566+
);
567+
}
568+
551569
tokio::spawn(worker::handle_delete_vm(
552570
req,
553571
String::new(),

feos/services/vm-service/src/vmm/ch_adapter.rs

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@ use feos_proto::vm_service::{
1515
};
1616
use hyper_util::client::legacy::Client;
1717
use hyperlocal::{UnixClientExt, UnixConnector, Uri as HyperlocalUri};
18-
use log::{info, warn};
18+
use log::{error, info, warn};
1919
use nix::sys::signal::{kill, Signal};
2020
use nix::unistd::{self, Pid};
2121
use std::io;
2222
use std::path::{Path, PathBuf};
2323
use std::sync::Arc;
2424
use tokio::process::Command as TokioCommand;
25-
use tokio::sync::mpsc;
25+
use tokio::sync::{broadcast, mpsc};
2626
use tokio::time::{self, timeout, Duration};
27+
use uuid::Uuid;
2728

2829
pub struct CloudHypervisorAdapter {
2930
ch_binary_path: PathBuf,
@@ -257,36 +258,56 @@ impl Hypervisor for CloudHypervisorAdapter {
257258
Ok(StartVmResponse {})
258259
}
259260

260-
async fn healthcheck_vm(&self, vm_id: String, broadcast_tx: mpsc::Sender<VmEventWrapper>) {
261+
async fn healthcheck_vm(
262+
&self,
263+
vm_id: String,
264+
broadcast_tx: mpsc::Sender<VmEventWrapper>,
265+
mut cancel_bus: broadcast::Receiver<Uuid>,
266+
) {
261267
info!("CH_ADAPTER ({vm_id}): Starting healthcheck monitoring.");
262268
let mut interval = time::interval(Duration::from_secs(10));
269+
let vm_id_uuid = match Uuid::parse_str(&vm_id) {
270+
Ok(id) => id,
271+
Err(e) => {
272+
error!("CH_ADAPTER ({vm_id}): Invalid UUID format, cannot start healthcheck: {e}");
273+
return;
274+
}
275+
};
263276

264277
loop {
265-
interval.tick().await;
266-
log::debug!("CH_ADAPTER ({vm_id}): Performing healthcheck ping.");
267-
let req = PingVmRequest {
268-
vm_id: vm_id.clone(),
269-
};
270-
271-
match self.ping_vm(req).await {
272-
Ok(_) => {
273-
log::debug!("CH_ADAPTER ({vm_id}): Healthcheck ping successful.");
278+
tokio::select! {
279+
_ = interval.tick() => {
280+
log::debug!("CH_ADAPTER ({vm_id}): Performing healthcheck ping.");
281+
let req = PingVmRequest {
282+
vm_id: vm_id.clone(),
283+
};
284+
285+
if let Err(e) = self.ping_vm(req).await {
286+
warn!("CH_ADAPTER ({vm_id}): Healthcheck failed: {e}. VM is considered unhealthy.");
287+
super::broadcast_state_change_event(
288+
&broadcast_tx,
289+
&vm_id,
290+
"vm-health-monitor",
291+
feos_proto::vm_service::VmStateChangedEvent {
292+
new_state: VmState::Crashed as i32,
293+
reason: format!("Healthcheck failed: {e}"),
294+
},
295+
None,
296+
)
297+
.await;
298+
break;
299+
} else {
300+
log::debug!("CH_ADAPTER ({vm_id}): Healthcheck ping successful.");
301+
}
302+
}
303+
Ok(cancelled_vm_id) = cancel_bus.recv() => {
304+
if cancelled_vm_id == vm_id_uuid {
305+
info!("CH_ADAPTER ({vm_id}): Received cancellation signal. Stopping healthcheck.");
306+
break;
307+
}
274308
}
275-
Err(e) => {
276-
warn!(
277-
"CH_ADAPTER ({vm_id}): Healthcheck failed: {e}. VM is considered unhealthy."
278-
);
279-
super::broadcast_state_change_event(
280-
&broadcast_tx,
281-
&vm_id,
282-
"vm-health-monitor",
283-
feos_proto::vm_service::VmStateChangedEvent {
284-
new_state: VmState::Crashed as i32,
285-
reason: format!("Healthcheck failed: {e}"),
286-
},
287-
None,
288-
)
289-
.await;
309+
else => {
310+
info!("CH_ADAPTER ({vm_id}): Healthcheck cancellation channel closed. Stopping healthcheck.");
290311
break;
291312
}
292313
}

feos/services/vm-service/src/vmm/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use feos_proto::vm_service::{
88
use prost::Message;
99
use prost_types::Any;
1010
use std::path::{Path, PathBuf};
11-
use tokio::sync::mpsc;
11+
use tokio::sync::{broadcast, mpsc};
1212
use tonic::Status;
1313
use uuid::Uuid;
1414

@@ -64,7 +64,12 @@ pub trait Hypervisor: Send + Sync {
6464

6565
async fn start_vm(&self, req: StartVmRequest) -> Result<StartVmResponse, VmmError>;
6666

67-
async fn healthcheck_vm(&self, vm_id: String, broadcast_tx: mpsc::Sender<VmEventWrapper>);
67+
async fn healthcheck_vm(
68+
&self,
69+
vm_id: String,
70+
broadcast_tx: mpsc::Sender<VmEventWrapper>,
71+
cancel_bus: broadcast::Receiver<Uuid>,
72+
);
6873

6974
async fn get_vm(&self, req: GetVmRequest) -> Result<VmInfo, VmmError>;
7075

feos/services/vm-service/src/worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::sync::Arc;
1515
use tokio::sync::{broadcast, mpsc, oneshot};
1616
use tokio_stream::StreamExt;
1717
use tonic::{Status, Streaming};
18+
use uuid::Uuid;
1819

1920
async fn wait_for_image_ready(image_uuid: &str, image_ref: &str) -> Result<(), VmmError> {
2021
let mut client = vmservice_helper::get_image_service_client()
@@ -154,6 +155,7 @@ pub async fn handle_start_vm(
154155
responder: oneshot::Sender<Result<StartVmResponse, Status>>,
155156
hypervisor: Arc<dyn Hypervisor>,
156157
broadcast_tx: mpsc::Sender<VmEventWrapper>,
158+
cancel_bus: broadcast::Receiver<Uuid>,
157159
) {
158160
let vm_id = req.vm_id.clone();
159161
let result = hypervisor.start_vm(req).await;
@@ -175,7 +177,7 @@ pub async fn handle_start_vm(
175177
let health_broadcast_tx = broadcast_tx.clone();
176178
tokio::spawn(async move {
177179
health_hypervisor
178-
.healthcheck_vm(vm_id, health_broadcast_tx)
180+
.healthcheck_vm(vm_id, health_broadcast_tx, cancel_bus)
179181
.await;
180182
});
181183
}

0 commit comments

Comments
 (0)