Commit 3371498

Add sanity check (db, process comparison) to vm-service init
Signed-off-by: Guvenc Gulce <[email protected]>
1 parent e33a46f commit 3371498
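
In short: on start-up, the dispatcher now lists every VM recorded in persistence and compares each stored PID against the live process table. VMs whose process is still running get a fresh health monitor; entries whose process is gone are treated as zombies and cleaned up through the normal delete path.

A minimal sketch of the liveness check this relies on, assuming only the nix crate as used in the diff below (the helper name process_exists is illustrative, not part of the service):

    // "kill" with no signal (signal 0) only tests whether the PID still
    // refers to a live process; nothing is actually delivered.
    use nix::sys::signal::kill;
    use nix::unistd::Pid;

    fn process_exists(raw_pid: i32) -> bool {
        kill(Pid::from_raw(raw_pid), None).is_ok()
    }

    fn main() {
        // The current process certainly exists, so this prints "true".
        println!("{}", process_exists(std::process::id() as i32));
    }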

File tree

2 files changed: +70 −7 lines changed

feos/services/vm-service/src/dispatcher.rs

Lines changed: 54 additions & 0 deletions
@@ -13,6 +13,7 @@ use feos_proto::{
     },
 };
 use log::{debug, error, info, warn};
+use nix::unistd::Pid;
 use prost::Message;
 use prost_types::Any;
 use std::sync::Arc;
@@ -165,6 +166,59 @@ impl VmServiceDispatcher {
     }
 
     pub async fn run(mut self) {
+        info!("VM_DISPATCHER: Running initial sanity check...");
+        match self.repository.list_all_vms().await {
+            Ok(vms) => {
+                if vms.is_empty() {
+                    info!("VM_DISPATCHER (Sanity Check): No VMs found in persistence, check complete.");
+                } else {
+                    info!("VM_DISPATCHER (Sanity Check): Found {} VMs in persistence, checking status...", vms.len());
+                    for vm in vms {
+                        if let Some(pid) = vm.status.process_id {
+                            let pid_obj = Pid::from_raw(pid as i32);
+                            let process_exists = nix::sys::signal::kill(pid_obj, None).is_ok();
+
+                            if process_exists {
+                                info!("VM_DISPATCHER (Sanity Check): Found running VM {} (PID: {}) from previous session. Starting health monitor.", vm.vm_id, pid);
+                                let cancel_bus = self.healthcheck_cancel_bus.subscribe();
+                                worker::start_healthcheck_monitor(
+                                    vm.vm_id.to_string(),
+                                    self.hypervisor.clone(),
+                                    self.event_bus_tx.clone(),
+                                    cancel_bus,
+                                );
+                            } else {
+                                warn!("VM_DISPATCHER (Sanity Check): Found VM {} in DB with PID {}, but process does not exist. Cleaning up.", vm.vm_id, pid);
+                                let (resp_tx, resp_rx) = oneshot::channel();
+                                let req = DeleteVmRequest {
+                                    vm_id: vm.vm_id.to_string(),
+                                };
+                                let vm_id_for_log = vm.vm_id;
+
+                                self.handle_delete_vm_command(
+                                    req,
+                                    resp_tx,
+                                    self.hypervisor.clone(),
+                                    self.event_bus_tx.clone(),
+                                )
+                                .await;
+
+                                match resp_rx.await {
+                                    Ok(Ok(_)) => info!("VM_DISPATCHER (Sanity Check): Successfully cleaned up zombie VM {vm_id_for_log}."),
+                                    Ok(Err(status)) => error!("VM_DISPATCHER (Sanity Check): Failed to clean up zombie VM {vm_id_for_log}: {status}"),
+                                    Err(_) => error!("VM_DISPATCHER (Sanity Check): Cleanup task for zombie VM {vm_id_for_log} did not return a response."),
+                                }
+                            }
+                        }
+                    }
+                    info!("VM_DISPATCHER (Sanity Check): Check complete.");
+                }
+            }
+            Err(e) => {
+                error!("VM_DISPATCHER (Sanity Check): Failed to list VMs from repository: {e}. Skipping check.");
+            }
+        }
+
         info!("VM_DISPATCHER: Running and waiting for commands and events.");
         loop {
             tokio::select! {
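
The zombie-cleanup branch above reuses the regular delete handler and waits on a oneshot channel so the outcome can be logged before the dispatcher moves on. A rough sketch of that request/response pattern, with stand-in types (the real dispatcher sends a DeleteVmRequest and the handler answers with a gRPC Status on error):

    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        let (resp_tx, resp_rx) = oneshot::channel::<Result<(), String>>();

        // The handler does its work and reports the result through the channel.
        tokio::spawn(async move {
            let _ = resp_tx.send(Ok(()));
        });

        // The caller distinguishes "handler failed" from "handler never
        // answered" (sender dropped), just like the match in the diff above.
        match resp_rx.await {
            Ok(Ok(())) => println!("cleanup succeeded"),
            Ok(Err(e)) => eprintln!("cleanup failed: {e}"),
            Err(_) => eprintln!("cleanup task did not return a response"),
        }
    }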

feos/services/vm-service/src/worker.rs

Lines changed: 16 additions & 7 deletions
@@ -150,6 +150,21 @@ pub async fn handle_create_vm(
     }
 }
 
+pub fn start_healthcheck_monitor(
+    vm_id: String,
+    hypervisor: Arc<dyn Hypervisor>,
+    broadcast_tx: mpsc::Sender<VmEventWrapper>,
+    cancel_bus: broadcast::Receiver<Uuid>,
+) {
+    let health_hypervisor = hypervisor;
+    let health_broadcast_tx = broadcast_tx;
+    tokio::spawn(async move {
+        health_hypervisor
+            .healthcheck_vm(vm_id, health_broadcast_tx, cancel_bus)
+            .await;
+    });
+}
+
 pub async fn handle_start_vm(
     req: StartVmRequest,
     responder: oneshot::Sender<Result<StartVmResponse, Status>>,
@@ -173,13 +188,7 @@ pub async fn handle_start_vm(
         )
         .await;
 
-        let health_hypervisor = hypervisor.clone();
-        let health_broadcast_tx = broadcast_tx.clone();
-        tokio::spawn(async move {
-            health_hypervisor
-                .healthcheck_vm(vm_id, health_broadcast_tx, cancel_bus)
-                .await;
-        });
+        start_healthcheck_monitor(vm_id, hypervisor, broadcast_tx, cancel_bus);
     }
 
     if responder.send(result.map_err(Into::into)).is_err() {
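
Extracting start_healthcheck_monitor lets handle_start_vm and the dispatcher's new sanity check share one spawn path instead of duplicating the tokio::spawn block. Each monitor gets its own receiver from the dispatcher's broadcast cancel bus (healthcheck_cancel_bus.subscribe() in the first diff). A small sketch of that fan-out pattern, with illustrative names and a String payload in place of the service's Uuid:

    use std::time::Duration;
    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        let (cancel_tx, _keepalive) = broadcast::channel::<String>(16);

        for vm_id in ["vm-a", "vm-b"] {
            let mut cancel_rx = cancel_tx.subscribe(); // one receiver per monitor
            tokio::spawn(async move {
                // A real monitor would poll the hypervisor here; this one just
                // waits until it observes a cancellation on the shared bus.
                if let Ok(cancelled_id) = cancel_rx.recv().await {
                    println!("monitor for {vm_id} saw cancellation of {cancelled_id}");
                }
            });
        }

        let _ = cancel_tx.send("vm-a".to_string());
        tokio::time::sleep(Duration::from_millis(100)).await; // let the tasks print
    }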
