Skip to content

Commit 79103ee

Browse files
authored
feat(server): add threads count and disk space to sysinfo stats (#2917)
1 parent b87a84f commit 79103ee

File tree

7 files changed

+232
-5
lines changed

7 files changed

+232
-5
lines changed

core/cli/src/commands/binary_system/stats.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,19 @@ impl CliCommand for GetStatsCmd {
159159
format!("{}", stats.consumer_groups_count).as_str(),
160160
]);
161161

162+
table.add_row(vec![
163+
"Threads Count",
164+
format!("{}", stats.threads_count).as_str(),
165+
]);
166+
table.add_row(vec![
167+
"Free Disk Space",
168+
stats.free_disk_space.as_bytes_u64().to_string().as_str(),
169+
]);
170+
table.add_row(vec![
171+
"Total Disk Space",
172+
stats.total_disk_space.as_bytes_u64().to_string().as_str(),
173+
]);
174+
162175
table.add_row(vec!["OS Name", stats.os_name.as_str()]);
163176
table.add_row(vec!["OS Version", stats.os_version.as_str()]);
164177
table.add_row(vec!["Kernel Version", stats.kernel_version.as_str()]);
@@ -208,6 +221,16 @@ impl CliCommand for GetStatsCmd {
208221
stats.consumer_groups_count
209222
));
210223

224+
list.push(format!("Threads Count|{}", stats.threads_count));
225+
list.push(format!(
226+
"Free Disk Space|{}",
227+
stats.free_disk_space.as_bytes_u64()
228+
));
229+
list.push(format!(
230+
"Total Disk Space|{}",
231+
stats.total_disk_space.as_bytes_u64()
232+
));
233+
211234
list.push(format!("OS Name|{}", stats.os_name));
212235
list.push(format!("OS Version|{}", stats.os_version));
213236
list.push(format!("Kernel Version|{}", stats.kernel_version));

core/common/src/traits/binary_mapper.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,41 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
340340
}
341341
}
342342

343+
let mut threads_count = 0u32;
344+
if current_position + 4 <= payload.len() {
345+
threads_count = u32::from_le_bytes(
346+
payload[current_position..current_position + 4]
347+
.try_into()
348+
.map_err(|_| IggyError::InvalidNumberEncoding)?,
349+
);
350+
current_position += 4;
351+
}
352+
353+
let mut free_disk_space: IggyByteSize = 0.into();
354+
if current_position + 8 <= payload.len() {
355+
free_disk_space = u64::from_le_bytes(
356+
payload[current_position..current_position + 8]
357+
.try_into()
358+
.map_err(|_| IggyError::InvalidNumberEncoding)?,
359+
)
360+
.into();
361+
current_position += 8;
362+
}
363+
364+
let mut total_disk_space: IggyByteSize = 0.into();
365+
if current_position + 8 <= payload.len() {
366+
total_disk_space = u64::from_le_bytes(
367+
payload[current_position..current_position + 8]
368+
.try_into()
369+
.map_err(|_| IggyError::InvalidNumberEncoding)?,
370+
)
371+
.into();
372+
#[allow(unused_assignments)]
373+
{
374+
current_position += 8;
375+
}
376+
}
377+
343378
Ok(Stats {
344379
process_id,
345380
cpu_usage,
@@ -366,6 +401,9 @@ pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
366401
iggy_server_version,
367402
iggy_server_semver,
368403
cache_metrics,
404+
threads_count,
405+
free_disk_space,
406+
total_disk_space,
369407
})
370408
}
371409

core/common/src/types/stats/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ pub struct Stats {
7474
/// Cache metrics per partition
7575
#[serde(with = "cache_metrics_serializer")]
7676
pub cache_metrics: HashMap<CacheMetricsKey, CacheMetrics>,
77+
/// The number of threads in the server process.
78+
pub threads_count: u32,
79+
/// The available (free) disk space for the data directory.
80+
pub free_disk_space: IggyByteSize,
81+
/// The total disk space for the data directory.
82+
pub total_disk_space: IggyByteSize,
7783
}
7884

7985
/// Key for identifying a specific partition's cache metrics
@@ -181,6 +187,9 @@ impl Default for Stats {
181187
iggy_server_version: "unknown_iggy_version".to_string(),
182188
iggy_server_semver: None,
183189
cache_metrics: HashMap::new(),
190+
threads_count: 0,
191+
free_disk_space: 0.into(),
192+
total_disk_space: 0.into(),
184193
}
185194
}
186195
}

core/integration/tests/cli/system/test_stats_command.rs

Lines changed: 126 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@ use async_trait::async_trait;
2424
use iggy::prelude::Client;
2525
use iggy::prelude::Identifier;
2626
use iggy::prelude::IggyExpiry;
27+
use iggy::prelude::IggyMessage;
2728
use iggy::prelude::MaxTopicSize;
29+
use iggy::prelude::Partitioning;
2830
use iggy_cli::commands::binary_system::stats::GetStatsOutput;
31+
use iggy_common::Stats;
2932
use predicates::str::{contains, starts_with};
3033
use serial_test::parallel;
34+
use std::str::FromStr;
3135

3236
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
3337
enum TestStatsCmdOutput {
@@ -97,7 +101,10 @@ impl IggyCmdTestCase for TestStatsCmd {
97101
.stdout(contains("Segments Count | 5"))
98102
.stdout(contains("Message Count | 0"))
99103
// Note: Client count can vary due to connection lifecycle; at least 2 expected
100-
.stdout(contains("Consumer Groups Count | 0"));
104+
.stdout(contains("Consumer Groups Count | 0"))
105+
.stdout(contains("Threads Count"))
106+
.stdout(contains("Free Disk Space"))
107+
.stdout(contains("Total Disk Space"));
101108
}
102109
TestStatsCmdOutput::Set(GetStatsOutput::List) => {
103110
command_state
@@ -107,7 +114,10 @@ impl IggyCmdTestCase for TestStatsCmd {
107114
.stdout(contains("Partitions Count|5"))
108115
.stdout(contains("Segments Count|5"))
109116
.stdout(contains("Message Count|0"))
110-
.stdout(contains("Consumer Groups Count|0"));
117+
.stdout(contains("Consumer Groups Count|0"))
118+
.stdout(contains("Threads Count|"))
119+
.stdout(contains("Free Disk Space|"))
120+
.stdout(contains("Total Disk Space|"));
111121
}
112122
TestStatsCmdOutput::Set(GetStatsOutput::Json) => {
113123
command_state
@@ -117,7 +127,10 @@ impl IggyCmdTestCase for TestStatsCmd {
117127
.stdout(contains(r#""partitions_count": 5"#))
118128
.stdout(contains(r#""segments_count": 5"#))
119129
.stdout(contains(r#""messages_count": 0"#))
120-
.stdout(contains(r#""consumer_groups_count": 0"#));
130+
.stdout(contains(r#""consumer_groups_count": 0"#))
131+
.stdout(contains(r#""threads_count":"#))
132+
.stdout(contains(r#""free_disk_space":"#))
133+
.stdout(contains(r#""total_disk_space":"#));
121134
}
122135
TestStatsCmdOutput::Set(GetStatsOutput::Toml) => {
123136
command_state
@@ -127,7 +140,10 @@ impl IggyCmdTestCase for TestStatsCmd {
127140
.stdout(contains("partitions_count = 5"))
128141
.stdout(contains("segments_count = 5"))
129142
.stdout(contains("messages_count = 0"))
130-
.stdout(contains("consumer_groups_count = 0"));
143+
.stdout(contains("consumer_groups_count = 0"))
144+
.stdout(contains("threads_count ="))
145+
.stdout(contains("free_disk_space ="))
146+
.stdout(contains("total_disk_space ="));
131147
}
132148
}
133149
}
@@ -143,6 +159,109 @@ impl IggyCmdTestCase for TestStatsCmd {
143159
}
144160
}
145161

162+
struct TestStatsCmdWithMessages {
163+
stream_id: u32,
164+
topic_id: u32,
165+
}
166+
167+
impl TestStatsCmdWithMessages {
168+
fn new() -> Self {
169+
Self {
170+
stream_id: 0,
171+
topic_id: 0,
172+
}
173+
}
174+
}
175+
176+
#[async_trait]
177+
impl IggyCmdTestCase for TestStatsCmdWithMessages {
178+
async fn prepare_server_state(&mut self, client: &dyn Client) {
179+
let stream = client.create_stream("size-test").await;
180+
assert!(stream.is_ok());
181+
let stream_details = stream.unwrap();
182+
self.stream_id = stream_details.id;
183+
184+
let topic = client
185+
.create_topic(
186+
&self.stream_id.try_into().unwrap(),
187+
"topic",
188+
1,
189+
Default::default(),
190+
None,
191+
IggyExpiry::NeverExpire,
192+
MaxTopicSize::ServerDefault,
193+
)
194+
.await;
195+
assert!(topic.is_ok());
196+
let topic_details = topic.unwrap();
197+
self.topic_id = topic_details.id;
198+
199+
let mut messages = (1..=10)
200+
.filter_map(|id| IggyMessage::from_str(format!("Test message {id}").as_str()).ok())
201+
.collect::<Vec<_>>();
202+
let send_status = client
203+
.send_messages(
204+
&self.stream_id.try_into().unwrap(),
205+
&self.topic_id.try_into().unwrap(),
206+
&Partitioning::default(),
207+
&mut messages,
208+
)
209+
.await;
210+
assert!(send_status.is_ok());
211+
}
212+
213+
fn get_command(&self) -> IggyCmdCommand {
214+
IggyCmdCommand::new()
215+
.arg("stats")
216+
.arg("-o")
217+
.arg("json")
218+
.opt("-q")
219+
.with_env_credentials()
220+
}
221+
222+
fn verify_command(&self, command_state: Assert) {
223+
let assert = command_state.success();
224+
let stdout = String::from_utf8_lossy(&assert.get_output().stdout);
225+
let stats: Stats =
226+
serde_json::from_str(&stdout).expect("Failed to parse stats JSON output");
227+
228+
assert!(
229+
stats.messages_count > 0,
230+
"messages_count should be > 0 after sending messages"
231+
);
232+
assert!(
233+
stats.messages_size_bytes.as_bytes_u64() > 0,
234+
"messages_size_bytes should be > 0 after sending messages"
235+
);
236+
assert!(
237+
stats.free_disk_space.as_bytes_u64() > 0,
238+
"free_disk_space should be > 0"
239+
);
240+
assert!(
241+
stats.total_disk_space.as_bytes_u64() > 0,
242+
"total_disk_space should be > 0"
243+
);
244+
assert!(
245+
stats.free_disk_space.as_bytes_u64() <= stats.total_disk_space.as_bytes_u64(),
246+
"free_disk_space should be <= total_disk_space"
247+
);
248+
}
249+
250+
async fn verify_server_state(&self, client: &dyn Client) {
251+
client
252+
.delete_topic(
253+
&self.stream_id.try_into().unwrap(),
254+
&self.topic_id.try_into().unwrap(),
255+
)
256+
.await
257+
.unwrap();
258+
client
259+
.delete_stream(&self.stream_id.try_into().unwrap())
260+
.await
261+
.unwrap();
262+
}
263+
}
264+
146265
#[tokio::test]
147266
#[parallel]
148267
pub async fn should_be_successful() {
@@ -172,6 +291,9 @@ pub async fn should_be_successful() {
172291
GetStatsOutput::Toml,
173292
)))
174293
.await;
294+
iggy_cmd_test
295+
.execute_test(TestStatsCmdWithMessages::new())
296+
.await;
175297
}
176298

177299
#[tokio::test]

core/server/src/binary/mapper.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ pub fn map_stats(stats: &Stats) -> Bytes {
7272
bytes.put_f32_le(metrics.hit_ratio);
7373
}
7474

75+
bytes.put_u32_le(stats.threads_count);
76+
bytes.put_u64_le(stats.free_disk_space.as_bytes_u64());
77+
bytes.put_u64_le(stats.total_disk_space.as_bytes_u64());
78+
7579
bytes.freeze()
7680
}
7781

core/server/src/shard/system/stats.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ impl IggyShard {
8787
let disk_usage = process.disk_usage();
8888
stats.read_bytes = disk_usage.total_read_bytes.into();
8989
stats.written_bytes = disk_usage.total_written_bytes.into();
90+
91+
stats.threads_count = process.tasks().map(|t| t.len() as u32).unwrap_or(0);
9092
}
9193

9294
let (streams_count, topics_count, partitions_count, consumer_groups_count, stream_ids) =
@@ -118,6 +120,25 @@ impl IggyShard {
118120
}
119121
}
120122

123+
match fs2::available_space(&self.config.system.path) {
124+
Ok(space) => stats.free_disk_space = space.into(),
125+
Err(err) => {
126+
tracing::warn!(
127+
"Failed to get available disk space for '{}': {err}",
128+
self.config.system.path
129+
);
130+
}
131+
}
132+
match fs2::total_space(&self.config.system.path) {
133+
Ok(space) => stats.total_disk_space = space.into(),
134+
Err(err) => {
135+
tracing::warn!(
136+
"Failed to get total disk space for '{}': {err}",
137+
self.config.system.path
138+
);
139+
}
140+
}
141+
121142
Ok(stats)
122143
})
123144
}

core/server/src/shard/tasks/periodic/sysinfo_printer.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,24 +69,34 @@ async fn print_sysinfo(shard: Rc<IggyShard>) -> Result<(), IggyError> {
6969
/ stats.total_memory.as_bytes_u64() as f64)
7070
* 100f64;
7171

72+
let threads_info = if stats.threads_count > 0 {
73+
format!(", Threads: {}", stats.threads_count)
74+
} else {
75+
String::new()
76+
};
77+
7278
let open_files_info = if let Some(open_files) = get_open_file_descriptors() {
7379
format!(", OpenFDs: {}", open_files)
7480
} else {
7581
String::new()
7682
};
7783

7884
info!(
79-
"CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: {:.2}%/{}/{}/{} (Free/IggyUsage/TotalUsed/Total), Clients: {}, Messages: {}, Read: {}, Written: {}{}",
85+
"CPU: {:.2}%/{:.2}% (IggyUsage/Total), Mem: {:.2}%/{}/{}/{} (Free/IggyUsage/TotalUsed/Total), Disk: {}/{} (Free/Total), IggyUsage: {}, Clients: {}, Messages: {}, Read: {}, Written: {}{}{}",
8086
stats.cpu_usage,
8187
stats.total_cpu_usage,
8288
free_memory_percent,
8389
stats.memory_usage,
8490
stats.total_memory - stats.available_memory,
8591
stats.total_memory,
92+
stats.free_disk_space,
93+
stats.total_disk_space,
94+
stats.messages_size_bytes,
8695
stats.clients_count.human_count_bare().to_string(),
8796
stats.messages_count.human_count_bare().to_string(),
8897
stats.read_bytes,
8998
stats.written_bytes,
99+
threads_info,
90100
open_files_info,
91101
);
92102

0 commit comments

Comments
 (0)