Skip to content

Commit 4128eed

Browse files
committed
Move state announcements from ch_adapter to workers in vm-service
Signed-off-by: Guvenc Gulce <guevenc.guelce@sap.com>
1 parent 76d0f13 commit 4128eed

File tree

3 files changed

+216
-255
lines changed

3 files changed

+216
-255
lines changed

feos/services/vm-service/src/vmm/ch_adapter.rs

Lines changed: 89 additions & 187 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{broadcast_state_change_event, Hypervisor, VmmError};
1+
use super::{Hypervisor, VmmError};
22
use crate::{VmEventWrapper, IMAGE_DIR, VM_API_SOCKET_DIR, VM_CONSOLE_DIR};
33
use cloud_hypervisor_client::{
44
apis::{configuration::Configuration, DefaultApi, DefaultApiClient},
@@ -11,12 +11,12 @@ use feos_proto::vm_service::{
1111
AttachDiskRequest, AttachDiskResponse, CreateVmRequest, DeleteVmRequest, DeleteVmResponse,
1212
GetVmRequest, PauseVmRequest, PauseVmResponse, PingVmRequest, PingVmResponse,
1313
RemoveDiskRequest, RemoveDiskResponse, ResumeVmRequest, ResumeVmResponse, ShutdownVmRequest,
14-
ShutdownVmResponse, StartVmRequest, StartVmResponse, StreamVmEventsRequest, VmEvent, VmInfo,
15-
VmState, VmStateChangedEvent,
14+
ShutdownVmResponse, StartVmRequest, StartVmResponse, StreamVmEventsRequest, VmConfig, VmEvent,
15+
VmInfo, VmState,
1616
};
1717
use hyper_util::client::legacy::Client;
1818
use hyperlocal::{UnixClientExt, UnixConnector, Uri as HyperlocalUri};
19-
use log::{error, info, warn};
19+
use log::{info, warn};
2020
use nix::sys::signal::{kill, Signal};
2121
use nix::unistd::{self, Pid};
2222
use std::io;
@@ -56,104 +56,13 @@ impl CloudHypervisorAdapter {
5656
Ok(DefaultApiClient::new(Arc::new(configuration)))
5757
}
5858

59-
async fn cleanup_socket_file(&self, vm_id: &str, socket_path: &Path, socket_type: &str) {
60-
if let Err(e) = tokio::fs::remove_file(socket_path).await {
61-
if e.kind() != std::io::ErrorKind::NotFound {
62-
warn!(
63-
"CH_ADAPTER ({}): Failed to remove {} socket {}: {}",
64-
vm_id,
65-
socket_type,
66-
socket_path.display(),
67-
e
68-
);
69-
}
70-
} else {
71-
info!(
72-
"CH_ADAPTER ({}): Successfully removed {} socket {}",
73-
vm_id,
74-
socket_type,
75-
socket_path.display()
76-
);
77-
}
78-
}
79-
}
80-
81-
#[tonic::async_trait]
82-
impl Hypervisor for CloudHypervisorAdapter {
83-
async fn create_vm(
59+
async fn perform_vm_creation(
8460
&self,
8561
vm_id: &str,
86-
req: CreateVmRequest,
62+
config: VmConfig,
8763
image_uuid: String,
88-
broadcast_tx: broadcast::Sender<VmEventWrapper>,
64+
api_socket_path: &Path,
8965
) -> Result<(), VmmError> {
90-
info!("CH_ADAPTER: Creating VM with provided ID: {vm_id}");
91-
92-
broadcast_state_change_event(
93-
&broadcast_tx,
94-
vm_id,
95-
"vm-service",
96-
VmStateChangedEvent {
97-
new_state: VmState::Creating as i32,
98-
reason: "VM creation process started".to_string(),
99-
},
100-
None,
101-
)
102-
.await;
103-
104-
let config = req
105-
.config
106-
.ok_or_else(|| VmmError::InvalidConfig("VmConfig is required".to_string()))?;
107-
108-
let api_socket_path = PathBuf::from(VM_API_SOCKET_DIR).join(vm_id);
109-
110-
info!(
111-
"CH_ADAPTER ({}): Spawning cloud-hypervisor process...",
112-
vm_id
113-
);
114-
let mut child = unsafe {
115-
TokioCommand::new(&self.ch_binary_path)
116-
.arg("--api-socket")
117-
.arg(&api_socket_path)
118-
.pre_exec(|| unistd::setsid().map(|_pid| ()).map_err(io::Error::other))
119-
.spawn()
120-
}
121-
.map_err(|e| VmmError::ProcessSpawnFailed(e.to_string()))?;
122-
let pid = child.id().map(|id| id as i64);
123-
124-
let vm_id_clone = vm_id.to_string();
125-
let broadcast_tx_clone = broadcast_tx.clone();
126-
tokio::spawn(async move {
127-
match child.wait().await {
128-
Ok(status) => {
129-
warn!(
130-
"CH_ADAPTER ({}): Process exited with status: {}",
131-
&vm_id_clone, status
132-
);
133-
broadcast_state_change_event(
134-
&broadcast_tx_clone,
135-
&vm_id_clone,
136-
"vm-process",
137-
VmStateChangedEvent {
138-
new_state: VmState::Crashed as i32,
139-
reason: format!(
140-
"Process exited with code {}",
141-
status.code().unwrap_or(-1)
142-
),
143-
},
144-
None,
145-
)
146-
.await;
147-
}
148-
Err(e) => {
149-
error!(
150-
"CH_ADAPTER ({}): Failed to wait for child process: {}",
151-
&vm_id_clone, e
152-
);
153-
}
154-
}
155-
});
156-
15766
let wait_for_socket = async {
15867
while !api_socket_path.exists() {
15968
tokio::time::sleep(Duration::from_millis(50)).await;
@@ -168,7 +77,7 @@ impl Hypervisor for CloudHypervisorAdapter {
16877
"Timed out waiting for API socket".to_string(),
16978
));
17079
}
171-
info!("CH_ADAPTER ({}): API socket is available.", vm_id);
80+
info!("CH_ADAPTER ({vm_id}): API socket is available.");
17281

17382
let client = self.get_ch_api_client(vm_id)?;
17483
tokio::fs::create_dir_all(VM_CONSOLE_DIR)
@@ -229,46 +138,85 @@ impl Hypervisor for CloudHypervisorAdapter {
229138
.await
230139
.map_err(|e| VmmError::ApiOperationFailed(format!("vm.create API call failed: {e}")))?;
231140

232-
info!("CH_ADAPTER ({}): vm.create API call successful.", vm_id);
141+
info!("CH_ADAPTER ({vm_id}): vm.create API call successful.");
233142

234-
broadcast_state_change_event(
235-
&broadcast_tx,
236-
vm_id,
237-
"vm-service",
238-
VmStateChangedEvent {
239-
new_state: VmState::Created as i32,
240-
reason: "Hypervisor process started and VM configured".to_string(),
241-
},
242-
pid,
243-
)
244-
.await;
143+
Ok::<(), VmmError>(())
144+
}
245145

246-
Ok(())
146+
async fn cleanup_socket_file(&self, vm_id: &str, socket_path: &Path, socket_type: &str) {
147+
if let Err(e) = tokio::fs::remove_file(socket_path).await {
148+
if e.kind() != std::io::ErrorKind::NotFound {
149+
warn!(
150+
"CH_ADAPTER ({vm_id}): Failed to remove {socket_type} socket {}: {e}",
151+
socket_path.display()
152+
);
153+
}
154+
} else {
155+
info!(
156+
"CH_ADAPTER ({vm_id}): Successfully removed {socket_type} socket {}",
157+
socket_path.display()
158+
);
159+
}
247160
}
161+
}
248162

249-
async fn start_vm(
163+
#[tonic::async_trait]
164+
impl Hypervisor for CloudHypervisorAdapter {
165+
async fn create_vm(
250166
&self,
251-
req: StartVmRequest,
252-
broadcast_tx: broadcast::Sender<VmEventWrapper>,
253-
) -> Result<StartVmResponse, VmmError> {
167+
vm_id: &str,
168+
req: CreateVmRequest,
169+
image_uuid: String,
170+
) -> Result<Option<i64>, VmmError> {
171+
info!("CH_ADAPTER: Creating VM with provided ID: {vm_id}");
172+
173+
let config = req
174+
.config
175+
.ok_or_else(|| VmmError::InvalidConfig("VmConfig is required".to_string()))?;
176+
177+
let api_socket_path = PathBuf::from(VM_API_SOCKET_DIR).join(vm_id);
178+
179+
info!("CH_ADAPTER ({vm_id}): Spawning cloud-hypervisor process...");
180+
let mut child = unsafe {
181+
TokioCommand::new(&self.ch_binary_path)
182+
.arg("--api-socket")
183+
.arg(&api_socket_path)
184+
.pre_exec(|| unistd::setsid().map(|_pid| ()).map_err(io::Error::other))
185+
.spawn()
186+
}
187+
.map_err(|e| VmmError::ProcessSpawnFailed(e.to_string()))?;
188+
let pid = child.id().map(|id| id as i64);
189+
190+
let vm_creation = self.perform_vm_creation(vm_id, config, image_uuid, &api_socket_path);
191+
192+
tokio::select! {
193+
biased;
194+
exit_status_res = child.wait() => {
195+
let status = exit_status_res.map_err(|e| VmmError::ProcessSpawnFailed(format!("Failed to wait for child process: {e}")))?;
196+
Err(VmmError::ProcessSpawnFailed(format!("Process exited prematurely with status: {status}")))
197+
}
198+
creation_result = vm_creation => {
199+
match creation_result {
200+
Ok(_) => Ok(pid),
201+
Err(e) => {
202+
if let Err(kill_err) = child.kill().await {
203+
warn!("CH_ADAPTER ({vm_id}): Failed to kill child process after creation failure: {kill_err}");
204+
}
205+
let _ = child.wait().await;
206+
Err(e)
207+
}
208+
}
209+
}
210+
}
211+
}
212+
213+
async fn start_vm(&self, req: StartVmRequest) -> Result<StartVmResponse, VmmError> {
254214
let api_client = self.get_ch_api_client(&req.vm_id)?;
255215
api_client
256216
.boot_vm()
257217
.await
258218
.map_err(|e| VmmError::ApiOperationFailed(e.to_string()))?;
259219

260-
broadcast_state_change_event(
261-
&broadcast_tx,
262-
&req.vm_id,
263-
"vm-service",
264-
VmStateChangedEvent {
265-
new_state: VmState::Running as i32,
266-
reason: "Start command successful".to_string(),
267-
},
268-
None,
269-
)
270-
.await;
271-
272220
Ok(StartVmResponse {})
273221
}
274222

@@ -297,13 +245,12 @@ impl Hypervisor for CloudHypervisorAdapter {
297245
&self,
298246
req: DeleteVmRequest,
299247
process_id: Option<i64>,
300-
_broadcast_tx: broadcast::Sender<VmEventWrapper>,
301248
) -> Result<DeleteVmResponse, VmmError> {
302249
if let Ok(api_client) = self.get_ch_api_client(&req.vm_id) {
303250
if let Err(e) = api_client.delete_vm().await {
304251
warn!(
305-
"CH_ADAPTER ({}): API call to delete VM failed: {}. This might happen if the process is already gone. Continuing cleanup.",
306-
req.vm_id, e
252+
"CH_ADAPTER ({}): API call to delete VM failed: {e}. This might happen if the process is already gone. Continuing cleanup.",
253+
req.vm_id
307254
);
308255
} else {
309256
info!(
@@ -315,22 +262,22 @@ impl Hypervisor for CloudHypervisorAdapter {
315262

316263
if let Some(pid_val) = process_id {
317264
info!(
318-
"CH_ADAPTER ({}): Attempting to kill process with PID: {}",
319-
req.vm_id, pid_val
265+
"CH_ADAPTER ({}): Attempting to kill process with PID: {pid_val}",
266+
req.vm_id
320267
);
321268
let pid = Pid::from_raw(pid_val as i32);
322269
match kill(pid, Signal::SIGKILL) {
323270
Ok(_) => info!(
324-
"CH_ADAPTER ({}): Successfully sent SIGKILL to process {}.",
325-
req.vm_id, pid_val
271+
"CH_ADAPTER ({}): Successfully sent SIGKILL to process {pid_val}.",
272+
req.vm_id
326273
),
327274
Err(nix::Error::ESRCH) => info!(
328-
"CH_ADAPTER ({}): Process {} already exited.",
329-
req.vm_id, pid_val
275+
"CH_ADAPTER ({}): Process {pid_val} already exited.",
276+
req.vm_id
330277
),
331278
Err(e) => warn!(
332-
"CH_ADAPTER ({}): Failed to kill process {}: {}. It might already be gone.",
333-
req.vm_id, pid_val, e
279+
"CH_ADAPTER ({}): Failed to kill process {pid_val}: {e}. It might already be gone.",
280+
req.vm_id
334281
),
335282
}
336283
}
@@ -413,75 +360,30 @@ impl Hypervisor for CloudHypervisorAdapter {
413360
})
414361
}
415362

416-
async fn shutdown_vm(
417-
&self,
418-
req: ShutdownVmRequest,
419-
broadcast_tx: broadcast::Sender<VmEventWrapper>,
420-
) -> Result<ShutdownVmResponse, VmmError> {
363+
async fn shutdown_vm(&self, req: ShutdownVmRequest) -> Result<ShutdownVmResponse, VmmError> {
421364
let api_client = self.get_ch_api_client(&req.vm_id)?;
422365
api_client
423366
.shutdown_vm()
424367
.await
425368
.map_err(|e| VmmError::ApiOperationFailed(e.to_string()))?;
426-
broadcast_state_change_event(
427-
&broadcast_tx,
428-
&req.vm_id,
429-
"vm-service",
430-
VmStateChangedEvent {
431-
new_state: VmState::Stopped as i32,
432-
reason: "Shutdown command successful".to_string(),
433-
},
434-
None,
435-
)
436-
.await;
437369
Ok(ShutdownVmResponse {})
438370
}
439371

440-
async fn pause_vm(
441-
&self,
442-
req: PauseVmRequest,
443-
broadcast_tx: broadcast::Sender<VmEventWrapper>,
444-
) -> Result<PauseVmResponse, VmmError> {
372+
async fn pause_vm(&self, req: PauseVmRequest) -> Result<PauseVmResponse, VmmError> {
445373
let api_client = self.get_ch_api_client(&req.vm_id)?;
446374
api_client
447375
.pause_vm()
448376
.await
449377
.map_err(|e| VmmError::ApiOperationFailed(e.to_string()))?;
450-
broadcast_state_change_event(
451-
&broadcast_tx,
452-
&req.vm_id,
453-
"vm-service",
454-
VmStateChangedEvent {
455-
new_state: VmState::Paused as i32,
456-
reason: "Pause command successful".to_string(),
457-
},
458-
None,
459-
)
460-
.await;
461378
Ok(PauseVmResponse {})
462379
}
463380

464-
async fn resume_vm(
465-
&self,
466-
req: ResumeVmRequest,
467-
broadcast_tx: broadcast::Sender<VmEventWrapper>,
468-
) -> Result<ResumeVmResponse, VmmError> {
381+
async fn resume_vm(&self, req: ResumeVmRequest) -> Result<ResumeVmResponse, VmmError> {
469382
let api_client = self.get_ch_api_client(&req.vm_id)?;
470383
api_client
471384
.resume_vm()
472385
.await
473386
.map_err(|e| VmmError::ApiOperationFailed(e.to_string()))?;
474-
broadcast_state_change_event(
475-
&broadcast_tx,
476-
&req.vm_id,
477-
"vm-service",
478-
VmStateChangedEvent {
479-
new_state: VmState::Running as i32,
480-
reason: "Resume command successful".to_string(),
481-
},
482-
None,
483-
)
484-
.await;
485387
Ok(ResumeVmResponse {})
486388
}
487389

0 commit comments

Comments
 (0)