Skip to content

Commit c210fd2

Browse files
committed
Make vmEvents vm_id optional so that it can subscribe to all events
Signed-off-by: Guvenc Gulce <[email protected]>
1 parent 1ac71e7 commit c210fd2

File tree

5 files changed

+152
-83
lines changed

5 files changed

+152
-83
lines changed

cli/src/vm_commands.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ pub enum VmCommand {
7575
vm_id: String,
7676
},
7777
Events {
78-
#[arg(required = true)]
79-
vm_id: String,
78+
#[arg(long)]
79+
vm_id: Option<String>,
8080
},
8181
Console {
8282
#[arg(required = true)]
@@ -267,10 +267,15 @@ async fn delete_vm(client: &mut VmServiceClient<Channel>, vm_id: String) -> Resu
267267
Ok(())
268268
}
269269

270-
async fn watch_events(client: &mut VmServiceClient<Channel>, vm_id: String) -> Result<()> {
271-
println!("Watching events for VM: {vm_id}. Press Ctrl+C to stop.");
270+
async fn watch_events(client: &mut VmServiceClient<Channel>, vm_id: Option<String>) -> Result<()> {
271+
if let Some(id) = &vm_id {
272+
println!("Watching events for VM: {id}. Press Ctrl+C to stop.");
273+
} else {
274+
println!("Watching events for all VMs. Press Ctrl+C to stop.");
275+
}
276+
272277
let request = StreamVmEventsRequest {
273-
vm_id: vm_id.clone(),
278+
vm_id,
274279
..Default::default()
275280
};
276281
let mut stream = client.stream_vm_events(request).await?.into_inner();

feos/services/vm-service/src/dispatcher.rs

Lines changed: 127 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -349,88 +349,144 @@ impl VmServiceDispatcher {
349349
hypervisor: Arc<dyn Hypervisor>,
350350
broadcast_tx: broadcast::Sender<VmEventWrapper>,
351351
) {
352-
let vm_id_str = req.vm_id.clone();
353-
let vm_id = match Uuid::parse_str(&vm_id_str) {
354-
Ok(id) => id,
355-
Err(_) => {
356-
if stream_tx
357-
.send(Err(Status::invalid_argument("Invalid VM ID format.")))
358-
.await
359-
.is_err()
360-
{
361-
warn!(
362-
"STREAM_EVENTS: Client for {vm_id_str} disconnected before error could be sent."
363-
);
352+
if let Some(vm_id_str) = req.vm_id.clone() {
353+
let vm_id = match Uuid::parse_str(&vm_id_str) {
354+
Ok(id) => id,
355+
Err(_) => {
356+
if stream_tx
357+
.send(Err(Status::invalid_argument("Invalid VM ID format.")))
358+
.await
359+
.is_err()
360+
{
361+
warn!(
362+
"STREAM_EVENTS: Client for {vm_id_str} disconnected before error could be sent."
363+
);
364+
}
365+
return;
364366
}
365-
return;
366-
}
367-
};
368-
369-
match self.repository.get_vm(vm_id).await {
370-
Ok(Some(record)) => {
371-
info!(
372-
"STREAM_EVENTS: Sending initial state for VM {}: {:?}",
373-
vm_id_str, record.status.state
374-
);
375-
let state_change_event = VmStateChangedEvent {
376-
new_state: record.status.state as i32,
377-
reason: record.status.last_msg,
378-
};
379-
let initial_event = VmEvent {
380-
vm_id: vm_id_str.clone(),
381-
id: Uuid::new_v4().to_string(),
382-
component_id: "vm-service-db".to_string(),
383-
data: Some(Any {
384-
type_url: "type.googleapis.com/feos.vm.vmm.api.v1.VmStateChangedEvent"
385-
.to_string(),
386-
value: state_change_event.encode_to_vec(),
387-
}),
388-
};
367+
};
389368

390-
if stream_tx.send(Ok(initial_event)).await.is_err() {
369+
match self.repository.get_vm(vm_id).await {
370+
Ok(Some(record)) => {
391371
info!(
392-
"STREAM_EVENTS: Client for {vm_id_str} disconnected before live events could be streamed."
372+
"STREAM_EVENTS: Sending initial state for VM {}: {:?}",
373+
vm_id_str, record.status.state
393374
);
394-
return;
395-
}
375+
let state_change_event = VmStateChangedEvent {
376+
new_state: record.status.state as i32,
377+
reason: record.status.last_msg,
378+
};
379+
let initial_event = VmEvent {
380+
vm_id: vm_id_str.clone(),
381+
id: Uuid::new_v4().to_string(),
382+
component_id: "vm-service-db".to_string(),
383+
data: Some(Any {
384+
type_url: "type.googleapis.com/feos.vm.vmm.api.v1.VmStateChangedEvent"
385+
.to_string(),
386+
value: state_change_event.encode_to_vec(),
387+
}),
388+
};
396389

397-
tokio::spawn(worker::handle_stream_vm_events(
398-
req,
399-
stream_tx,
400-
hypervisor,
401-
broadcast_tx,
402-
));
403-
}
404-
Ok(None) => {
405-
warn!("VM with ID {vm_id} not found",);
406-
if stream_tx
407-
.send(Err(Status::not_found(format!(
408-
"VM with ID {vm_id} not found"
409-
))))
410-
.await
411-
.is_err()
412-
{
413-
warn!(
414-
"STREAM_EVENTS: Client for {vm_id_str} disconnected before not-found error could be sent."
390+
if stream_tx.send(Ok(initial_event)).await.is_err() {
391+
info!(
392+
"STREAM_EVENTS: Client for {vm_id_str} disconnected before live events could be streamed."
393+
);
394+
return;
395+
}
396+
397+
tokio::spawn(worker::handle_stream_vm_events(
398+
req,
399+
stream_tx,
400+
hypervisor,
401+
broadcast_tx,
402+
));
403+
}
404+
Ok(None) => {
405+
warn!("VM with ID {vm_id} not found");
406+
if stream_tx
407+
.send(Err(Status::not_found(format!(
408+
"VM with ID {vm_id} not found"
409+
))))
410+
.await
411+
.is_err()
412+
{
413+
warn!(
414+
"STREAM_EVENTS: Client for {vm_id_str} disconnected before not-found error could be sent."
415+
);
416+
}
417+
}
418+
Err(e) => {
419+
error!(
420+
"STREAM_EVENTS: Failed to get VM {vm_id_str} from database for event stream: {e}"
415421
);
422+
if stream_tx
423+
.send(Err(Status::internal(
424+
"Failed to retrieve VM information for event stream.",
425+
)))
426+
.await
427+
.is_err()
428+
{
429+
warn!(
430+
"STREAM_EVENTS: Client for {vm_id_str} disconnected before internal-error could be sent."
431+
);
432+
}
416433
}
417434
}
418-
Err(e) => {
419-
error!(
420-
"STREAM_EVENTS: Failed to get VM {vm_id_str} from database for event stream: {e}"
421-
);
422-
if stream_tx
423-
.send(Err(Status::internal(
424-
"Failed to retrieve VM information for event stream.",
425-
)))
426-
.await
427-
.is_err()
428-
{
429-
warn!(
430-
"STREAM_EVENTS: Client for {vm_id_str} disconnected before internal-error could be sent."
435+
} else {
436+
info!("STREAM_EVENTS: Request to stream events for all VMs received.");
437+
match self.repository.list_all_vms().await {
438+
Ok(records) => {
439+
info!(
440+
"STREAM_EVENTS: Found {} existing VMs to send initial state for.",
441+
records.len()
431442
);
443+
for record in records {
444+
let state_change_event = VmStateChangedEvent {
445+
new_state: record.status.state as i32,
446+
reason: format!("Initial state from DB: {}", record.status.last_msg),
447+
};
448+
let initial_event = VmEvent {
449+
vm_id: record.vm_id.to_string(),
450+
id: Uuid::new_v4().to_string(),
451+
component_id: "vm-service-db".to_string(),
452+
data: Some(Any {
453+
type_url:
454+
"type.googleapis.com/feos.vm.vmm.api.v1.VmStateChangedEvent"
455+
.to_string(),
456+
value: state_change_event.encode_to_vec(),
457+
}),
458+
};
459+
460+
if stream_tx.send(Ok(initial_event)).await.is_err() {
461+
info!("STREAM_EVENTS: Client for all VMs disconnected while sending initial states.");
462+
return;
463+
}
464+
}
465+
}
466+
Err(e) => {
467+
error!(
468+
"STREAM_EVENTS: Failed to list all VMs from database for event stream: {e}"
469+
);
470+
if stream_tx
471+
.send(Err(Status::internal(
472+
"Failed to retrieve initial VM list for event stream.",
473+
)))
474+
.await
475+
.is_err()
476+
{
477+
warn!("STREAM_EVENTS: Client for all VMs disconnected before internal-error could be sent.");
478+
}
479+
return;
432480
}
433481
}
482+
483+
info!("STREAM_EVENTS: Initial states sent. Starting live event stream for all VMs.");
484+
tokio::spawn(worker::handle_stream_vm_events(
485+
req,
486+
stream_tx,
487+
hypervisor,
488+
broadcast_tx,
489+
));
434490
}
435491
}
436492

feos/services/vm-service/src/vmm/ch_adapter.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ impl Hypervisor for CloudHypervisorAdapter {
219219
&vm_id_clone,
220220
"vm-process",
221221
VmStateChangedEvent {
222-
new_state: VmState::Stopped as i32,
222+
new_state: VmState::Crashed as i32,
223223
reason: format!(
224224
"Process exited with code {}",
225225
status.code().unwrap_or(-1)
@@ -480,20 +480,25 @@ impl Hypervisor for CloudHypervisorAdapter {
480480
let vm_id_to_watch = req.vm_id;
481481

482482
tokio::spawn(async move {
483+
let watcher_desc = vm_id_to_watch
484+
.clone()
485+
.unwrap_or_else(|| "all VMs".to_string());
483486
loop {
484487
match broadcast_rx.recv().await {
485488
Ok(VmEventWrapper { event, .. }) => {
486-
if event.vm_id == vm_id_to_watch && tx.send(Ok(event)).await.is_err() {
487-
info!("gRPC event stream for VM {vm_id_to_watch} disconnected.");
489+
if vm_id_to_watch.as_ref().is_none_or(|id| event.vm_id == *id)
490+
&& tx.send(Ok(event)).await.is_err()
491+
{
492+
info!("gRPC event stream for '{watcher_desc}' disconnected.");
488493
break;
489494
}
490495
}
491496
Err(broadcast::error::RecvError::Lagged(n)) => {
492-
warn!("Event stream for VM {vm_id_to_watch} lagged by {n} messages.");
497+
warn!("Event stream for '{watcher_desc}' lagged by {n} messages.");
493498
}
494499
Err(broadcast::error::RecvError::Closed) => {
495500
info!(
496-
"Broadcast channel closed. Shutting down stream for VM {vm_id_to_watch}."
501+
"Broadcast channel closed. Shutting down stream for '{watcher_desc}'."
497502
);
498503
break;
499504
}

feos/tests/integration_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async fn test_create_and_start_vm() -> Result<()> {
220220

221221
info!("Connecting to StreamVmEvents stream for vm_id: {}", &vm_id);
222222
let events_req = StreamVmEventsRequest {
223-
vm_id: vm_id.clone(),
223+
vm_id: Some(vm_id.clone()),
224224
..Default::default()
225225
};
226226
let mut stream = vm_client.stream_vm_events(events_req).await?.into_inner();

proto/v1/vm.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,10 @@ message VmStateChangedEvent {
7777

7878
message StreamVmEventsRequest {
7979
// The ID of the Virtual Machine for which to retrieve events.
80-
string vm_id = 1;
80+
// If not provided, the stream will start by sending the current state
81+
// of all existing VMs, and then continue to stream all subsequent events
82+
// from all VMs.
83+
optional string vm_id = 1;
8184
// Filter the stream to only include events from a specific "component" in VM
8285
string with_component_id = 5;
8386

0 commit comments

Comments
 (0)