Skip to content

Commit 39d7053

Browse files
committed
rust version implementation of procdockerstatsd
1 parent bb0a31c commit 39d7053

File tree

3 files changed

+179
-0
lines changed

3 files changed

+179
-0
lines changed

scripts/rs/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "procdockerstatsd_rs"
3+
version = "1.0.0"
4+
edition = "2024"
5+
description = "Daemon to collect process and Docker stats and push them to Redis"
6+
license = "MIT"
7+
authors = ["Feng Pan"]
8+
9+
[dependencies]
10+
redis = "0.23"
11+
sysinfo = "0.30"
12+
regex = "1.10"
13+
daemonize = "0.5"
14+
chrono = "0.4"
15+
16+
[[bin]]
17+
name = "procdockerstatsd_rs"
18+
path = "main.rs"

scripts/rs/main.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use std::process::Command;
2+
use std::thread::sleep;
3+
use std::time::Duration;
4+
use redis::{Commands, Connection, Client};
5+
use regex::Regex;
6+
use sysinfo::{System, Process};
7+
use chrono::Utc;
8+
use std::fs;
9+
use std::collections::HashMap;
10+
11+
12+
const REDIS_URL: &str = "redis://127.0.0.1/";
13+
const UPDATE_INTERVAL: u64 = 120; // 2 minutes
14+
15+
16+
fn run_command(cmd: &[&str]) -> Option<String> {
17+
let output = Command::new(cmd[0]).args(&cmd[1..]).output().ok()?;
18+
if output.status.success() {
19+
Some(String::from_utf8_lossy(&output.stdout).to_string())
20+
} else {
21+
eprintln!("Error running command: {:?}", cmd);
22+
None
23+
}
24+
}
25+
26+
fn convert_to_bytes(value: &str) -> u64 {
27+
let re = Regex::new(r"(\d+\.?\d*)([a-zA-Z]+)").unwrap();
28+
if let Some(caps) = re.captures(value) {
29+
let num: f64 = caps[1].parse().unwrap_or(0.0);
30+
let unit = &caps[2];
31+
match unit.to_lowercase().as_str() {
32+
"kb" => (num * 1024.0) as u64,
33+
"mb" | "mib" => (num * 1024.0 * 1024.0) as u64,
34+
"gb" | "gib" => (num * 1024.0 * 1024.0 * 1024.0) as u64,
35+
_ => num as u64,
36+
}
37+
} else {
38+
0
39+
}
40+
}
41+
42+
fn parse_docker_stats(output: &str) -> Vec<HashMap<String, String>> {
43+
let lines: Vec<&str> = output.lines().collect();
44+
if lines.len() < 2 { return vec![]; }
45+
46+
let keys: Vec<&str> = lines[0].split_whitespace().collect();
47+
let mut stats_list = Vec::new();
48+
49+
for line in &lines[1..] {
50+
let values: Vec<&str> = line.split_whitespace().collect();
51+
if values.len() >= keys.len() {
52+
let mut stats = HashMap::new();
53+
stats.insert("CONTAINER ID".to_string(), values[0].to_string());
54+
stats.insert("NAME".to_string(), values[1].to_string());
55+
stats.insert("CPU%".to_string(), values[2].trim_end_matches('%').to_string());
56+
stats.insert("MEM_BYTES".to_string(), convert_to_bytes(values[3]).to_string());
57+
stats.insert("MEM_LIMIT_BYTES".to_string(), convert_to_bytes(values[5]).to_string());
58+
stats.insert("MEM%".to_string(), values[6].trim_end_matches('%').to_string());
59+
stats.insert("NET_IN_BYTES".to_string(), convert_to_bytes(values[7]).to_string());
60+
stats.insert("NET_OUT_BYTES".to_string(), convert_to_bytes(values[9]).to_string());
61+
stats.insert("BLOCK_IN_BYTES".to_string(), convert_to_bytes(values[10]).to_string());
62+
stats.insert("BLOCK_OUT_BYTES".to_string(), convert_to_bytes(values[12]).to_string());
63+
stats.insert("PIDS".to_string(), values[13].to_string());
64+
stats_list.push(stats);
65+
}
66+
}
67+
stats_list
68+
}
69+
70+
fn collect_docker_stats(conn: &mut Connection) {
71+
if let Some(output) = run_command(&["docker", "stats", "--no-stream", "-a"]) {
72+
let stats_list = parse_docker_stats(&output);
73+
let _: () = redis::cmd("DEL").arg("DOCKER_STATS|*").execute(conn);
74+
for stats in stats_list {
75+
let key = format!("DOCKER_STATS|{}", stats["CONTAINER ID"]);
76+
77+
// Convert the HashMap to a vector of tuples
78+
let stats_vec: Vec<(String, String)> = stats.into_iter().collect();
79+
let _: () = conn.hset_multiple(&key, &stats_vec).unwrap();
80+
}
81+
}
82+
}
83+
84+
fn collect_process_stats(conn: &mut Connection) {
85+
let mut sys = System::new_all();
86+
sys.refresh_all();
87+
88+
let mut process_list: Vec<&Process> = sys.processes().values().collect();
89+
90+
// Sort processes by CPU usage in descending order and take top 1024
91+
process_list.sort_by(|a, b| b.cpu_usage().partial_cmp(&a.cpu_usage()).unwrap());
92+
let top_processes = process_list.iter().take(1024);
93+
94+
let mut active_pids = std::collections::HashSet::new();
95+
96+
for process in top_processes {
97+
let pid = process.pid().as_u32();
98+
active_pids.insert(pid);
99+
100+
let key = format!("PROCESS_STATS|{}", pid);
101+
102+
let stats: Vec<(String, String)> = vec![
103+
("UID".to_string(), process.user_id().map(|uid| uid.to_string()).unwrap_or_else(|| "0".to_string())),
104+
("PPID".to_string(), process.parent().map(|p| p.to_string()).unwrap_or_else(|| "0".to_string())),
105+
("CMD".to_string(), process.cmd().join(" ")),
106+
("CPU".to_string(), format!("{:.2}", process.cpu_usage() as f64)),
107+
("MEM".to_string(), process.memory().to_string()),
108+
("STIME".to_string(), process.start_time().to_string()),
109+
];
110+
111+
let _: () = conn.hset_multiple(&key, &stats).unwrap();
112+
}
113+
114+
// Remove stale process stats from Redis
115+
let existing_keys: Vec<String> = conn.keys("PROCESS_STATS|*").unwrap_or_default();
116+
for key in existing_keys {
117+
if let Some(pid_str) = key.strip_prefix("PROCESS_STATS|") {
118+
if let Ok(pid) = pid_str.parse::<u32>() {
119+
if !active_pids.contains(&pid) {
120+
let _: () = conn.del(&key).unwrap();
121+
}
122+
}
123+
}
124+
}
125+
}
126+
127+
128+
fn collect_fips_stats(conn: &mut Connection) {
129+
let kernel_cmdline = fs::read_to_string("/proc/cmdline").unwrap_or_default();
130+
let enforced = kernel_cmdline.contains("sonic_fips=1") || kernel_cmdline.contains("fips=1");
131+
let enabled = run_command(&["sudo", "openssl", "engine", "-vv"]).map_or(false, |out| out.contains("symcryp"));
132+
133+
let key = "FIPS_STATS|state";
134+
let mut stats = HashMap::new();
135+
stats.insert("timestamp".to_string(), Utc::now().to_rfc3339());
136+
stats.insert("enforced".to_string(), enforced.to_string());
137+
stats.insert("enabled".to_string(), enabled.to_string());
138+
139+
// Convert the HashMap to a vector of tuples
140+
let stats_vec: Vec<(String, String)> = stats.into_iter().collect();
141+
142+
// Pass the vector of tuples to hset_multiple
143+
let _: () = conn.hset_multiple(&key, &stats_vec).unwrap();
144+
}
145+
146+
fn main() {
147+
let client = Client::open(REDIS_URL).expect("Failed to connect to Redis");
148+
let mut conn = client.get_connection().expect("Failed to get Redis connection");
149+
150+
loop {
151+
collect_docker_stats(&mut conn);
152+
collect_process_stats(&mut conn);
153+
collect_fips_stats(&mut conn);
154+
155+
let timestamp = Utc::now().to_rfc3339();
156+
let _: () = conn.set("STATS|LastUpdateTime", timestamp).unwrap();
157+
sleep(Duration::from_secs(UPDATE_INTERVAL));
158+
}
159+
}

scripts/rs/rs.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
source $HOME/.cargo/env
2+
cargo build

0 commit comments

Comments
 (0)