Skip to content

Commit 77d08fb

Browse files
authored
Merge pull request #269 from akshaylg0314/containerinfo
feat(api)-Store ContainerInfo into ETCD at MonitoringServer[#265]
2 parents 91e9bcf + 3c0d46e commit 77d08fb

File tree

5 files changed

+518
-30
lines changed

5 files changed

+518
-30
lines changed

src/agent/nodeagent/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use serde::Deserialize;
22
use std::fs::File;
33
use std::io::Read;
44
use std::path::Path;
5-
use thiserror::Error;
65
use std::sync::OnceLock;
6+
use thiserror::Error;
77

88
// Global config instance
99
static NODEAGENT_CONFIG: OnceLock<Config> = OnceLock::new();
@@ -12,7 +12,7 @@ static NODEAGENT_CONFIG: OnceLock<Config> = OnceLock::new();
1212
pub enum ConfigError {
1313
#[error("Failed to read config file: {0}")]
1414
IoError(#[from] std::io::Error),
15-
15+
1616
#[error("Failed to parse YAML: {0}")]
1717
YamlError(#[from] serde_yaml::Error),
1818
}
@@ -62,7 +62,7 @@ impl Config {
6262
let mut file = File::open(path)?;
6363
let mut contents = String::new();
6464
file.read_to_string(&mut contents)?;
65-
65+
6666
let config = serde_yaml::from_str(&contents)?;
6767
Ok(config)
6868
}

src/agent/nodeagent/src/main.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
//! This file sets up the asynchronous runtime, initializes the manager and gRPC server,
44
//! and launches both concurrently. It also provides unit tests for initialization.
55
6-
use common::nodeagent::HandleYamlRequest;
76
use clap::Parser;
7+
use common::nodeagent::HandleYamlRequest;
88
use std::path::PathBuf;
99
mod bluechi;
10+
pub mod config;
1011
pub mod grpc;
1112
pub mod manager;
1213
pub mod resource;
13-
pub mod config;
1414

1515
use common::nodeagent::node_agent_connection_server::NodeAgentConnectionServer;
1616
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -19,15 +19,19 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
1919
///
2020
/// This function creates the manager, initializes it, and then runs it.
2121
/// If initialization or running fails, errors are printed to stderr.
22-
async fn launch_manager(rx_grpc: Receiver<HandleYamlRequest>, hostname: String, config: config::Config) {
22+
async fn launch_manager(
23+
rx_grpc: Receiver<HandleYamlRequest>,
24+
hostname: String,
25+
config: config::Config,
26+
) {
2327
let mut manager = manager::NodeAgentManager::new(rx_grpc, hostname.clone()).await;
2428

2529
match manager.initialize().await {
2630
Ok(_) => {
2731
println!("NodeAgentManager successfully initialized");
2832
// Add registration with API server
2933
let mut sender = grpc::sender::NodeAgentSender::default();
30-
34+
3135
// Use IP address from config file
3236
let host_ip = config.get_host_ip();
3337
let node_id = format!("{}-{}", hostname, host_ip);
@@ -141,12 +145,16 @@ async fn main() {
141145
config
142146
}
143147
Err(err) => {
144-
eprintln!("Error loading configuration from {}: {}", args.config.display(), err);
148+
eprintln!(
149+
"Error loading configuration from {}: {}",
150+
args.config.display(),
151+
err
152+
);
145153
eprintln!("Falling back to default configuration");
146154
config::Config::default()
147155
}
148156
};
149-
157+
150158
// Set global config for other parts of the application
151159
config::Config::set_global(app_config.clone());
152160

@@ -169,13 +177,13 @@ async fn main() {
169177

170178
#[cfg(test)]
171179
mod tests {
172-
use crate::launch_manager;
173180
use crate::config::Config;
181+
use crate::launch_manager;
174182
use common::nodeagent::HandleYamlRequest;
183+
use std::path::PathBuf;
175184
use tokio::sync::mpsc::{channel, Receiver, Sender};
176185
use tokio::task::LocalSet;
177186
use tokio::time::{sleep, Duration};
178-
use std::path::PathBuf;
179187

180188
#[tokio::test]
181189
async fn test_main_initializes_channels() {

src/server/monitoringserver/src/data_structures.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6+
use common::monitoringserver::ContainerInfo;
67
use common::monitoringserver::NodeInfo;
78
use serde::{Deserialize, Serialize};
89
use std::collections::HashMap;
@@ -54,6 +55,8 @@ pub struct DataStore {
5455
pub nodes: HashMap<String, NodeInfo>,
5556
pub socs: HashMap<String, SocInfo>,
5657
pub boards: HashMap<String, BoardInfo>,
58+
pub containers: HashMap<String, ContainerInfo>,
59+
pub container_node_mapping: HashMap<String, String>, // ADD THIS LINE
5760
}
5861

5962
impl DataStore {
@@ -62,6 +65,8 @@ impl DataStore {
6265
nodes: HashMap::new(),
6366
socs: HashMap::new(),
6467
boards: HashMap::new(),
68+
containers: HashMap::new(),
69+
container_node_mapping: HashMap::new(), // ADD THIS LINE
6570
}
6671
}
6772

@@ -208,6 +213,133 @@ impl DataStore {
208213
Ok(())
209214
}
210215

216+
/// Stores ContainerInfo to memory and etcd
217+
pub async fn store_container_info(
218+
&mut self,
219+
container_info: ContainerInfo,
220+
) -> Result<(), String> {
221+
let container_id = container_info.id.clone();
222+
223+
// Store container in memory
224+
self.containers
225+
.insert(container_id.clone(), container_info.clone());
226+
227+
// Store to etcd with error handling
228+
if let Err(e) = crate::etcd_storage::store_container_info(&container_info).await {
229+
eprintln!(
230+
"[ETCD] Warning: Failed to store ContainerInfo to etcd: {}",
231+
e
232+
);
233+
// Don't fail the entire operation, just log the warning
234+
}
235+
236+
println!(
237+
"[DataStore] Stored container info: {} from node {}",
238+
container_id,
239+
container_info
240+
.names
241+
.first()
242+
.unwrap_or(&"unnamed".to_string())
243+
);
244+
Ok(())
245+
}
246+
247+
/// Stores ContainerInfo to memory and etcd, with explicit node association
248+
pub async fn store_container_info_with_node(
249+
&mut self,
250+
container_info: ContainerInfo,
251+
node_name: String,
252+
) -> Result<(), String> {
253+
let container_id = container_info.id.clone();
254+
255+
// Store container in memory
256+
self.containers
257+
.insert(container_id.clone(), container_info.clone());
258+
259+
// Store the node association
260+
self.container_node_mapping
261+
.insert(container_id.clone(), node_name.clone());
262+
263+
// Store to etcd
264+
if let Err(e) = crate::etcd_storage::store_container_info(&container_info).await {
265+
eprintln!(
266+
"[ETCD] Warning: Failed to store ContainerInfo to etcd: {}",
267+
e
268+
);
269+
}
270+
271+
println!(
272+
"[DataStore] Stored container {} on node {}",
273+
container_id, node_name
274+
);
275+
Ok(())
276+
}
277+
278+
/// Retrieves ContainerInfo from memory, fallback to etcd
279+
pub async fn get_container_info(&self, container_id: &str) -> Result<ContainerInfo, String> {
280+
// Try memory first
281+
if let Some(container_info) = self.containers.get(container_id) {
282+
return Ok(container_info.clone());
283+
}
284+
285+
// Fallback to etcd
286+
match crate::etcd_storage::get_container_info(container_id).await {
287+
Ok(container_info) => Ok(container_info),
288+
Err(e) => Err(format!("Container not found in memory or etcd: {}", e)),
289+
}
290+
}
291+
292+
/// Gets all containers from memory
293+
pub fn get_all_containers(&self) -> &HashMap<String, ContainerInfo> {
294+
&self.containers
295+
}
296+
297+
/// Gets all containers for a specific node
298+
pub fn get_containers_by_node(&self, node_name: &str) -> Vec<&ContainerInfo> {
299+
self.container_node_mapping
300+
.iter()
301+
.filter(|(_, mapped_node)| *mapped_node == node_name)
302+
.filter_map(|(container_id, _)| self.containers.get(container_id))
303+
.collect()
304+
}
305+
306+
/// Removes container from memory and etcd
307+
pub async fn remove_container_info(&mut self, container_id: &str) -> Result<(), String> {
308+
// Remove from memory
309+
self.containers.remove(container_id);
310+
311+
// Remove from etcd
312+
if let Err(e) = crate::etcd_storage::delete_container_info(container_id).await {
313+
eprintln!(
314+
"[ETCD] Warning: Failed to delete ContainerInfo from etcd: {}",
315+
e
316+
);
317+
}
318+
319+
println!("[DataStore] Removed container info: {}", container_id);
320+
Ok(())
321+
}
322+
323+
/// Load all containers from etcd into memory (useful for initialization)
324+
pub async fn load_containers_from_etcd(&mut self) -> Result<(), String> {
325+
match crate::etcd_storage::get_all_containers().await {
326+
Ok(containers) => {
327+
for container in containers {
328+
self.containers.insert(container.id.clone(), container);
329+
}
330+
println!(
331+
"[DataStore] Loaded {} containers from etcd",
332+
self.containers.len()
333+
);
334+
Ok(())
335+
}
336+
Err(e) => {
337+
eprintln!("[ETCD] Warning: Failed to load containers from etcd: {}", e);
338+
Ok(()) // Don't fail initialization
339+
}
340+
}
341+
}
342+
211343
pub fn get_node_info(&self, node_name: &str) -> Option<&NodeInfo> {
212344
self.nodes.get(node_name)
213345
}
@@ -231,6 +363,40 @@ impl DataStore {
231363
pub fn get_all_boards(&self) -> &HashMap<String, BoardInfo> {
232364
&self.boards
233365
}
366+
367+
/// ADD THIS METHOD for cleanup with etcd deletion
368+
pub async fn cleanup_node_containers(
369+
&mut self,
370+
node_name: &str,
371+
current_containers: &[String],
372+
) {
373+
let containers_to_remove: Vec<String> = self
374+
.container_node_mapping
375+
.iter()
376+
.filter(|(_, mapped_node)| *mapped_node == node_name)
377+
.filter(|(container_id, _)| !current_containers.contains(container_id))
378+
.map(|(container_id, _)| container_id.clone())
379+
.collect();
380+
381+
for container_id in containers_to_remove {
382+
// Remove from memory
383+
self.containers.remove(&container_id);
384+
self.container_node_mapping.remove(&container_id);
385+
386+
// Remove from etcd
387+
if let Err(e) = crate::etcd_storage::delete_container_info(&container_id).await {
388+
eprintln!(
389+
"[ETCD] Warning: Failed to delete container {} from etcd: {}",
390+
container_id, e
391+
);
392+
}
393+
394+
println!(
395+
"[DataStore] Removed obsolete container {} from node {}",
396+
container_id, node_name
397+
);
398+
}
399+
}
234400
}
235401

236402
impl SocInfo {

0 commit comments

Comments
 (0)