Skip to content

Commit 1daa5b4

Browse files
authored
Merge pull request #33287 from teskje/swap-usage
cluster,orchestrator-k8s: report swap usage
2 parents 7d24708 + 15daa9f commit 1daa5b4

File tree

5 files changed

+65
-68
lines changed

5 files changed

+65
-68
lines changed

src/catalog/src/builtin.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8857,9 +8857,9 @@ pub static MZ_CLUSTER_REPLICA_UTILIZATION: LazyLock<BuiltinView> = LazyLock::new
88578857
SELECT
88588858
r.id AS replica_id,
88598859
m.process_id,
8860-
m.cpu_nano_cores::float8 / s.cpu_nano_cores * 100 AS cpu_percent,
8861-
m.memory_bytes::float8 / s.memory_bytes * 100 AS memory_percent,
8862-
m.disk_bytes::float8 / s.disk_bytes * 100 AS disk_percent
8860+
m.cpu_nano_cores::float8 / NULLIF(s.cpu_nano_cores, 0) * 100 AS cpu_percent,
8861+
m.memory_bytes::float8 / NULLIF(s.memory_bytes, 0) * 100 AS memory_percent,
8862+
m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent
88638863
FROM
88648864
mz_catalog.mz_cluster_replicas AS r
88658865
JOIN mz_catalog.mz_cluster_replica_sizes AS s ON r.size = s.size
@@ -8907,9 +8907,9 @@ pub static MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY: LazyLock<BuiltinView> =
89078907
SELECT
89088908
r.id AS replica_id,
89098909
m.process_id,
8910-
m.cpu_nano_cores::float8 / s.cpu_nano_cores * 100 AS cpu_percent,
8911-
m.memory_bytes::float8 / s.memory_bytes * 100 AS memory_percent,
8912-
m.disk_bytes::float8 / s.disk_bytes * 100 AS disk_percent,
8910+
m.cpu_nano_cores::float8 / NULLIF(s.cpu_nano_cores, 0) * 100 AS cpu_percent,
8911+
m.memory_bytes::float8 / NULLIF(s.memory_bytes, 0) * 100 AS memory_percent,
8912+
m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent,
89138913
m.occurred_at
89148914
FROM
89158915
mz_catalog.mz_cluster_replicas AS r
@@ -12279,9 +12279,9 @@ replica_metrics_history AS (
1227912279
m.occurred_at,
1228012280
m.replica_id,
1228112281
r.size,
12282-
(SUM(m.cpu_nano_cores::float8) / s.cpu_nano_cores) / s.processes AS cpu_percent,
12283-
(SUM(m.memory_bytes::float8) / s.memory_bytes) / s.processes AS memory_percent,
12284-
(SUM(m.disk_bytes::float8) / s.disk_bytes) / s.processes AS disk_percent,
12282+
(SUM(m.cpu_nano_cores::float8) / NULLIF(s.cpu_nano_cores, 0)) / s.processes AS cpu_percent,
12283+
(SUM(m.memory_bytes::float8) / NULLIF(s.memory_bytes, 0)) / s.processes AS memory_percent,
12284+
(SUM(m.disk_bytes::float8) / NULLIF(s.disk_bytes, 0)) / s.processes AS disk_percent,
1228512285
SUM(m.disk_bytes::float8) AS disk_bytes,
1228612286
SUM(m.memory_bytes::float8) AS memory_bytes,
1228712287
s.disk_bytes::numeric * s.processes AS total_disk_bytes,

src/clusterd/src/usage_metrics.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
//! Support for collecting system usage metrics.
1111
//!
12-
//! Currently only disk usage is supported. We may want to add CPU and memory usage in the future.
12+
//! Currently only disk and swap usage is supported.
13+
//! We may want to add CPU and memory usage in the future.
1314
1415
use std::path::PathBuf;
1516

@@ -26,6 +27,7 @@ impl Collector {
2627
pub fn collect(&self) -> Usage {
2728
Usage {
2829
disk_bytes: self.collect_disk_usage(),
30+
swap_bytes: self.collect_swap_usage(),
2931
}
3032
}
3133

@@ -49,10 +51,33 @@ impl Collector {
4951

5052
Some(used_bytes)
5153
}
54+
55+
#[cfg(target_os = "linux")]
56+
fn collect_swap_usage(&self) -> Option<u64> {
57+
use mz_compute::memory_limiter::ProcStatus;
58+
use mz_ore::cast::CastInto;
59+
60+
match ProcStatus::from_proc() {
61+
Ok(status) => {
62+
let bytes = status.vm_swap.cast_into();
63+
Some(bytes)
64+
}
65+
Err(err) => {
66+
error!("error reading /proc/self/status: {err}");
67+
None
68+
}
69+
}
70+
}
71+
72+
#[cfg(not(target_os = "linux"))]
73+
fn collect_swap_usage(&self) -> Option<u64> {
74+
None
75+
}
5276
}
5377

5478
/// A system usage measurement.
5579
#[derive(Serialize)]
5680
pub(crate) struct Usage {
5781
disk_bytes: Option<u64>,
82+
swap_bytes: Option<u64>,
5883
}

src/compute/src/memory_limiter.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,15 +296,17 @@ impl LimiterMetrics {
296296
}
297297
}
298298

299-
struct ProcStatus {
299+
/// Helper for reading and parsing `/proc/self/status` on Linux.
300+
pub struct ProcStatus {
300301
/// Resident Set Size (RSS) in bytes.
301-
vm_rss: usize,
302+
pub vm_rss: usize,
302303
/// Swap memory in bytes.
303-
vm_swap: usize,
304+
pub vm_swap: usize,
304305
}
305306

306307
impl ProcStatus {
307-
fn from_proc() -> anyhow::Result<Self> {
308+
/// Populate a new `ProcStatus` with information in /proc/self/status.
309+
pub fn from_proc() -> anyhow::Result<Self> {
308310
let contents = std::fs::read_to_string("/proc/self/status")?;
309311
let mut vm_rss = 0;
310312
let mut vm_swap = 0;

src/orchestrator-kubernetes/src/lib.rs

Lines changed: 23 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,6 @@ impl Orchestrator for KubernetesOrchestrator {
215215
#[derive(Clone, Copy)]
216216
struct ServiceInfo {
217217
scale: u16,
218-
disk: bool,
219-
disk_limit: Option<DiskLimit>,
220218
}
221219

222220
struct NamespacedKubernetesOrchestrator {
@@ -1228,14 +1226,10 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
12281226
},
12291227
});
12301228

1231-
self.service_infos.lock().expect("poisoned lock").insert(
1232-
id.to_string(),
1233-
ServiceInfo {
1234-
scale,
1235-
disk,
1236-
disk_limit,
1237-
},
1238-
);
1229+
self.service_infos
1230+
.lock()
1231+
.expect("poisoned lock")
1232+
.insert(id.to_string(), ServiceInfo { scale });
12391233

12401234
Ok(Box::new(KubernetesService { hosts, ports }))
12411235
}
@@ -1445,31 +1439,16 @@ impl OrchestratorWorker {
14451439
self_: &OrchestratorWorker,
14461440
service_name: &str,
14471441
i: usize,
1448-
disk: bool,
1449-
disk_limit: Option<DiskLimit>,
14501442
) -> ServiceProcessMetrics {
14511443
let name = format!("{service_name}-{i}");
14521444

1453-
let disk_usage_fut = async {
1454-
if disk {
1455-
Some(get_disk_usage(self_, service_name, i).await)
1456-
} else {
1457-
None
1458-
}
1459-
};
1445+
let disk_usage_fut = get_disk_usage(self_, service_name, i);
14601446
let (metrics, disk_usage) =
14611447
match futures::future::join(self_.metrics_api.get(&name), disk_usage_fut).await {
1462-
(Ok(metrics), disk_usage) => {
1463-
let disk_usage = match disk_usage {
1464-
Some(Ok(disk_usage)) => Some(disk_usage),
1465-
Some(Err(e)) => {
1466-
warn!("Failed to fetch disk usage for {name}: {e}");
1467-
None
1468-
}
1469-
_ => None,
1470-
};
1471-
1472-
(metrics, disk_usage)
1448+
(Ok(metrics), Ok(disk_usage)) => (metrics, disk_usage),
1449+
(Ok(metrics), Err(e)) => {
1450+
warn!("Failed to fetch disk usage for {name}: {e}");
1451+
(metrics, None)
14731452
}
14741453
(Err(e), _) => {
14751454
warn!("Failed to get metrics for {name}: {e}");
@@ -1516,21 +1495,6 @@ impl OrchestratorWorker {
15161495
}
15171496
};
15181497

1519-
// We only populate a `disk_usage` if we have both:
1520-
// - a disk limit (so it must be an actual managed cluster with a real limit)
1521-
// - a reported disk usage
1522-
//
1523-
// The disk limit can be more up-to-date (from `service_infos`) than the
1524-
// reported metric. In that case, we report the minimum of the usage
1525-
// and the limit, which means we can report 100% usage temporarily
1526-
// if a replica is sized down.
1527-
let disk_usage = match (disk_usage, disk_limit) {
1528-
(Some(disk_usage), Some(DiskLimit(disk_limit))) => {
1529-
Some(std::cmp::min(disk_usage, disk_limit.0))
1530-
}
1531-
_ => None,
1532-
};
1533-
15341498
ServiceProcessMetrics {
15351499
cpu_nano_cores: cpu,
15361500
memory_bytes: memory,
@@ -1547,10 +1511,11 @@ impl OrchestratorWorker {
15471511
self_: &OrchestratorWorker,
15481512
service_name: &str,
15491513
i: usize,
1550-
) -> anyhow::Result<u64> {
1514+
) -> anyhow::Result<Option<u64>> {
15511515
#[derive(Deserialize)]
15521516
pub(crate) struct Usage {
15531517
disk_bytes: Option<u64>,
1518+
swap_bytes: Option<u64>,
15541519
}
15551520

15561521
let service = self_
@@ -1584,16 +1549,21 @@ impl OrchestratorWorker {
15841549
.build()
15851550
.context("error building HTTP client")?;
15861551
let resp = http_client.get(metrics_url).send().await?;
1587-
let usage: Usage = resp.json().await?;
1552+
let Usage {
1553+
disk_bytes,
1554+
swap_bytes,
1555+
} = resp.json().await?;
15881556

1589-
usage
1590-
.disk_bytes
1591-
.ok_or_else(|| anyhow!("process did not provide disk usage"))
1557+
let bytes = if let (Some(disk), Some(swap)) = (disk_bytes, swap_bytes) {
1558+
Some(disk + swap)
1559+
} else {
1560+
disk_bytes.or(swap_bytes)
1561+
};
1562+
Ok(bytes)
15921563
}
15931564

1594-
let ret = futures::future::join_all(
1595-
(0..info.scale).map(|i| get_metrics(self, name, i.into(), info.disk, info.disk_limit)),
1596-
);
1565+
let ret =
1566+
futures::future::join_all((0..info.scale).map(|i| get_metrics(self, name, i.into())));
15971567

15981568
ret.await
15991569
}

test/sqllogictest/system-cluster.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ ORDER BY r.id;
411411
Explained Query:
412412
Finish order_by=[#0{id} asc nulls_last] output=[#0..=#5]
413413
Project (#0{id}..=#3{size}, #5{name}, #29)
414-
Map (((uint8_to_double(#27{memory_bytes}) / uint8_to_double(#21{memory_bytes})) * 100))
414+
Map (((uint8_to_double(#27{memory_bytes}) / uint8_to_double(case when (0 = uint8_to_numeric(#21{memory_bytes})) then null else #21{memory_bytes} end)) * 100))
415415
Join on=(#0{id} = #15{id} = #24{replica_id} AND #2{cluster_id} = #4{id} AND #16{size} = #17{size}) type=delta
416416
ArrangeBy keys=[[#0{id}], [#2{cluster_id}]]
417417
Project (#0{id}..=#3{size})

0 commit comments

Comments (0)