Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d000eedd3739c003bb139aa42cefe05521a60f7d" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "710f1fa95cf68c1cf4e8b6d757bb418c4fddf4ed" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down Expand Up @@ -207,6 +207,7 @@ rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
hostname = "0.4.0"
rustls = { version = "0.23.25", default-features = false }
sea-query = "0.32"
serde = { version = "1.0", features = ["derive"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const PEER_TYPE_METASRV: &str = "METASRV";
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const PEER_HOSTNAME: &str = "peer_hostname";
const CPUS: &str = "cpus";
const MEMORY_BYTES: &str = "memory_bytes";
const VERSION: &str = "version";
Expand All @@ -74,6 +75,7 @@ const INIT_CAPACITY: usize = 42;
/// - `uptime`: the uptime of the peer.
/// - `active_time`: the time since the last activity of the peer.
/// - `node_status`: the status info of the peer.
/// - `peer_hostname`: the hostname of the peer.
///
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
Expand All @@ -94,6 +96,7 @@ impl InformationSchemaClusterInfo {
ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(PEER_HOSTNAME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CPUS, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(MEMORY_BYTES, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
Expand Down Expand Up @@ -155,6 +158,7 @@ struct InformationSchemaClusterInfoBuilder {
peer_ids: Int64VectorBuilder,
peer_types: StringVectorBuilder,
peer_addrs: StringVectorBuilder,
peer_hostnames: StringVectorBuilder,
cpus: UInt32VectorBuilder,
memory_bytes: UInt64VectorBuilder,
versions: StringVectorBuilder,
Expand All @@ -173,6 +177,7 @@ impl InformationSchemaClusterInfoBuilder {
peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
peer_hostnames: StringVectorBuilder::with_capacity(INIT_CAPACITY),
cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
Expand Down Expand Up @@ -203,6 +208,7 @@ impl InformationSchemaClusterInfoBuilder {
(PEER_ID, &Value::from(peer_id)),
(PEER_TYPE, &Value::from(peer_type)),
(PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
(PEER_HOSTNAME, &Value::from(node_info.hostname.as_str())),
(VERSION, &Value::from(node_info.version.as_str())),
(GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
];
Expand All @@ -214,6 +220,7 @@ impl InformationSchemaClusterInfoBuilder {
self.peer_ids.push(Some(peer_id));
self.peer_types.push(Some(peer_type));
self.peer_addrs.push(Some(&node_info.peer.addr));
self.peer_hostnames.push(Some(&node_info.hostname));
self.versions.push(Some(&node_info.version));
self.git_commits.push(Some(&node_info.git_commit));
if node_info.start_time_ms > 0 {
Expand Down Expand Up @@ -253,6 +260,7 @@ impl InformationSchemaClusterInfoBuilder {
Arc::new(self.peer_ids.finish()),
Arc::new(self.peer_types.finish()),
Arc::new(self.peer_addrs.finish()),
Arc::new(self.peer_hostnames.finish()),
Arc::new(self.cpus.finish()),
Arc::new(self.memory_bytes.finish()),
Arc::new(self.versions.finish()),
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ pub struct NodeInfo {
// The node build memory bytes
#[serde(default)]
pub memory_bytes: u64,
// The node build hostname
#[serde(default)]
pub hostname: String,
}

#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -332,6 +335,7 @@ mod tests {
start_time_ms: 1,
cpus: 0,
memory_bytes: 0,
hostname: "test_hostname".to_string(),
};

let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ datatypes.workspace = true
file-engine.workspace = true
futures.workspace = true
futures-util.workspace = true
hostname.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-store.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ impl HeartbeatTask {
start_time_ms: node_epoch,
cpus,
memory_bytes,
hostname: hostname::get()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
}),
node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
types: workload_types.iter().map(|w| w.to_i32()).collect(),
Expand Down
1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ enum_dispatch = "0.3"
futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
hostname.workspace = true
http.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions src/flow/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ impl HeartbeatTask {
start_time_ms,
cpus,
memory_bytes,
hostname: hostname::get()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
})
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ datafusion-expr.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
hostname.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ impl HeartbeatTask {
start_time_ms,
cpus,
memory_bytes,
hostname: hostname::get()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
})
}

Expand Down
74 changes: 56 additions & 18 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ impl ProcedureExecutor for MetaClient {
}
}

// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated fields are removed from the proto.
#[allow(deprecated)]
#[async_trait::async_trait]
impl ClusterInfo for MetaClient {
type Error = Error;
Expand All @@ -372,25 +374,61 @@ impl ClusterInfo for MetaClient {
let (leader, followers) = cluster_client.get_metasrv_peers().await?;
followers
.into_iter()
.map(|node| NodeInfo {
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: node.version,
git_commit: node.git_commit,
start_time_ms: node.start_time_ms,
cpus: node.cpus,
memory_bytes: node.memory_bytes,
.map(|node| {
if let Some(node_info) = node.info {
NodeInfo {
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: node_info.version,
git_commit: node_info.git_commit,
start_time_ms: node_info.start_time_ms,
cpus: node_info.cpus,
memory_bytes: node_info.memory_bytes,
hostname: node_info.hostname,
}
} else {
// TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
NodeInfo {
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
version: node.version,
git_commit: node.git_commit,
start_time_ms: node.start_time_ms,
cpus: node.cpus,
memory_bytes: node.memory_bytes,
hostname: "".to_string(),
}
}
})
.chain(leader.into_iter().map(|node| NodeInfo {
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: node.version,
git_commit: node.git_commit,
start_time_ms: node.start_time_ms,
cpus: node.cpus,
memory_bytes: node.memory_bytes,
.chain(leader.into_iter().map(|node| {
if let Some(node_info) = node.info {
NodeInfo {
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: node_info.version,
git_commit: node_info.git_commit,
start_time_ms: node_info.start_time_ms,
cpus: node_info.cpus,
memory_bytes: node_info.memory_bytes,
hostname: node_info.hostname,
}
} else {
// TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
NodeInfo {
peer: node.peer.unwrap_or_default(),
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
version: node.version,
git_commit: node.git_commit,
start_time_ms: node.start_time_ms,
cpus: node.cpus,
memory_bytes: node.memory_bytes,
hostname: "".to_string(),
}
}
}))
.collect::<Vec<_>>()
} else {
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ either.workspace = true
etcd-client.workspace = true
futures.workspace = true
h2 = "0.3"
hostname.workspace = true
http-body-util = "0.1"
humantime.workspace = true
humantime-serde.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/discovery/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ mod tests {
start_time_ms: current_time_millis() as u64,
cpus: 0,
memory_bytes: 0,
hostname: "test_hostname".to_string(),
};

let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
Expand All @@ -270,6 +271,7 @@ mod tests {
start_time_ms: current_time_millis() as u64,
cpus: 0,
memory_bytes: 0,
hostname: "test_hostname".to_string(),
};

in_memory
Expand Down Expand Up @@ -307,6 +309,7 @@ mod tests {
start_time_ms: last_activity_ts as u64,
cpus: 0,
memory_bytes: 0,
hostname: "test_hostname".to_string(),
};

let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/election/rds/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,7 @@ mod tests {
start_time_ms: 0,
cpus: 0,
memory_bytes: 0,
hostname: "test_hostname".to_string(),
};
mysql_election.register_candidate(&node_info).await.unwrap();
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/election/rds/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,7 @@ mod tests {
start_time_ms: 0,
cpus: 0,
memory_bytes: 0,
hostname: "test_hostname".to_string(),
};
pg_election.register_candidate(&node_info).await.unwrap();
}
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/handler/collect_cluster_info_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
start_time_ms: info.start_time_ms,
cpus: info.cpus,
memory_bytes: info.memory_bytes,
hostname: info.hostname,
};

put_into_memory_store(ctx, key, value).await?;
Expand Down Expand Up @@ -89,6 +90,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
start_time_ms: info.start_time_ms,
cpus: info.cpus,
memory_bytes: info.memory_bytes,
hostname: info.hostname,
};

put_into_memory_store(ctx, key, value).await?;
Expand Down Expand Up @@ -142,6 +144,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
start_time_ms: info.start_time_ms,
cpus: info.cpus,
memory_bytes: info.memory_bytes,
hostname: info.hostname,
};

put_into_memory_store(ctx, key, value).await?;
Expand Down
Loading
Loading