Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const PEER_HOSTNAME: &str = "peer_hostname";
const CPUS: &str = "cpus";
const MEMORY_BYTES: &str = "memory_bytes";
const TOTAL_CPU_MILLICORES: &str = "total_cpu_millicores";
const TOTAL_MEMORY_BYTES: &str = "total_memory_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
Expand All @@ -67,8 +67,8 @@ const INIT_CAPACITY: usize = 42;
/// - `peer_id`: the peer server id.
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
/// - `peer_addr`: the peer gRPC address.
/// - `cpus`: the number of CPUs of the peer.
/// - `memory_bytes`: the memory bytes of the peer.
/// - `total_cpu_millicores`: the total CPU millicores of the peer.
/// - `total_memory_bytes`: the total memory bytes of the peer.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
/// - `start_time`: the starting time of the peer.
Expand Down Expand Up @@ -97,8 +97,16 @@ impl InformationSchemaClusterInfo {
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(PEER_HOSTNAME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CPUS, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(MEMORY_BYTES, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(
TOTAL_CPU_MILLICORES,
ConcreteDataType::uint32_datatype(),
false,
),
ColumnSchema::new(
TOTAL_MEMORY_BYTES,
ConcreteDataType::uint64_datatype(),
false,
),
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
Expand Down
8 changes: 5 additions & 3 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_mem_prof::activate_heap_profile;
use common_stat::{get_cpu_limit, get_memory_limit};
use common_stat::{get_total_cpu_millicores, get_total_memory_bytes};
use common_telemetry::{error, info, warn};

use crate::error::Result;
Expand Down Expand Up @@ -125,15 +125,17 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
}

pub fn create_resource_limit_metrics(app: &str) {
if let Some(cpu_limit) = get_cpu_limit() {
let cpu_limit = get_total_cpu_millicores();
if cpu_limit > 0 {
info!(
"GreptimeDB start with cpu limit in millicores: {}",
cpu_limit
);
CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
}

if let Some(memory_limit) = get_memory_limit() {
let memory_limit = get_total_memory_bytes();
if memory_limit > 0 {
info!(
"GreptimeDB start with memory limit in bytes: {}",
memory_limit
Expand Down
3 changes: 1 addition & 2 deletions src/common/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-stat.workspace = true
config.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
object-store.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
snafu.workspace = true
sysinfo.workspace = true
toml.workspace = true

[dev-dependencies]
Expand Down
47 changes: 4 additions & 43 deletions src/common/config/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,61 +13,22 @@
// limitations under the License.

use common_base::readable_size::ReadableSize;
use sysinfo::System;

/// Get the CPU core number of system, aware of cgroups.
pub fn get_cpus() -> usize {
// This function will check cgroups
num_cpus::get()
}

/// Get the total memory of the system.
/// If `cgroup_limits` is enabled, it will also check it.
pub fn get_sys_total_memory() -> Option<ReadableSize> {
if sysinfo::IS_SUPPORTED_SYSTEM {
let mut sys_info = System::new();
sys_info.refresh_memory();
let mut total_memory = sys_info.total_memory();
// Compare with cgroups memory limit, use smaller values
// This method is only implemented for Linux. It always returns None for all other systems.
if let Some(cgroup_limits) = sys_info.cgroup_limits() {
total_memory = total_memory.min(cgroup_limits.total_memory)
}
Some(ReadableSize(total_memory))
} else {
None
}
}
use common_stat::{get_total_cpu_millicores, get_total_memory_readable};

/// `ResourceSpec` holds the static resource specifications of a node,
/// such as CPU cores and memory capacity. These values are fixed
/// at startup and do not change dynamically during runtime.
#[derive(Debug, Clone, Copy)]
pub struct ResourceSpec {
pub cpus: usize,
pub cpus: i64,
pub memory: Option<ReadableSize>,
}

impl Default for ResourceSpec {
fn default() -> Self {
Self {
cpus: get_cpus(),
memory: get_sys_total_memory(),
cpus: get_total_cpu_millicores(),
memory: get_total_memory_readable(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_get_cpus() {
assert!(get_cpus() > 0);
}

#[test]
fn test_get_sys_total_memory() {
assert!(get_sys_total_memory().unwrap() > ReadableSize::mb(0));
}
}
3 changes: 3 additions & 0 deletions src/common/stat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ edition.workspace = true
license.workspace = true

[dependencies]
common-base.workspace = true
lazy_static.workspace = true
nix.workspace = true
num_cpus.workspace = true
prometheus.workspace = true
sysinfo.workspace = true

[lints]
workspace = true
47 changes: 20 additions & 27 deletions src/common/stat/src/cgroups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ use prometheus::core::{Collector, Desc};
use prometheus::proto::MetricFamily;
use prometheus::{IntGauge, Opts};

/// `MAX_VALUE` is used to indicate that the resource is unlimited.
pub const MAX_VALUE: i64 = -1;

const CGROUP_UNIFIED_MOUNTPOINT: &str = "/sys/fs/cgroup";

const MEMORY_MAX_FILE_CGROUP_V2: &str = "memory.max";
Expand All @@ -43,11 +40,11 @@ const MAX_VALUE_CGROUP_V2: &str = "max";
// For easier comparison, if the memory limit is larger than 1PB we consider it as unlimited.
const MAX_MEMORY_IN_BYTES: i64 = 1125899906842624; // 1PB

/// Get the limit of memory in bytes.
/// Get the limit of memory in bytes from cgroups filesystem.
///
/// - If the memory is unlimited, return `-1`.
/// - If the cgroup total memory is unset, return `None`.
/// - Return `None` if it fails to read the memory limit or not on linux.
pub fn get_memory_limit() -> Option<i64> {
pub fn get_memory_limit_from_cgroups() -> Option<i64> {
#[cfg(target_os = "linux")]
{
let memory_max_file = if is_cgroup_v2()? {
Expand All @@ -58,13 +55,13 @@ pub fn get_memory_limit() -> Option<i64> {
MEMORY_MAX_FILE_CGROUP_V1
};

// For cgroup v1, it will return a very large value(different from platform) if the memory is unlimited.
// For cgroup v1, it will return a very large value(different from platform) if the memory is unset.
let memory_limit =
read_value_from_file(Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(memory_max_file))?;

// If memory limit exceeds 1PB(cgroup v1), consider it as unlimited.
// If memory limit exceeds 1PB(cgroup v1), consider it as unset.
if memory_limit > MAX_MEMORY_IN_BYTES {
return Some(MAX_VALUE);
return None;
}
Some(memory_limit)
}
Expand All @@ -73,10 +70,10 @@ pub fn get_memory_limit() -> Option<i64> {
None
}

/// Get the usage of memory in bytes.
/// Get the usage of memory in bytes from cgroups filesystem.
///
/// - Return `None` if it fails to read the memory usage or not on linux or cgroup is v1.
pub fn get_memory_usage() -> Option<i64> {
pub fn get_memory_usage_from_cgroups() -> Option<i64> {
#[cfg(target_os = "linux")]
{
if is_cgroup_v2()? {
Expand All @@ -93,11 +90,11 @@ pub fn get_memory_usage() -> Option<i64> {
None
}

/// Get the limit of cpu in millicores.
/// Get the limit of cpu in millicores from cgroups filesystem.
///
/// - If the cpu is unlimited, return `-1`.
/// - If the cpu limit is unset, return `None`.
/// - Return `None` if it fails to read the cpu limit or not on linux.
pub fn get_cpu_limit() -> Option<i64> {
pub fn get_cpu_limit_from_cgroups() -> Option<i64> {
#[cfg(target_os = "linux")]
if is_cgroup_v2()? {
// Read `/sys/fs/cgroup/cpu.max` to get the cpu limit.
Expand All @@ -108,10 +105,6 @@ pub fn get_cpu_limit() -> Option<i64> {
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_QUOTA_FILE_CGROUP_V1),
)?;

if quota == MAX_VALUE {
return Some(MAX_VALUE);
}

let period = read_value_from_file(
Path::new(CGROUP_UNIFIED_MOUNTPOINT).join(CPU_PERIOD_FILE_CGROUP_V1),
)?;
Expand Down Expand Up @@ -167,9 +160,9 @@ fn is_cgroup_v2() -> Option<bool> {
fn read_value_from_file<P: AsRef<Path>>(path: P) -> Option<i64> {
let content = read_to_string(&path).ok()?;

// If the content starts with "max", return `MAX_VALUE`.
// If the content starts with "max", return `None`.
if content.starts_with(MAX_VALUE_CGROUP_V2) {
return Some(MAX_VALUE);
return None;
}

content.trim().parse::<i64>().ok()
Expand All @@ -183,10 +176,10 @@ fn get_cgroup_v2_cpu_limit<P: AsRef<Path>>(path: P) -> Option<i64> {
return None;
}

// If the cpu is unlimited, it will be `-1`.
// If the cgroup cpu limit is unset, return `None`.
let quota = fields[0].trim();
if quota == MAX_VALUE_CGROUP_V2 {
return Some(MAX_VALUE);
return None;
}

let quota = quota.parse::<i64>().ok()?;
Expand Down Expand Up @@ -241,7 +234,7 @@ impl Collector for CgroupsMetricsCollector {
self.cpu_usage.set(cpu_usage);
}

if let Some(memory_usage) = get_memory_usage() {
if let Some(memory_usage) = get_memory_usage_from_cgroups() {
self.memory_usage.set(memory_usage);
}

Expand All @@ -263,8 +256,8 @@ mod tests {
100000
);
assert_eq!(
read_value_from_file(Path::new("testdata").join("memory.max.unlimited")).unwrap(),
MAX_VALUE
read_value_from_file(Path::new("testdata").join("memory.max.unlimited")),
None
);
assert_eq!(read_value_from_file(Path::new("non_existent_file")), None);
}
Expand All @@ -276,8 +269,8 @@ mod tests {
1500
);
assert_eq!(
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max.unlimited")).unwrap(),
MAX_VALUE
get_cgroup_v2_cpu_limit(Path::new("testdata").join("cpu.max.unlimited")),
None
);
assert_eq!(
get_cgroup_v2_cpu_limit(Path::new("non_existent_file")),
Expand Down
Loading
Loading