Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/common/proto/monitoringserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package monitoringserver;
service MonitoringServerConnection {
rpc SendContainerList (ContainerList) returns (SendContainerListResponse);
rpc SendNodeInfo (NodeInfo) returns (SendNodeInfoResponse);
rpc SendStressMonitoringMetric (StressMonitoringMetric) returns (StressMonitoringMetricResponse);
}

message SendContainerListResponse {
Expand Down Expand Up @@ -50,4 +51,13 @@ message NodeInfo {
string os = 12;
string arch = 13;
string ip = 14;
}

// Stress monitoring metric: a single JSON string payload sent by the
// App Data Provider. The JSON is parsed and validated server-side before
// being forwarded; see the monitoring-server receiver for the expected fields.
message StressMonitoringMetric {
  // JSON object containing process_name, pid, core_masking, core_count,
  // fps, latency, cpu_loads, etc.
  string json = 1;
}

// Response for SendStressMonitoringMetric.
message StressMonitoringMetricResponse {
  // Human-readable status, e.g. "Successfully processed StressMonitoringMetric".
  string resp = 1;
}
6 changes: 3 additions & 3 deletions src/player/filtergateway/src/vehicle/dds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ impl DdsManager {

// Include generated DDS types at runtime
#[allow(unused)]
#[allow(non_snake_case)]
pub mod dds_types {
#![allow(non_snake_case)]
#[allow(unused_variables, unused_imports)]
include! {
concat!(env!("OUT_DIR"), "/dds_types.rs")
Expand All @@ -268,15 +268,15 @@ pub mod dds_type_metadata {

/// Returns a vector of available DDS type names obtained from the generated type metadata.
pub fn get_available_types() -> Vec<String> {
dds_type_metadata::get_type_metadata()
generated_metadata::get_type_metadata()
.keys()
.cloned()
.collect()
}

// Always include the generated type metadata; this file is generated by build.rs.

pub mod dds_type_metadata {
pub mod generated_metadata {
#[allow(unused_variables, unused_imports)]
include! {
concat!(env!("OUT_DIR"), "/dds_type_metadata.rs")
Expand Down
2 changes: 1 addition & 1 deletion src/server/monitoringserver/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ mod tests {
let node = sample_node("node1", "192.168.10.201");
let soc = SocInfo::new("192.168.10.200".to_string(), node.clone());
ds.socs.insert("192.168.10.200".to_string(), soc.clone());
let mut board = BoardInfo::new("192.168.10.200".to_string(), node.clone());
let board = BoardInfo::new("192.168.10.200".to_string(), node.clone());
ds.boards
.insert("192.168.10.200".to_string(), board.clone());

Expand Down
33 changes: 32 additions & 1 deletion src/server/monitoringserver/src/etcd_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use crate::data_structures::{BoardInfo, SocInfo};
use common::monitoringserver::{ContainerInfo, NodeInfo}; // Use protobuf types
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;

/// Generic function to store info in etcd
async fn store_info<T: Serialize>(
Expand Down Expand Up @@ -229,6 +230,37 @@ pub async fn get_all_containers() -> common::Result<Vec<ContainerInfo>> {
Ok(containers)
}

/// Store a raw stress metric JSON string in etcd under /piccolo/metrics/stress/{process}/{pid}:{ts}
pub async fn store_stress_metric_json(json_str: &str) -> common::Result<()> {
// parse & validate JSON
let v: Value = serde_json::from_str(json_str)
.map_err(|e| format!("Failed to parse stress metric JSON: {}", e))?;

let process_name = v
.get("process_name")
.and_then(|s| s.as_str())
.unwrap_or("unknown");
let pid_str = v
.get("pid")
.and_then(|p| p.as_i64().map(|n| n.to_string()))
.unwrap_or_else(|| "0".to_string());

let resource_id = format!("{}/{}", process_name, pid_str);

// store_info - serde_json::Value implements Serialize
store_info("stress", &resource_id, &v).await
}

/// Retrieve all stored stress metrics as JSON values.
///
/// Returns every value stored under the "stress" prefix; each element is the
/// raw `serde_json::Value` that was written by `store_stress_metric_json`.
pub async fn get_all_stress_metrics() -> common::Result<Vec<Value>> {
    get_all_info("stress").await
}

/// Delete a stored stress metric by resource id.
///
/// `resource_id` is the same "{process_name}/{pid}" key that
/// `store_stress_metric_json` builds when storing the metric.
pub async fn delete_stress_metric(resource_id: &str) -> common::Result<()> {
    delete_info("stress", resource_id).await
}

/// Delete NodeInfo from etcd
pub async fn delete_node_info(node_name: &str) -> common::Result<()> {
delete_info("nodes", node_name).await
Expand All @@ -254,7 +286,6 @@ mod tests {
use super::*;
use crate::data_structures::{BoardInfo, SocInfo};
use common::monitoringserver::{ContainerInfo, NodeInfo};
use std::collections::HashMap;
use std::time::SystemTime;

fn sample_node(name: &str, ip: &str) -> NodeInfo {
Expand Down
209 changes: 201 additions & 8 deletions src/server/monitoringserver/src/grpc/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,75 @@
use common::monitoringserver::monitoring_server_connection_server::MonitoringServerConnection;
use common::monitoringserver::{
ContainerList, NodeInfo, SendContainerListResponse, SendNodeInfoResponse,
StressMonitoringMetric, StressMonitoringMetricResponse,
};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};

use serde::Deserialize;
use serde_json;
use std::fmt;

/// JSON types for the StressMonitoringMetric payload.
///
/// One per-core CPU load sample within a stress metric.
#[derive(Debug, Deserialize)]
pub struct CpuLoad {
    // Zero-based core index.
    pub core_id: u32,
    // Load value for this core; unit not shown here — presumably percent
    // (sample payloads use values like 23.5) — TODO confirm with provider.
    pub load: f64,
}

/// Typed view of the stress-metric JSON sent by the App Data Provider.
#[derive(Debug, Deserialize)]
pub struct StressMonitoringMetricParsed {
    // Name of the monitored process.
    pub process_name: String,
    // Process id of the monitored process.
    pub pid: u32,
    // Optional CPU affinity mask (sample payloads use hex strings like
    // "0x0000F") — exact format set by the provider; TODO confirm.
    pub core_masking: Option<String>,
    // Optional total core count; when absent it is derived from `cpu_loads`
    // (see `effective_core_count`).
    pub core_count: Option<u32>,
    // Frames per second reported by the application.
    pub fps: f64,
    // Latency value; unit not shown here — presumably milliseconds, confirm.
    pub latency: u64,
    // Per-core load samples.
    pub cpu_loads: Vec<CpuLoad>,
}

impl StressMonitoringMetricParsed {
    /// Effective number of cores for this metric.
    ///
    /// Prefers the explicit `core_count` field when present; otherwise derives
    /// the count from the highest `core_id` seen in `cpu_loads` (+1, since ids
    /// are zero-based). With no `core_count` and an empty `cpu_loads`, returns
    /// 0 instead of pretending one core exists (the previous
    /// `unwrap_or(0).saturating_add(1)` reported 1 for an empty list).
    pub fn effective_core_count(&self) -> u32 {
        self.core_count.unwrap_or_else(|| {
            self.cpu_loads
                .iter()
                .map(|c| c.core_id)
                .max()
                .map_or(0, |m| m.saturating_add(1))
        })
    }
}

impl fmt::Display for StressMonitoringMetricParsed {
    /// Compact single-line summary: "process=… pid=… cores=… fps=… latency=…".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Resolve the core count once before formatting.
        let cores = self.effective_core_count();
        write!(
            f,
            "process={} pid={} cores={} fps={} latency={}",
            self.process_name, self.pid, cores, self.fps, self.latency
        )
    }
}

/// Parse a stress-metric JSON string into its typed representation.
///
/// Returns the `serde_json` error unchanged so callers can surface the exact
/// parse failure (the gRPC handler uses this to reject bad payloads with
/// `InvalidArgument`).
pub fn parse_stress_metric_json(
    s: &str,
) -> Result<StressMonitoringMetricParsed, serde_json::Error> {
    serde_json::from_str(s)
}

/// MonitoringServer gRPC service handler.
///
/// Holds the sender halves of the channels that feed the manager; each RPC
/// forwards its (validated) payload over the matching channel.
#[derive(Clone)]
pub struct MonitoringServerReceiver {
    // Container lists reported by nodes.
    pub tx_container: mpsc::Sender<ContainerList>,
    // Per-node system information.
    pub tx_node: mpsc::Sender<NodeInfo>,
    // Raw stress-metric JSON strings (validated before sending).
    pub tx_stress: mpsc::Sender<String>,
}

#[tonic::async_trait]
Expand Down Expand Up @@ -57,17 +117,38 @@ impl MonitoringServerConnection for MonitoringServerReceiver {
)),
}
}

/// Handle a StressMonitoringMetric message (single JSON string) from App Data Provider.
///
/// Parses the JSON payload to validate its shape, then forwards the original
/// JSON string to the manager via the stress channel.
///
/// # Errors
/// * `InvalidArgument` — the payload is not valid stress-metric JSON.
/// * `Unavailable` — the manager side of the channel has been dropped.
async fn send_stress_monitoring_metric(
    &self,
    request: Request<StressMonitoringMetric>,
) -> Result<Response<StressMonitoringMetricResponse>, Status> {
    let req: StressMonitoringMetric = request.into_inner();

    // Validate by parsing into the typed struct; the original string is what
    // gets forwarded, the parsed value is used only as a format check.
    parse_stress_metric_json(&req.json)
        .map_err(|e| Status::invalid_argument(format!("invalid stress metric json: {}", e)))?;

    match self.tx_stress.send(req.json).await {
        Ok(_) => Ok(Response::new(StressMonitoringMetricResponse {
            resp: "Successfully processed StressMonitoringMetric".to_string(),
        })),
        // Channel closed: the manager has gone away, so the service is unavailable.
        Err(e) => Err(Status::unavailable(format!("cannot send stress metric: {}", e))),
    }
}
}

#[cfg(test)]
mod tests {
use super::*;
use common::monitoringserver::{
ContainerList, NodeInfo, SendContainerListResponse, SendNodeInfoResponse,
};
use common::monitoringserver::{ContainerList, NodeInfo, StressMonitoringMetric};
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
use tonic::{Code, Request, Status};
use tonic::{Code, Request};

fn sample_node(name: &str, ip: &str) -> NodeInfo {
NodeInfo {
Expand Down Expand Up @@ -95,13 +176,32 @@ mod tests {
}
}

/// Canonical well-formed stress-metric payload matching the fields of
/// `StressMonitoringMetricParsed`; used as input by the tests below.
fn sample_stress_json() -> String {
r#"{
"process_name":"example_process",
"pid":12345,
"core_masking":"0x0000F",
"core_count":20,
"fps":58.7,
"latency":38,
"cpu_loads":[
{"core_id":0,"load":23.5},
{"core_id":1,"load":45.2},
{"core_id":2,"load":12.8}
]
}"#
.to_string()
}

#[tokio::test]
async fn test_send_container_list_success() {
let (tx, mut rx) = mpsc::channel(1);
let dummy_tx = mpsc::channel(1).0;
let dummy_tx_node = mpsc::channel::<NodeInfo>(1).0;
let dummy_stress = mpsc::channel::<String>(1).0;
let receiver = MonitoringServerReceiver {
tx_container: tx,
tx_node: dummy_tx,
tx_node: dummy_tx_node,
tx_stress: dummy_stress,
};
let req = Request::new(sample_container_list("node1"));
let resp = receiver.send_container_list(req).await.unwrap();
Expand All @@ -117,9 +217,11 @@ mod tests {
let (tx, rx) = mpsc::channel(1);
drop(rx);
let dummy_tx = mpsc::channel(1).0;
let dummy_stress = mpsc::channel(1).0;
let receiver = MonitoringServerReceiver {
tx_container: tx,
tx_node: dummy_tx,
tx_stress: dummy_stress,
};
let req = Request::new(sample_container_list("node1"));
let resp = receiver.send_container_list(req).await;
Expand All @@ -131,10 +233,12 @@ mod tests {
#[tokio::test]
async fn test_send_node_info_success() {
let (tx, mut rx) = mpsc::channel(1);
let dummy_tx = mpsc::channel(1).0;
let dummy_tx_container = mpsc::channel::<ContainerList>(1).0;
let dummy_stress = mpsc::channel::<String>(1).0;
let receiver = MonitoringServerReceiver {
tx_container: dummy_tx,
tx_container: dummy_tx_container,
tx_node: tx,
tx_stress: dummy_stress,
};
let req = Request::new(sample_node("node1", "192.168.10.201"));
let resp = receiver.send_node_info(req).await.unwrap();
Expand All @@ -150,14 +254,103 @@ mod tests {
let (tx, rx) = mpsc::channel(1);
drop(rx);
let dummy_tx = mpsc::channel(1).0;
let dummy_stress = mpsc::channel(1).0;
let receiver = MonitoringServerReceiver {
tx_container: dummy_tx,
tx_node: tx,
tx_stress: dummy_stress,
};
let req = Request::new(sample_node("node1", "192.168.10.201"));
let resp = receiver.send_node_info(req).await;
assert!(resp.is_err());
let status = resp.err().unwrap();
assert_eq!(status.code(), Code::Unavailable);
}

/// Happy path: a well-formed payload is accepted by the handler and the raw
/// JSON string is forwarded on the stress channel.
#[tokio::test]
async fn test_send_stress_metric_success() {
    let (tx, mut rx) = mpsc::channel(1);
    // Container/node channels are unused here; keep only the sender halves.
    let dummy_tx_container = mpsc::channel::<ContainerList>(1).0;
    let dummy_tx_node = mpsc::channel::<NodeInfo>(1).0;
    let receiver = MonitoringServerReceiver {
        tx_container: dummy_tx_container,
        tx_node: dummy_tx_node,
        tx_stress: tx,
    };
    let req = Request::new(StressMonitoringMetric {
        json: sample_stress_json(),
    });
    let resp = receiver.send_stress_monitoring_metric(req).await.unwrap();
    assert_eq!(
        resp.get_ref().resp,
        "Successfully processed StressMonitoringMetric"
    );
    // The forwarded JSON must arrive on the channel promptly.
    let received = timeout(Duration::from_millis(100), rx.recv()).await;
    assert!(received.is_ok());
}

/// End-to-end: gRPC handler -> manager -> etcd -> read back.
///
/// Requires a reachable etcd (and correct `common::etcd` configuration), so it
/// is `#[ignore]`d by default to keep the default test run hermetic; run it
/// explicitly with `cargo test -- --ignored` on a machine with etcd available.
#[tokio::test]
#[ignore = "requires a running etcd instance and common::etcd configuration"]
async fn test_send_stress_metric_roundtrip() {
    use crate::etcd_storage;
    use crate::manager;

    // tx halves go to the gRPC receiver, rx halves to the manager.
    let (tx_container, rx_container) = mpsc::channel::<ContainerList>(4);
    let (tx_node, rx_node) = mpsc::channel::<NodeInfo>(4);
    let (tx_stress, rx_stress) = mpsc::channel::<String>(8);

    // Spawn the real manager; it consumes rx_stress and writes to etcd.
    let mgr = manager::MonitoringServerManager::new(rx_container, rx_node, rx_stress).await;
    let mgr_handle = tokio::spawn(async move {
        // run() spawns internal tasks and blocks until the channels close.
        let _ = mgr.run().await;
    });

    // Construct the receiver with the sender sides of the channels.
    let receiver = MonitoringServerReceiver {
        tx_container: tx_container.clone(),
        tx_node: tx_node.clone(),
        tx_stress: tx_stress.clone(),
    };

    // Drive the gRPC handler directly with a valid payload.
    let req = Request::new(StressMonitoringMetric {
        json: sample_stress_json(),
    });
    let resp = receiver.send_stress_monitoring_metric(req).await.unwrap();
    assert_eq!(
        resp.get_ref().resp,
        "Successfully processed StressMonitoringMetric"
    );

    // Give the manager time to consume the message and store it in etcd.
    tokio::time::sleep(Duration::from_millis(300)).await;

    // Verify etcd holds at least one metric with the expected process_name.
    let metrics = etcd_storage::get_all_stress_metrics().await;
    assert!(
        metrics.is_ok(),
        "failed to list stress metrics from etcd: {:?}",
        metrics.err()
    );
    let items = metrics.unwrap();
    let found = items.iter().any(|v| {
        v.get("process_name")
            .and_then(|s| s.as_str())
            .map(|s| s == "example_process")
            .unwrap_or(false)
    });
    assert!(
        found,
        "stored stress metric with process_name 'example_process' not found in etcd"
    );

    // Drop the senders so the manager's channels close, then wait for it to exit.
    drop(tx_container);
    drop(tx_node);
    drop(tx_stress);
    let _ = tokio::time::timeout(Duration::from_secs(1), mgr_handle).await;
}
}
Loading
Loading