diff --git a/Cargo.lock b/Cargo.lock index a002ea91da4e..ca6cccf80c38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2033,7 +2033,6 @@ dependencies = [ "common-base", "common-error", "common-macro", - "common-stat", "common-telemetry", "common-test-util", "common-wal", @@ -2553,11 +2552,14 @@ name = "common-stat" version = "0.18.0" dependencies = [ "common-base", + "common-runtime", + "common-telemetry", "lazy_static", "nix 0.30.1", "num_cpus", "prometheus", "sysinfo", + "tokio", ] [[package]] @@ -3917,6 +3919,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-runtime", + "common-stat", "common-telemetry", "common-test-util", "common-time", @@ -4915,6 +4918,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-runtime", + "common-stat", "common-telemetry", "common-test-util", "common-time", @@ -5331,7 +5335,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=69a6089933daa573c96808ec4bbc48f447ec6e8c#69a6089933daa573c96808ec4bbc48f447ec6e8c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f9f226478070fddffaa92deeb2c3a9971068816a#f9f226478070fddffaa92deeb2c3a9971068816a" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", @@ -7404,6 +7408,7 @@ dependencies = [ "common-procedure", "common-procedure-test", "common-runtime", + "common-stat", "common-telemetry", "common-time", "common-version", @@ -13048,6 +13053,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-runtime", + "common-stat", "common-telemetry", "common-test-util", "common-time", diff --git a/Cargo.toml b/Cargo.toml index f500f70b0ee5..63c07a91ca26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69a6089933daa573c96808ec4bbc48f447ec6e8c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f9f226478070fddffaa92deeb2c3a9971068816a" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/catalog/src/system_schema/information_schema/cluster_info.rs b/src/catalog/src/system_schema/information_schema/cluster_info.rs index f45dc5be06a7..d7ef02b0d233 100644 --- a/src/catalog/src/system_schema/information_schema/cluster_info.rs +++ b/src/catalog/src/system_schema/information_schema/cluster_info.rs @@ -33,7 +33,6 @@ use datatypes::timestamp::TimestampMillisecond; use datatypes::value::Value; use datatypes::vectors::{ Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder, - UInt32VectorBuilder, UInt64VectorBuilder, }; use serde::Serialize; use snafu::ResultExt; @@ -53,6 +52,8 @@ const PEER_ADDR: &str = "peer_addr"; const PEER_HOSTNAME: &str = "peer_hostname"; const TOTAL_CPU_MILLICORES: &str = "total_cpu_millicores"; const TOTAL_MEMORY_BYTES: &str = "total_memory_bytes"; +const CPU_USAGE_MILLICORES: &str = "cpu_usage_millicores"; +const MEMORY_USAGE_BYTES: &str = "memory_usage_bytes"; const VERSION: &str = "version"; const GIT_COMMIT: &str = "git_commit"; const START_TIME: &str = "start_time"; @@ -67,15 +68,17 @@ const INIT_CAPACITY: usize = 42; /// - `peer_id`: the peer server id. /// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc. /// - `peer_addr`: the peer gRPC address. +/// - `peer_hostname`: the hostname of the peer. /// - `total_cpu_millicores`: the total CPU millicores of the peer. /// - `total_memory_bytes`: the total memory bytes of the peer. +/// - `cpu_usage_millicores`: the CPU usage millicores of the peer. +/// - `memory_usage_bytes`: the memory usage bytes of the peer. /// - `version`: the build package version of the peer. /// - `git_commit`: the build git commit hash of the peer. /// - `start_time`: the starting time of the peer. /// - `uptime`: the uptime of the peer. /// - `active_time`: the time since the last activity of the peer. /// - `node_status`: the status info of the peer. -/// - `peer_hostname`: the hostname of the peer. /// #[derive(Debug)] pub(super) struct InformationSchemaClusterInfo { @@ -99,12 +102,22 @@ impl InformationSchemaClusterInfo { ColumnSchema::new(PEER_HOSTNAME, ConcreteDataType::string_datatype(), true), ColumnSchema::new( TOTAL_CPU_MILLICORES, - ConcreteDataType::uint32_datatype(), + ConcreteDataType::int64_datatype(), false, ), ColumnSchema::new( TOTAL_MEMORY_BYTES, - ConcreteDataType::uint64_datatype(), + ConcreteDataType::int64_datatype(), + false, + ), + ColumnSchema::new( + CPU_USAGE_MILLICORES, + ConcreteDataType::int64_datatype(), + false, + ), + ColumnSchema::new( + MEMORY_USAGE_BYTES, + ConcreteDataType::int64_datatype(), false, ), ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false), @@ -167,8 +180,10 @@ struct InformationSchemaClusterInfoBuilder { peer_types: StringVectorBuilder, peer_addrs: StringVectorBuilder, peer_hostnames: StringVectorBuilder, - cpus: UInt32VectorBuilder, - memory_bytes: UInt64VectorBuilder, + total_cpu_millicores: Int64VectorBuilder, + total_memory_bytes: Int64VectorBuilder, + cpu_usage_millicores: Int64VectorBuilder, + memory_usage_bytes: Int64VectorBuilder, versions: StringVectorBuilder, git_commits: StringVectorBuilder, start_times: TimestampMillisecondVectorBuilder, @@ -186,8 +201,10 @@ impl InformationSchemaClusterInfoBuilder { peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY), peer_hostnames: StringVectorBuilder::with_capacity(INIT_CAPACITY), - cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), - memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + total_cpu_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY), + total_memory_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY), + cpu_usage_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY), + memory_usage_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY), versions: StringVectorBuilder::with_capacity(INIT_CAPACITY), git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY), start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY), @@ -219,6 +236,7 @@ impl InformationSchemaClusterInfoBuilder { (PEER_HOSTNAME, &Value::from(node_info.hostname.as_str())), (VERSION, &Value::from(node_info.version.as_str())), (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())), + (PEER_HOSTNAME, &Value::from(node_info.hostname.as_str())), ]; if !predicates.eval(&row) { @@ -243,8 +261,14 @@ impl InformationSchemaClusterInfoBuilder { self.start_times.push(None); self.uptimes.push(None); } - self.cpus.push(Some(node_info.cpus)); - self.memory_bytes.push(Some(node_info.memory_bytes)); + self.total_cpu_millicores + .push(Some(node_info.total_cpu_millicores)); + self.total_memory_bytes + .push(Some(node_info.total_memory_bytes)); + self.cpu_usage_millicores + .push(Some(node_info.cpu_usage_millicores)); + self.memory_usage_bytes + .push(Some(node_info.memory_usage_bytes)); if node_info.last_activity_ts > 0 { self.active_times.push(Some( @@ -269,8 +293,10 @@ impl InformationSchemaClusterInfoBuilder { Arc::new(self.peer_types.finish()), Arc::new(self.peer_addrs.finish()), Arc::new(self.peer_hostnames.finish()), - Arc::new(self.cpus.finish()), - Arc::new(self.memory_bytes.finish()), + Arc::new(self.total_cpu_millicores.finish()), + Arc::new(self.total_memory_bytes.finish()), + Arc::new(self.cpu_usage_millicores.finish()), + Arc::new(self.memory_usage_bytes.finish()), Arc::new(self.versions.finish()), Arc::new(self.git_commits.finish()), Arc::new(self.start_times.finish()), diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 500e9bfa89b4..07f32797244d 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -30,6 +30,7 @@ use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHand use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; +use common_stat::ResourceStatImpl; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_version::{short_version, verbose_version}; @@ -372,11 +373,15 @@ impl StartCommand { Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())), ]); + let mut resource_stat = ResourceStatImpl::default(); + resource_stat.start_collect_cpu_usage(); + let heartbeat_task = flow::heartbeat::HeartbeatTask::new( &opts, meta_client.clone(), opts.heartbeat.clone(), Arc::new(executor), + Arc::new(resource_stat), ); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone())); diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 4c72021a473e..fda6d968bff7 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -30,6 +30,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_stat::ResourceStatImpl; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_time::timezone::set_default_timezone; @@ -421,11 +422,15 @@ impl StartCommand { Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())), ]); + let mut resource_stat = ResourceStatImpl::default(); + resource_stat.start_collect_cpu_usage(); + let heartbeat_task = HeartbeatTask::new( &opts, meta_client.clone(), opts.heartbeat.clone(), Arc::new(executor), + Arc::new(resource_stat), ); let heartbeat_task = Some(heartbeat_task); diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index 1d2b21602f48..b45c03a6c334 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -11,7 +11,6 @@ workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true -common-stat.workspace = true config.workspace = true humantime-serde.workspace = true object-store.workspace = true diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index b8069242179b..cc25ebce1663 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -14,7 +14,6 @@ pub mod config; pub mod error; -pub mod utils; use std::time::Duration; diff --git a/src/common/config/src/utils.rs b/src/common/config/src/utils.rs deleted file mode 100644 index 1bc986b77ef6..000000000000 --- a/src/common/config/src/utils.rs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_base::readable_size::ReadableSize; -use common_stat::{get_total_cpu_millicores, get_total_memory_readable}; - -/// `ResourceSpec` holds the static resource specifications of a node, -/// such as CPU cores and memory capacity. These values are fixed -/// at startup and do not change dynamically during runtime. -#[derive(Debug, Clone, Copy)] -pub struct ResourceSpec { - pub cpus: i64, - pub memory: Option, -} - -impl Default for ResourceSpec { - fn default() -> Self { - Self { - cpus: get_total_cpu_millicores(), - memory: get_total_memory_readable(), - } - } -} diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 63001970b6ae..74485513e9e6 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -120,10 +120,16 @@ pub struct NodeInfo { pub start_time_ms: u64, // The node build cpus #[serde(default)] - pub cpus: u32, + pub total_cpu_millicores: i64, // The node build memory bytes #[serde(default)] - pub memory_bytes: u64, + pub total_memory_bytes: i64, + // The node build cpu usage millicores + #[serde(default)] + pub cpu_usage_millicores: i64, + // The node build memory usage bytes + #[serde(default)] + pub memory_usage_bytes: i64, // The node build hostname #[serde(default)] pub hostname: String, @@ -333,8 +339,10 @@ mod tests { version: "".to_string(), git_commit: "".to_string(), start_time_ms: 1, - cpus: 0, - memory_bytes: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "test_hostname".to_string(), }; diff --git a/src/common/stat/Cargo.toml b/src/common/stat/Cargo.toml index 3d0198f6a2fc..d0e8b5448ff3 100644 --- a/src/common/stat/Cargo.toml +++ b/src/common/stat/Cargo.toml @@ -6,11 +6,14 @@ license.workspace = true [dependencies] common-base.workspace = true +common-runtime.workspace = true +common-telemetry.workspace = true lazy_static.workspace = true nix.workspace = true num_cpus.workspace = true prometheus.workspace = true sysinfo.workspace = true +tokio.workspace = true [lints] workspace = true diff --git a/src/common/stat/src/cgroups.rs b/src/common/stat/src/cgroups.rs index fe26f5ec3607..ce8f5ac87aa7 100644 --- a/src/common/stat/src/cgroups.rs +++ b/src/common/stat/src/cgroups.rs @@ -117,7 +117,10 @@ pub fn get_cpu_limit_from_cgroups() -> Option { None } -fn get_cpu_usage() -> Option { +/// Get the usage of cpu in millicores from cgroups filesystem. +/// +/// - Return `None` if it's not in the cgroups v2 environment or fails to read the cpu usage. +pub fn get_cpu_usage_from_cgroups() -> Option { // In certain bare-metal environments, the `/sys/fs/cgroup/cpu.stat` file may be present and reflect system-wide CPU usage rather than container-specific metrics. // To ensure accurate collection of container-level CPU usage, verify the existence of the `/sys/fs/cgroup/memory.current` file. // The presence of this file typically indicates execution within a containerized environment, thereby validating the relevance of the collected CPU usage data. @@ -142,6 +145,22 @@ fn get_cpu_usage() -> Option { fields[1].trim().parse::().ok() } +// Calculate the cpu usage in millicores from cgroups filesystem. +// +// - Return `0` if the current cpu usage is equal to the last cpu usage or the interval is 0. +pub(crate) fn calculate_cpu_usage( + current_cpu_usage_usecs: i64, + last_cpu_usage_usecs: i64, + interval_milliseconds: i64, +) -> i64 { + let diff = current_cpu_usage_usecs - last_cpu_usage_usecs; + if diff > 0 && interval_milliseconds > 0 { + ((diff as f64 / interval_milliseconds as f64).round() as i64).max(1) + } else { + 0 + } +} + // Check whether the cgroup is v2. // - Return `true` if the cgroup is v2, otherwise return `false`. // - Return `None` if the detection fails or not on linux. @@ -230,7 +249,7 @@ impl Collector for CgroupsMetricsCollector { } fn collect(&self) -> Vec { - if let Some(cpu_usage) = get_cpu_usage() { + if let Some(cpu_usage) = get_cpu_usage_from_cgroups() { self.cpu_usage.set(cpu_usage); } diff --git a/src/common/stat/src/lib.rs b/src/common/stat/src/lib.rs index 2c6cbea3f186..544b9439c8b7 100644 --- a/src/common/stat/src/lib.rs +++ b/src/common/stat/src/lib.rs @@ -13,66 +13,7 @@ // limitations under the License. mod cgroups; +mod resource; pub use cgroups::*; -use common_base::readable_size::ReadableSize; -use sysinfo::System; - -/// Get the total CPU in millicores. -pub fn get_total_cpu_millicores() -> i64 { - // Get CPU limit from cgroups filesystem. - if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() { - cgroup_cpu_limit - } else { - // Get total CPU cores from host system. - num_cpus::get() as i64 * 1000 - } -} - -/// Get the total memory in bytes. -pub fn get_total_memory_bytes() -> i64 { - // Get memory limit from cgroups filesystem. - if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() { - cgroup_memory_limit - } else { - // Get total memory from host system. - if sysinfo::IS_SUPPORTED_SYSTEM { - let mut sys_info = System::new(); - sys_info.refresh_memory(); - sys_info.total_memory() as i64 - } else { - // If the system is not supported, return -1. - -1 - } - } -} - -/// Get the total CPU cores. The result will be rounded to the nearest integer. -/// For example, if the total CPU is 1.5 cores(1500 millicores), the result will be 2. -pub fn get_total_cpu_cores() -> usize { - ((get_total_cpu_millicores() as f64) / 1000.0).round() as usize -} - -/// Get the total memory in readable size. -pub fn get_total_memory_readable() -> Option { - if get_total_memory_bytes() > 0 { - Some(ReadableSize(get_total_memory_bytes() as u64)) - } else { - None - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_get_total_cpu_cores() { - assert!(get_total_cpu_cores() > 0); - } - - #[test] - fn test_get_total_memory_readable() { - assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0)); - } -} +pub use resource::*; diff --git a/src/common/stat/src/resource.rs b/src/common/stat/src/resource.rs new file mode 100644 index 000000000000..cf24f785f041 --- /dev/null +++ b/src/common/stat/src/resource.rs @@ -0,0 +1,181 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use common_runtime::JoinHandle; +use common_telemetry::info; +use sysinfo::System; +use tokio::time::sleep; + +use crate::cgroups::calculate_cpu_usage; +use crate::{ + get_cpu_limit_from_cgroups, get_cpu_usage_from_cgroups, get_memory_limit_from_cgroups, + get_memory_usage_from_cgroups, +}; + +/// Get the total CPU in millicores. +pub fn get_total_cpu_millicores() -> i64 { + // Get CPU limit from cgroups filesystem. + if let Some(cgroup_cpu_limit) = get_cpu_limit_from_cgroups() { + cgroup_cpu_limit + } else { + // Get total CPU cores from host system. + num_cpus::get() as i64 * 1000 + } +} + +/// Get the total memory in bytes. +pub fn get_total_memory_bytes() -> i64 { + // Get memory limit from cgroups filesystem. + if let Some(cgroup_memory_limit) = get_memory_limit_from_cgroups() { + cgroup_memory_limit + } else { + // Get total memory from host system. + if sysinfo::IS_SUPPORTED_SYSTEM { + let mut sys_info = System::new(); + sys_info.refresh_memory(); + sys_info.total_memory() as i64 + } else { + // If the system is not supported, return 0 + 0 + } + } +} + +/// Get the total CPU cores. The result will be rounded to the nearest integer. +/// For example, if the total CPU is 1.5 cores(1500 millicores), the result will be 2. +pub fn get_total_cpu_cores() -> usize { + ((get_total_cpu_millicores() as f64) / 1000.0).round() as usize +} + +/// Get the total memory in readable size. +pub fn get_total_memory_readable() -> Option { + if get_total_memory_bytes() > 0 { + Some(ReadableSize(get_total_memory_bytes() as u64)) + } else { + None + } +} + +/// A reference to a `ResourceStat` implementation. +pub type ResourceStatRef = Arc; + +/// A trait for getting resource statistics. +pub trait ResourceStat { + /// Get the total CPU in millicores. + fn get_total_cpu_millicores(&self) -> i64; + /// Get the total memory in bytes. + fn get_total_memory_bytes(&self) -> i64; + /// Get the CPU usage in millicores. + fn get_cpu_usage_millicores(&self) -> i64; + /// Get the memory usage in bytes. + fn get_memory_usage_bytes(&self) -> i64; +} + +/// A implementation of `ResourceStat` trait. +pub struct ResourceStatImpl { + cpu_usage_millicores: Arc, + last_cpu_usage_usecs: Arc, + calculate_interval: Duration, + handler: Option>, +} + +impl Default for ResourceStatImpl { + fn default() -> Self { + Self { + cpu_usage_millicores: Arc::new(AtomicI64::new(0)), + last_cpu_usage_usecs: Arc::new(AtomicI64::new(0)), + calculate_interval: Duration::from_secs(5), + handler: None, + } + } +} + +impl ResourceStatImpl { + /// Start collecting CPU usage periodically. It will calculate the CPU usage in millicores based on rate of change of CPU usage usage_usec in `/sys/fs/cgroup/cpu.stat`. + /// It ONLY works in cgroup v2 environment. + pub fn start_collect_cpu_usage(&mut self) { + if self.handler.is_some() { + return; + } + + let cpu_usage_millicores = self.cpu_usage_millicores.clone(); + let last_cpu_usage_usecs = self.last_cpu_usage_usecs.clone(); + let calculate_interval = self.calculate_interval; + + let handler = common_runtime::spawn_global(async move { + info!( + "Starting to collect CPU usage periodically for every {} seconds", + calculate_interval.as_secs() + ); + loop { + let current_cpu_usage_usecs = get_cpu_usage_from_cgroups(); + if let Some(current_cpu_usage_usecs) = current_cpu_usage_usecs { + let cpu_usage = calculate_cpu_usage( + current_cpu_usage_usecs, + last_cpu_usage_usecs.load(Ordering::Relaxed), + calculate_interval.as_millis() as i64, + ); + cpu_usage_millicores.store(cpu_usage, Ordering::Relaxed); + last_cpu_usage_usecs.store(current_cpu_usage_usecs, Ordering::Relaxed); + } + sleep(calculate_interval).await; + } + }); + + self.handler = Some(handler); + } +} + +impl ResourceStat for ResourceStatImpl { + /// Get the total CPU in millicores. + fn get_total_cpu_millicores(&self) -> i64 { + get_total_cpu_millicores() + } + + /// Get the total memory in bytes. + fn get_total_memory_bytes(&self) -> i64 { + get_total_memory_bytes() + } + + /// Get the CPU usage in millicores. + fn get_cpu_usage_millicores(&self) -> i64 { + self.cpu_usage_millicores.load(Ordering::Relaxed) + } + + /// Get the memory usage in bytes. + /// It ONLY works in cgroup v2 environment. + fn get_memory_usage_bytes(&self) -> i64 { + get_memory_usage_from_cgroups().unwrap_or_default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_total_cpu_cores() { + assert!(get_total_cpu_cores() > 0); + } + + #[test] + fn test_get_total_memory_readable() { + assert!(get_total_memory_readable().unwrap() > ReadableSize::mb(0)); + } +} diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index b9728af50bd6..4f26692b9c83 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -30,6 +30,7 @@ common-procedure.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true +common-stat.workspace = true common-telemetry.workspace = true common-time.workspace = true common-version.workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5c9e86848f2e..40922fb14cfd 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -27,6 +27,7 @@ use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; +use common_stat::ResourceStatImpl; use common_telemetry::{error, info, warn}; use common_wal::config::DatanodeWalConfig; use common_wal::config::kafka::DatanodeKafkaConfig; @@ -281,6 +282,9 @@ impl DatanodeBuilder { open_all_regions.await?; } + let mut resource_stat = ResourceStatImpl::default(); + resource_stat.start_collect_cpu_usage(); + let heartbeat_task = if let Some(meta_client) = meta_client { Some( HeartbeatTask::try_new( @@ -289,6 +293,7 @@ impl DatanodeBuilder { meta_client, cache_registry, self.plugins.clone(), + Arc::new(resource_stat), ) .await?, ) diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index bb3f25957c2a..002200d31814 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -20,7 +20,6 @@ use std::time::Duration; use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; use common_base::Plugins; -use common_config::utils::ResourceSpec; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::datanode::REGION_STATISTIC_KEY; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; @@ -31,6 +30,7 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; +use common_stat::ResourceStatRef; use common_telemetry::{debug, error, info, trace, warn}; use common_workload::DatanodeWorkloadType; use meta_client::MetaClientRef; @@ -63,7 +63,7 @@ pub struct HeartbeatTask { interval: u64, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, region_alive_keeper: Arc, - resource_spec: ResourceSpec, + resource_stat: ResourceStatRef, } impl Drop for HeartbeatTask { @@ -80,6 +80,7 @@ impl HeartbeatTask { meta_client: MetaClientRef, cache_invalidator: CacheInvalidatorRef, plugins: Plugins, + resource_stat: ResourceStatRef, ) -> Result { let countdown_task_handler_ext = plugins.get::(); let region_alive_keeper = Arc::new(RegionAliveKeeper::new( @@ -106,7 +107,7 @@ impl HeartbeatTask { interval: opts.heartbeat.interval.as_millis() as u64, resp_handler_executor, region_alive_keeper, - resource_spec: Default::default(), + resource_stat, }) } @@ -183,6 +184,7 @@ impl HeartbeatTask { .context(error::HandleHeartbeatResponseSnafu) } + #[allow(deprecated)] /// Start heartbeat task, spawn background task. pub async fn start( &self, @@ -234,8 +236,9 @@ impl HeartbeatTask { self.region_alive_keeper.start(Some(event_receiver)).await?; let mut last_sent = Instant::now(); - let cpus = self.resource_spec.cpus as u32; - let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes(); + let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores(); + let total_memory_bytes = self.resource_stat.get_total_memory_bytes(); + let resource_stat = self.resource_stat.clone(); common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); @@ -249,8 +252,13 @@ impl HeartbeatTask { version: build_info.version.to_string(), git_commit: build_info.commit_short.to_string(), start_time_ms: node_epoch, - cpus, - memory_bytes, + total_cpu_millicores, + total_memory_bytes, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto. + cpus: total_cpu_millicores as u32, + memory_bytes: total_memory_bytes as u64, hostname: hostname::get() .unwrap_or_default() .to_string_lossy() @@ -294,12 +302,18 @@ impl HeartbeatTask { let topic_stats = region_server_clone.topic_stats(); let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; - let req = HeartbeatRequest { + let mut req = HeartbeatRequest { region_stats, topic_stats, duration_since_epoch, ..heartbeat_request.clone() }; + + if let Some(info) = req.info.as_mut() { + info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores(); + info.memory_usage_bytes = resource_stat.get_memory_usage_bytes(); + } + sleep.as_mut().reset(now + Duration::from_millis(interval)); Some(req) } diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index cc42668f5a00..89b37860c5f6 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use api::v1::meta::{HeartbeatRequest, Peer}; -use common_config::utils::ResourceSpec; use common_error::ext::BoxedError; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, @@ -26,6 +25,7 @@ use common_meta::heartbeat::handler::{ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_meta::key::flow::flow_state::FlowStat; +use common_stat::ResourceStatRef; use common_telemetry::{debug, error, info, warn}; use greptime_proto::v1::meta::NodeInfo; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; @@ -69,7 +69,7 @@ pub struct HeartbeatTask { resp_handler_executor: HeartbeatResponseHandlerExecutorRef, running: Arc, query_stat_size: Option, - resource_spec: ResourceSpec, + resource_stat: ResourceStatRef, } impl HeartbeatTask { @@ -77,11 +77,13 @@ impl HeartbeatTask { self.query_stat_size = Some(query_stat_size); self } + pub fn new( opts: &FlownodeOptions, meta_client: Arc, heartbeat_opts: HeartbeatOptions, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + resource_stat: ResourceStatRef, ) -> Self { Self { node_id: opts.node_id.unwrap_or(0), @@ -93,7 +95,7 @@ impl HeartbeatTask { resp_handler_executor, running: Arc::new(AtomicBool::new(false)), query_stat_size: None, - resource_spec: Default::default(), + resource_stat, } } @@ -146,6 +148,8 @@ impl HeartbeatTask { heartbeat_request: &HeartbeatRequest, message: Option, latest_report: &Option, + cpu_usage: i64, + memory_usage: i64, ) -> Option { let mailbox_message = match message.map(outgoing_message_to_mailbox_message) { Some(Ok(message)) => Some(message), @@ -170,21 +174,38 @@ impl HeartbeatTask { .collect(), }); - Some(HeartbeatRequest { + let mut heartbeat_request = HeartbeatRequest { mailbox_message, flow_stat, ..heartbeat_request.clone() - }) + }; + + if let Some(info) = heartbeat_request.info.as_mut() { + info.cpu_usage_millicores = cpu_usage; + info.memory_usage_bytes = memory_usage; + } + + Some(heartbeat_request) } - fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option { + #[allow(deprecated)] + fn build_node_info( + start_time_ms: u64, + total_cpu_millicores: i64, + total_memory_bytes: i64, + ) -> Option { let build_info = common_version::build_info(); Some(NodeInfo { version: build_info.version.to_string(), git_commit: build_info.commit_short.to_string(), start_time_ms, - cpus, - memory_bytes, + total_cpu_millicores, + total_memory_bytes, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto. + cpus: total_cpu_millicores as u32, + memory_bytes: total_memory_bytes as u64, hostname: hostname::get() .unwrap_or_default() .to_string_lossy() @@ -203,9 +224,9 @@ impl HeartbeatTask { id: self.node_id, addr: self.peer_addr.clone(), }); - let cpus = self.resource_spec.cpus as u32; - let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes(); - + let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores(); + let total_memory_bytes = self.resource_stat.get_total_memory_bytes(); + let resource_stat = self.resource_stat.clone(); let query_stat_size = self.query_stat_size.clone(); common_runtime::spawn_hb(async move { @@ -218,7 +239,7 @@ impl HeartbeatTask { let heartbeat_request = HeartbeatRequest { peer: self_peer, node_epoch, - info: Self::build_node_info(node_epoch, cpus, memory_bytes), + info: Self::build_node_info(node_epoch, total_cpu_millicores, total_memory_bytes), ..Default::default() }; @@ -226,7 +247,7 @@ impl HeartbeatTask { let req = tokio::select! { message = outgoing_rx.recv() => { if let Some(message) = message { - Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report) + Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report, 0, 0) } else { warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop @@ -234,7 +255,7 @@ impl HeartbeatTask { } } _ = interval.tick() => { - Self::new_heartbeat_request(&heartbeat_request, None, &latest_report) + Self::new_heartbeat_request(&heartbeat_request, None, &latest_report, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes()) } }; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index db26316c39f5..3b6029539a13 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -37,6 +37,7 @@ common-procedure.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true +common-stat.workspace = true common-telemetry.workspace = true common-time.workspace = true common-version.workspace = true diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 76fdc3305bdf..95645ad1ca19 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -18,12 +18,12 @@ mod tests; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer}; -use common_config::utils::ResourceSpec; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; +use common_stat::ResourceStatRef; use common_telemetry::{debug, error, info, warn}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; @@ -47,7 +47,7 @@ pub struct HeartbeatTask { retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, - resource_spec: ResourceSpec, + resource_stat: ResourceStatRef, } impl HeartbeatTask { @@ -56,6 +56,7 @@ impl HeartbeatTask { meta_client: Arc, heartbeat_opts: HeartbeatOptions, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + resource_stat: ResourceStatRef, ) -> Self { HeartbeatTask { // if internal grpc is configured, use its address as the peer address @@ -71,7 +72,7 @@ impl HeartbeatTask { retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, - resource_spec: Default::default(), + resource_stat, } } @@ -133,6 +134,8 @@ impl HeartbeatTask { fn new_heartbeat_request( heartbeat_request: &HeartbeatRequest, message: Option, + cpu_usage: i64, + memory_usage: i64, ) -> Option { let mailbox_message = match message.map(outgoing_message_to_mailbox_message) { Some(Ok(message)) => Some(message), @@ -143,21 +146,38 @@ impl HeartbeatTask { None => None, }; - Some(HeartbeatRequest { + let mut heartbeat_request = HeartbeatRequest { mailbox_message, ..heartbeat_request.clone() - }) + }; + + if let Some(info) = heartbeat_request.info.as_mut() { + info.memory_usage_bytes = memory_usage; + info.cpu_usage_millicores = cpu_usage; + } + + Some(heartbeat_request) } - fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option { + #[allow(deprecated)] + fn build_node_info( + start_time_ms: u64, + total_cpu_millicores: i64, + total_memory_bytes: i64, + ) -> Option { let build_info = common_version::build_info(); Some(NodeInfo { version: build_info.version.to_string(), git_commit: build_info.commit_short.to_string(), start_time_ms, - cpus, - memory_bytes, + total_cpu_millicores, + total_memory_bytes, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto. + cpus: total_cpu_millicores as u32, + memory_bytes: total_memory_bytes as u64, hostname: hostname::get() .unwrap_or_default() .to_string_lossy() @@ -177,16 +197,20 @@ impl HeartbeatTask { id: 0, addr: self.peer_addr.clone(), }); - let cpus = self.resource_spec.cpus as u32; - let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes(); - + let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores(); + let total_memory_bytes = self.resource_stat.get_total_memory_bytes(); + let resource_stat = self.resource_stat.clone(); common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); let heartbeat_request = HeartbeatRequest { peer: self_peer, - info: Self::build_node_info(start_time_ms, cpus, memory_bytes), + info: Self::build_node_info( + start_time_ms, + total_cpu_millicores, + total_memory_bytes, + ), ..Default::default() }; @@ -194,7 +218,7 @@ impl HeartbeatTask { let req = tokio::select! { message = outgoing_rx.recv() => { if let Some(message) = message { - Self::new_heartbeat_request(&heartbeat_request, Some(message)) + Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0) } else { warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop @@ -202,8 +226,8 @@ impl HeartbeatTask { } } _ = &mut sleep => { - sleep.as_mut().reset(Instant::now() + report_interval); - Self::new_heartbeat_request(&heartbeat_request, None) + sleep.as_mut().reset(Instant::now() + report_interval); + Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes()) } }; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 2a66c1570af5..d8192515976e 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -24,7 +24,9 @@ mod util; use std::fmt::Debug; use std::sync::Arc; -use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role}; +use api::v1::meta::{ + MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role, +}; pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef}; use cluster::Client as ClusterClient; pub use cluster::ClusterKvBackend; @@ -371,7 +373,8 @@ impl ClusterInfo for MetaClient { let mut nodes = if get_metasrv_nodes { let last_activity_ts = -1; // Metasrv does not provide this information. - let (leader, followers) = cluster_client.get_metasrv_peers().await?; + let (leader, followers): (Option, Vec) = + cluster_client.get_metasrv_peers().await?; followers .into_iter() .map(|node| { @@ -383,8 +386,10 @@ impl ClusterInfo for MetaClient { version: node_info.version, git_commit: node_info.git_commit, start_time_ms: node_info.start_time_ms, - cpus: node_info.cpus, - memory_bytes: node_info.memory_bytes, + total_cpu_millicores: node_info.total_cpu_millicores, + total_memory_bytes: node_info.total_memory_bytes, + cpu_usage_millicores: node_info.cpu_usage_millicores, + memory_usage_bytes: node_info.memory_usage_bytes, hostname: node_info.hostname, } } else { @@ -396,8 +401,10 @@ impl ClusterInfo for MetaClient { version: node.version, git_commit: node.git_commit, start_time_ms: node.start_time_ms, - cpus: node.cpus, - memory_bytes: node.memory_bytes, + total_cpu_millicores: node.cpus as i64, + total_memory_bytes: node.memory_bytes as i64, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "".to_string(), } } @@ -411,8 +418,10 @@ impl ClusterInfo for MetaClient { version: node_info.version, git_commit: node_info.git_commit, start_time_ms: node_info.start_time_ms, - cpus: node_info.cpus, - memory_bytes: node_info.memory_bytes, + total_cpu_millicores: node_info.total_cpu_millicores, + total_memory_bytes: node_info.total_memory_bytes, + cpu_usage_millicores: node_info.cpu_usage_millicores, + memory_usage_bytes: node_info.memory_usage_bytes, hostname: node_info.hostname, } } else { @@ -424,8 +433,10 @@ impl ClusterInfo for MetaClient { version: node.version, git_commit: node.git_commit, start_time_ms: node.start_time_ms, - cpus: node.cpus, - memory_bytes: node.memory_bytes, + total_cpu_millicores: node.cpus as i64, + total_memory_bytes: node.memory_bytes as i64, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "".to_string(), } } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 90a4fdc17b80..bd2075501c98 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -39,6 +39,7 @@ common-meta.workspace = true common-options.workspace = true common-procedure.workspace = true common-runtime.workspace = true +common-stat.workspace = true common-telemetry.workspace = true common-time.workspace = true common-version.workspace = true diff --git a/src/meta-srv/src/discovery/lease.rs b/src/meta-srv/src/discovery/lease.rs index 46b92c0f1ae6..9d9e0d6c23e7 100644 --- a/src/meta-srv/src/discovery/lease.rs +++ b/src/meta-srv/src/discovery/lease.rs @@ -243,8 +243,10 @@ mod tests { version: "1.0.0".to_string(), git_commit: "1234567890".to_string(), start_time_ms: current_time_millis() as u64, - cpus: 0, - memory_bytes: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "test_hostname".to_string(), }; @@ -269,8 +271,10 @@ mod tests { version: "1.0.0".to_string(), git_commit: "1234567890".to_string(), start_time_ms: current_time_millis() as u64, - cpus: 0, - memory_bytes: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "test_hostname".to_string(), }; @@ -307,8 +311,10 @@ mod tests { version: "1.0.0".to_string(), git_commit: "1234567890".to_string(), start_time_ms: last_activity_ts as u64, - cpus: 0, - memory_bytes: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "test_hostname".to_string(), }; diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs index a0890969f86c..014923c7c31c 100644 --- a/src/meta-srv/src/election/rds/mysql.rs +++ b/src/meta-srv/src/election/rds/mysql.rs @@ -1161,8 +1161,10 @@ mod tests { version: "test_version".to_string(), git_commit: "test_git_commit".to_string(), start_time_ms: 0, - cpus: 0, - memory_bytes: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "test_hostname".to_string(), }; mysql_election.register_candidate(&node_info).await.unwrap(); diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs index 14b2bbb40986..beab74dac41c 100644 --- a/src/meta-srv/src/election/rds/postgres.rs +++ b/src/meta-srv/src/election/rds/postgres.rs @@ -1000,8 +1000,10 @@ mod tests { version: "test_version".to_string(), git_commit: "test_git_commit".to_string(), start_time_ms: 0, - cpus: 0, - memory_bytes: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, hostname: "test_hostname".to_string(), }; pg_election.register_candidate(&node_info).await.unwrap(); diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index f144f3edc5dc..c96229f9cfb7 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -52,8 +52,10 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { version: info.version, git_commit: info.git_commit, start_time_ms: info.start_time_ms, - cpus: info.cpus, - memory_bytes: info.memory_bytes, + total_cpu_millicores: info.total_cpu_millicores, + total_memory_bytes: info.total_memory_bytes, + cpu_usage_millicores: info.cpu_usage_millicores, + memory_usage_bytes: info.memory_usage_bytes, hostname: info.hostname, }; @@ -88,8 +90,10 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler { version: info.version, git_commit: info.git_commit, start_time_ms: info.start_time_ms, - cpus: info.cpus, - memory_bytes: info.memory_bytes, + total_cpu_millicores: info.total_cpu_millicores, + total_memory_bytes: info.total_memory_bytes, + cpu_usage_millicores: info.cpu_usage_millicores, + memory_usage_bytes: info.memory_usage_bytes, hostname: info.hostname, }; @@ -142,8 +146,10 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { version: info.version, git_commit: info.git_commit, start_time_ms: info.start_time_ms, - cpus: info.cpus, - memory_bytes: info.memory_bytes, + total_cpu_millicores: info.total_cpu_millicores, + total_memory_bytes: info.total_memory_bytes, + cpu_usage_millicores: info.cpu_usage_millicores, + memory_usage_bytes: info.memory_usage_bytes, hostname: info.hostname, }; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 4c2c7fcf5339..aeaea1337b51 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -22,7 +22,6 @@ use std::time::Duration; use clap::ValueEnum; use common_base::Plugins; use common_base::readable_size::ReadableSize; -use common_config::utils::ResourceSpec; use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_event_recorder::EventRecorderOptions; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; @@ -47,6 +46,7 @@ use common_options::datanode::DatanodeClientOptions; use common_options::memory::MemoryOptions; use common_procedure::ProcedureManagerRef; use common_procedure::options::ProcedureConfig; +use common_stat::ResourceStatRef; use common_telemetry::logging::{LoggingOptions, TracingOptions}; use common_telemetry::{error, info, warn}; use common_wal::config::MetasrvWalConfig; @@ -372,12 +372,16 @@ pub struct MetasrvNodeInfo { pub git_commit: String, // The node start timestamp in milliseconds pub start_time_ms: u64, - // The node cpus + // The node total cpu millicores #[serde(default)] - pub cpus: u32, - // The node memory bytes + pub total_cpu_millicores: i64, #[serde(default)] - pub memory_bytes: u64, + // The node total memory bytes + pub total_memory_bytes: i64, + /// The node build cpu usage millicores + pub cpu_usage_millicores: i64, + /// The node build memory usage bytes + pub memory_usage_bytes: i64, // The node hostname #[serde(default)] pub hostname: String, @@ -397,15 +401,19 @@ impl From for api::v1::meta::MetasrvNodeInfo { version: node_info.version.clone(), git_commit: node_info.git_commit.clone(), start_time_ms: node_info.start_time_ms, - cpus: node_info.cpus, - memory_bytes: node_info.memory_bytes, + cpus: node_info.total_cpu_millicores as u32, + memory_bytes: node_info.total_memory_bytes as u64, // The canonical location for node information. info: Some(api::v1::meta::NodeInfo { version: node_info.version, git_commit: node_info.git_commit, start_time_ms: node_info.start_time_ms, - cpus: node_info.cpus, - memory_bytes: node_info.memory_bytes, + total_cpu_millicores: node_info.total_cpu_millicores, + total_memory_bytes: node_info.total_memory_bytes, + cpu_usage_millicores: node_info.cpu_usage_millicores, + memory_usage_bytes: node_info.memory_usage_bytes, + cpus: node_info.total_cpu_millicores as u32, + memory_bytes: node_info.total_memory_bytes as u64, hostname: node_info.hostname, }), } @@ -517,7 +525,7 @@ pub struct Metasrv { region_flush_ticker: Option, table_id_sequence: SequenceRef, reconciliation_manager: ReconciliationManagerRef, - resource_spec: ResourceSpec, + resource_stat: ResourceStatRef, plugins: Plugins, } @@ -699,8 +707,8 @@ impl Metasrv { self.start_time_ms } - pub fn resource_spec(&self) -> &ResourceSpec { - &self.resource_spec + pub fn resource_stat(&self) -> &ResourceStatRef { + &self.resource_stat } pub fn node_info(&self) -> MetasrvNodeInfo { @@ -710,8 +718,10 @@ impl Metasrv { version: build_info.version.to_string(), git_commit: build_info.commit_short.to_string(), start_time_ms: self.start_time_ms(), - cpus: self.resource_spec().cpus as u32, - memory_bytes: self.resource_spec().memory.unwrap_or_default().as_bytes(), + total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(), + total_memory_bytes: self.resource_stat.get_total_memory_bytes(), + cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(), + memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(), hostname: hostname::get() .unwrap_or_default() .to_string_lossy() diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 9cc0b8cc72dd..0bcc914e276a 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -46,6 +46,7 @@ use common_meta::stats::topic::TopicStatsRegistry; use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator}; use common_procedure::ProcedureManagerRef; use common_procedure::local::{LocalManager, ManagerConfig}; +use common_stat::ResourceStatImpl; use common_telemetry::{info, warn}; use snafu::{ResultExt, ensure}; use store_api::storage::MAX_REGION_SEQ; @@ -517,6 +518,9 @@ impl MetasrvBuilder { .try_start() .context(error::InitReconciliationManagerSnafu)?; + let mut resource_stat = ResourceStatImpl::default(); + resource_stat.start_collect_cpu_usage(); + Ok(Metasrv { state, started: Arc::new(AtomicBool::new(false)), @@ -556,7 +560,7 @@ impl MetasrvBuilder { table_id_sequence, reconciliation_manager, topic_stats_registry, - resource_spec: Default::default(), + resource_stat: Arc::new(resource_stat), }) } } diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index e39337c37466..5c0ae4c71fbc 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -97,8 +97,10 @@ impl Metasrv { version: build_info.version.to_string(), git_commit: build_info.commit_short.to_string(), start_time_ms: self.start_time_ms(), - cpus: self.resource_spec().cpus as u32, - memory_bytes: self.resource_spec().memory.unwrap_or_default().as_bytes(), + total_cpu_millicores: self.resource_stat().get_total_cpu_millicores(), + total_memory_bytes: self.resource_stat().get_total_memory_bytes(), + cpu_usage_millicores: self.resource_stat().get_cpu_usage_millicores(), + memory_usage_bytes: self.resource_stat().get_memory_usage_bytes(), hostname: hostname::get() .unwrap_or_default() .to_string_lossy() diff --git a/src/standalone/src/information_extension.rs b/src/standalone/src/information_extension.rs index b15ab74a9897..852da25e653f 100644 --- a/src/standalone/src/information_extension.rs +++ b/src/standalone/src/information_extension.rs @@ -24,6 +24,7 @@ use common_meta::key::flow::flow_state::FlowStat; use common_meta::peer::Peer; use common_procedure::{ProcedureInfo, ProcedureManagerRef}; use common_query::request::QueryRequest; +use common_stat::{ResourceStatImpl, ResourceStatRef}; use datanode::region_server::RegionServer; use flow::StreamingEngine; use snafu::ResultExt; @@ -35,15 +36,19 @@ pub struct StandaloneInformationExtension { procedure_manager: ProcedureManagerRef, start_time_ms: u64, flow_streaming_engine: RwLock>>, + resource_stat: ResourceStatRef, } impl StandaloneInformationExtension { pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self { + let mut resource_stat = ResourceStatImpl::default(); + resource_stat.start_collect_cpu_usage(); Self { region_server, procedure_manager, start_time_ms: common_time::util::current_time_millis() as u64, flow_streaming_engine: RwLock::new(None), + resource_stat: Arc::new(resource_stat), } } @@ -75,8 +80,10 @@ impl InformationExtension for StandaloneInformationExtension { // Use `self.start_time_ms` instead. // It's not precise but enough. start_time_ms: self.start_time_ms, - cpus: common_stat::get_total_cpu_millicores() as u32, - memory_bytes: common_stat::get_total_memory_bytes() as u64, + total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(), + total_memory_bytes: self.resource_stat.get_total_memory_bytes(), + cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(), + memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(), hostname: hostname::get() .unwrap_or_default() .to_string_lossy() diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 13e4cc31153b..91cb0f5ad2a8 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -35,6 +35,7 @@ common-procedure.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true +common-stat.workspace = true common-telemetry.workspace = true common-test-util.workspace = true common-time.workspace = true diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 6be94cbcd4ec..19c2ce413469 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -44,6 +44,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_runtime::Builder as RuntimeBuilder; use common_runtime::runtime::BuilderBuild; +use common_stat::ResourceStatImpl; use common_test_util::temp_dir::create_temp_dir; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::config::DatanodeOptions; @@ -411,11 +412,15 @@ impl GreptimeDbClusterBuilder { let fe_opts = self.build_frontend_options(); + let mut resource_stat = ResourceStatImpl::default(); + resource_stat.start_collect_cpu_usage(); + let heartbeat_task = HeartbeatTask::new( &fe_opts, meta_client.clone(), HeartbeatOptions::default(), Arc::new(handlers_executor), + Arc::new(resource_stat), ); let instance = FrontendBuilder::new( diff --git a/tests/cases/distributed/information_schema/cluster_info.result b/tests/cases/distributed/information_schema/cluster_info.result index 63d02f435526..4ab8f6808df1 100644 --- a/tests/cases/distributed/information_schema/cluster_info.result +++ b/tests/cases/distributed/information_schema/cluster_info.result @@ -11,8 +11,10 @@ DESC TABLE CLUSTER_INFO; | peer_type | String | | NO | | FIELD | | peer_addr | String | | YES | | FIELD | | peer_hostname | String | | YES | | FIELD | -| total_cpu_millicores | UInt32 | | NO | | FIELD | -| total_memory_bytes | UInt64 | | NO | | FIELD | +| total_cpu_millicores | Int64 | | NO | | FIELD | +| total_memory_bytes | Int64 | | NO | | FIELD | +| cpu_usage_millicores | Int64 | | NO | | FIELD | +| memory_usage_bytes | Int64 | | NO | | FIELD | | version | String | | NO | | FIELD | | git_commit | String | | NO | | FIELD | | start_time | TimestampMillisecond | | YES | | FIELD | diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 7da0520b3eb7..f9aa9b6b7578 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -71,18 +71,20 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | check_constraints | constraint_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | check_constraints | constraint_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | check_constraints | constraint_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | cluster_info | active_time | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | cluster_info | git_commit | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | cluster_info | node_status | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | cluster_info | active_time | 13 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | cluster_info | cpu_usage_millicores | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | +| greptime | information_schema | cluster_info | git_commit | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | cluster_info | memory_usage_bytes | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | +| greptime | information_schema | cluster_info | node_status | 14 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | cluster_info | peer_addr | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | cluster_info | peer_hostname | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | cluster_info | peer_id | 1 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | cluster_info | peer_type | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | cluster_info | start_time | 9 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | | -| greptime | information_schema | cluster_info | total_cpu_millicores | 5 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | information_schema | cluster_info | total_memory_bytes | 6 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | -| greptime | information_schema | cluster_info | uptime | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | cluster_info | version | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | cluster_info | start_time | 11 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | | +| greptime | information_schema | cluster_info | total_cpu_millicores | 5 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | +| greptime | information_schema | cluster_info | total_memory_bytes | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | +| greptime | information_schema | cluster_info | uptime | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | cluster_info | version | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | collation_character_set_applicability | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | collation_character_set_applicability | collation_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | collations | character_set_name | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | diff --git a/tests/cases/standalone/information_schema/cluster_info.result b/tests/cases/standalone/information_schema/cluster_info.result index 85429840284a..bc9520ba6aaa 100644 --- a/tests/cases/standalone/information_schema/cluster_info.result +++ b/tests/cases/standalone/information_schema/cluster_info.result @@ -11,8 +11,10 @@ DESC TABLE CLUSTER_INFO; | peer_type | String | | NO | | FIELD | | peer_addr | String | | YES | | FIELD | | peer_hostname | String | | YES | | FIELD | -| total_cpu_millicores | UInt32 | | NO | | FIELD | -| total_memory_bytes | UInt64 | | NO | | FIELD | +| total_cpu_millicores | Int64 | | NO | | FIELD | +| total_memory_bytes | Int64 | | NO | | FIELD | +| cpu_usage_millicores | Int64 | | NO | | FIELD | +| memory_usage_bytes | Int64 | | NO | | FIELD | | version | String | | NO | | FIELD | | git_commit | String | | NO | | FIELD | | start_time | TimestampMillisecond | | YES | | FIELD |