Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69a6089933daa573c96808ec4bbc48f447ec6e8c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f9f226478070fddffaa92deeb2c3a9971068816a" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
50 changes: 38 additions & 12 deletions src/catalog/src/system_schema/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
UInt32VectorBuilder, UInt64VectorBuilder,
};
use serde::Serialize;
use snafu::ResultExt;
Expand All @@ -53,6 +52,8 @@ const PEER_ADDR: &str = "peer_addr";
const PEER_HOSTNAME: &str = "peer_hostname";
const TOTAL_CPU_MILLICORES: &str = "total_cpu_millicores";
const TOTAL_MEMORY_BYTES: &str = "total_memory_bytes";
const CPU_USAGE_MILLICORES: &str = "cpu_usage_millicores";
const MEMORY_USAGE_BYTES: &str = "memory_usage_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
Expand All @@ -67,15 +68,17 @@ const INIT_CAPACITY: usize = 42;
/// - `peer_id`: the peer server id.
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
/// - `peer_addr`: the peer gRPC address.
/// - `peer_hostname`: the hostname of the peer.
/// - `total_cpu_millicores`: the total CPU millicores of the peer.
/// - `total_memory_bytes`: the total memory bytes of the peer.
/// - `cpu_usage_millicores`: the CPU usage millicores of the peer.
/// - `memory_usage_bytes`: the memory usage bytes of the peer.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
/// - `start_time`: the starting time of the peer.
/// - `uptime`: the uptime of the peer.
/// - `active_time`: the time since the last activity of the peer.
/// - `node_status`: the status info of the peer.
/// - `peer_hostname`: the hostname of the peer.
///
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
Expand All @@ -99,12 +102,22 @@ impl InformationSchemaClusterInfo {
ColumnSchema::new(PEER_HOSTNAME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(
TOTAL_CPU_MILLICORES,
ConcreteDataType::uint32_datatype(),
ConcreteDataType::int64_datatype(),
false,
),
ColumnSchema::new(
TOTAL_MEMORY_BYTES,
ConcreteDataType::uint64_datatype(),
ConcreteDataType::int64_datatype(),
false,
),
ColumnSchema::new(
CPU_USAGE_MILLICORES,
ConcreteDataType::int64_datatype(),
false,
),
ColumnSchema::new(
MEMORY_USAGE_BYTES,
ConcreteDataType::int64_datatype(),
false,
),
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
Expand Down Expand Up @@ -167,8 +180,10 @@ struct InformationSchemaClusterInfoBuilder {
peer_types: StringVectorBuilder,
peer_addrs: StringVectorBuilder,
peer_hostnames: StringVectorBuilder,
cpus: UInt32VectorBuilder,
memory_bytes: UInt64VectorBuilder,
total_cpu_millicores: Int64VectorBuilder,
total_memory_bytes: Int64VectorBuilder,
cpu_usage_millicores: Int64VectorBuilder,
memory_usage_bytes: Int64VectorBuilder,
versions: StringVectorBuilder,
git_commits: StringVectorBuilder,
start_times: TimestampMillisecondVectorBuilder,
Expand All @@ -186,8 +201,10 @@ impl InformationSchemaClusterInfoBuilder {
peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
peer_hostnames: StringVectorBuilder::with_capacity(INIT_CAPACITY),
cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
total_cpu_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
total_memory_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
cpu_usage_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
memory_usage_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
Expand Down Expand Up @@ -219,6 +236,7 @@ impl InformationSchemaClusterInfoBuilder {
(PEER_HOSTNAME, &Value::from(node_info.hostname.as_str())),
(VERSION, &Value::from(node_info.version.as_str())),
(GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
(PEER_HOSTNAME, &Value::from(node_info.hostname.as_str())),
];

if !predicates.eval(&row) {
Expand All @@ -243,8 +261,14 @@ impl InformationSchemaClusterInfoBuilder {
self.start_times.push(None);
self.uptimes.push(None);
}
self.cpus.push(Some(node_info.cpus));
self.memory_bytes.push(Some(node_info.memory_bytes));
self.total_cpu_millicores
.push(Some(node_info.total_cpu_millicores));
self.total_memory_bytes
.push(Some(node_info.total_memory_bytes));
self.cpu_usage_millicores
.push(Some(node_info.cpu_usage_millicores));
self.memory_usage_bytes
.push(Some(node_info.memory_usage_bytes));

if node_info.last_activity_ts > 0 {
self.active_times.push(Some(
Expand All @@ -269,8 +293,10 @@ impl InformationSchemaClusterInfoBuilder {
Arc::new(self.peer_types.finish()),
Arc::new(self.peer_addrs.finish()),
Arc::new(self.peer_hostnames.finish()),
Arc::new(self.cpus.finish()),
Arc::new(self.memory_bytes.finish()),
Arc::new(self.total_cpu_millicores.finish()),
Arc::new(self.total_memory_bytes.finish()),
Arc::new(self.cpu_usage_millicores.finish()),
Arc::new(self.memory_usage_bytes.finish()),
Arc::new(self.versions.finish()),
Arc::new(self.git_commits.finish()),
Arc::new(self.start_times.finish()),
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHand
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version};
Expand Down Expand Up @@ -372,11 +373,15 @@ impl StartCommand {
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);

let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();

let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
Arc::new(resource_stat),
);

let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
Expand Down Expand Up @@ -421,11 +422,15 @@ impl StartCommand {
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);

let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();

let heartbeat_task = HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
Arc::new(resource_stat),
);
let heartbeat_task = Some(heartbeat_task);

Expand Down
1 change: 0 additions & 1 deletion src/common/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-stat.workspace = true
config.workspace = true
humantime-serde.workspace = true
object-store.workspace = true
Expand Down
1 change: 0 additions & 1 deletion src/common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

pub mod config;
pub mod error;
pub mod utils;

use std::time::Duration;

Expand Down
34 changes: 0 additions & 34 deletions src/common/config/src/utils.rs

This file was deleted.

16 changes: 12 additions & 4 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,16 @@ pub struct NodeInfo {
pub start_time_ms: u64,
// The total CPU of the node, in millicores
#[serde(default)]
pub cpus: u32,
pub total_cpu_millicores: i64,
// The total memory of the node, in bytes
#[serde(default)]
pub memory_bytes: u64,
pub total_memory_bytes: i64,
// The CPU usage of the node, in millicores
#[serde(default)]
pub cpu_usage_millicores: i64,
// The node build memory usage bytes
#[serde(default)]
pub memory_usage_bytes: i64,
// The node build hostname
#[serde(default)]
pub hostname: String,
Expand Down Expand Up @@ -333,8 +339,10 @@ mod tests {
version: "".to_string(),
git_commit: "".to_string(),
start_time_ms: 1,
cpus: 0,
memory_bytes: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
};

Expand Down
3 changes: 3 additions & 0 deletions src/common/stat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ license.workspace = true

[dependencies]
common-base.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
lazy_static.workspace = true
nix.workspace = true
num_cpus.workspace = true
prometheus.workspace = true
sysinfo.workspace = true
tokio.workspace = true

[lints]
workspace = true
23 changes: 21 additions & 2 deletions src/common/stat/src/cgroups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ pub fn get_cpu_limit_from_cgroups() -> Option<i64> {
None
}

fn get_cpu_usage() -> Option<i64> {
/// Get the usage of cpu in millicores from cgroups filesystem.
///
/// - Return `None` if it's not in the cgroups v2 environment or fails to read the cpu usage.
pub fn get_cpu_usage_from_cgroups() -> Option<i64> {
// In certain bare-metal environments, the `/sys/fs/cgroup/cpu.stat` file may be present and reflect system-wide CPU usage rather than container-specific metrics.
// To ensure accurate collection of container-level CPU usage, verify the existence of the `/sys/fs/cgroup/memory.current` file.
// The presence of this file typically indicates execution within a containerized environment, thereby validating the relevance of the collected CPU usage data.
Expand All @@ -142,6 +145,22 @@ fn get_cpu_usage() -> Option<i64> {
fields[1].trim().parse::<i64>().ok()
}

/// Calculates the CPU usage in millicores from two CPU-time samples.
///
/// `current_cpu_usage_usecs` and `last_cpu_usage_usecs` are CPU-time readings in
/// microseconds, and `interval_milliseconds` is the time elapsed between them.
/// Microseconds of CPU time per millisecond of elapsed time is numerically equal
/// to millicores, so the result is simply `diff / interval`, rounded.
///
/// - Returns `0` if the usage did not increase (it is equal to, or lower than, the
///   last reading) or if the interval is not positive.
/// - Otherwise returns the rounded usage, clamped to at least `1` so that a small
///   but non-zero amount of activity is never reported as `0`.
pub(crate) fn calculate_cpu_usage(
    current_cpu_usage_usecs: i64,
    last_cpu_usage_usecs: i64,
    interval_milliseconds: i64,
) -> i64 {
    // Saturate rather than overflow: a plain `-` panics in debug builds and wraps
    // to a negative value (silently yielding 0) in release builds when the two
    // readings are far apart.
    let diff = current_cpu_usage_usecs.saturating_sub(last_cpu_usage_usecs);
    if diff <= 0 || interval_milliseconds <= 0 {
        return 0;
    }

    // Float-to-int `as` casts saturate, so an out-of-range quotient cannot wrap.
    let millicores = (diff as f64 / interval_milliseconds as f64).round() as i64;
    millicores.max(1)
}

// Check whether the cgroup is v2.
// - Return `true` if the cgroup is v2, otherwise return `false`.
// - Return `None` if the detection fails or not on linux.
Expand Down Expand Up @@ -230,7 +249,7 @@ impl Collector for CgroupsMetricsCollector {
}

fn collect(&self) -> Vec<MetricFamily> {
if let Some(cpu_usage) = get_cpu_usage() {
if let Some(cpu_usage) = get_cpu_usage_from_cgroups() {
self.cpu_usage.set(cpu_usage);
}

Expand Down
Loading