diff --git a/Cargo.lock b/Cargo.lock
index 22459a936a..27d5c89511 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6755,6 +6755,7 @@ dependencies = [
  "restate-errors",
  "restate-ingestion-client",
  "restate-metadata-providers",
+ "restate-metadata-server-grpc",
  "restate-metadata-store",
  "restate-service-client",
  "restate-service-protocol",
diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml
index b6dd05137a..f8e1d27521 100644
--- a/crates/admin/Cargo.toml
+++ b/crates/admin/Cargo.toml
@@ -28,6 +28,7 @@ restate-bifrost = { workspace = true, features = ["local-loglet", "replicated-lo
 restate-core = { workspace = true }
 restate-errors = { workspace = true }
 restate-ingestion-client = { workspace = true }
+restate-metadata-server-grpc = { workspace = true, features = ["grpc-client"] }
 restate-metadata-store = { workspace = true }
 restate-metadata-providers = { workspace = true }
 restate-service-client = { workspace = true }
diff --git a/crates/admin/src/rest_api/internal_cluster_common.rs b/crates/admin/src/rest_api/internal_cluster_common.rs
new file mode 100644
index 0000000000..a21d051bc1
--- /dev/null
+++ b/crates/admin/src/rest_api/internal_cluster_common.rs
@@ -0,0 +1,86 @@
+// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use serde::Serialize;
+
+use restate_types::logs::metadata::ProviderConfiguration;
+use restate_types::partition_table::PartitionReplication;
+use restate_types::replication::ReplicationProperty;
+
+/// Serializable summary of a logs provider configuration.
+#[derive(Debug, Clone, Serialize)]
+pub(super) struct LogsProviderView {
+    /// Provider kind (in-memory, local, or replicated).
+    pub kind: String,
+    /// Replication property if provider kind is replicated.
+    pub replication_property: Option<ReplicationProperty>,
+    /// Target nodeset size if provider kind is replicated.
+    pub target_nodeset_size: Option<u32>,
+}
+
+/// Serializable summary of the partition replication strategy.
+#[derive(Debug, Clone, Serialize)]
+pub(super) struct PartitionReplicationView {
+    /// Replication mode.
+    pub mode: PartitionReplicationMode,
+    /// Required copies per location scope when mode is `limit`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub copies: Option<ReplicationProperty>,
+}
+
+/// Replication mode discriminator, serialized in `snake_case`.
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub(super) enum PartitionReplicationMode {
+    Everywhere,
+    Limit,
+}
+
+/// Converts a [`ProviderConfiguration`] into its [`LogsProviderView`].
+///
+/// Only the replicated provider carries a replication property and a target nodeset size;
+/// both are `None` for the in-memory and local providers.
+pub(super) fn logs_provider_view(provider: ProviderConfiguration) -> LogsProviderView {
+    match provider {
+        ProviderConfiguration::InMemory => LogsProviderView {
+            kind: "in-memory".to_owned(),
+            replication_property: None,
+            target_nodeset_size: None,
+        },
+        ProviderConfiguration::Local => LogsProviderView {
+            kind: "local".to_owned(),
+            replication_property: None,
+            target_nodeset_size: None,
+        },
+        ProviderConfiguration::Replicated(config) => LogsProviderView {
+            kind: "replicated".to_owned(),
+            replication_property: Some(config.replication_property),
+            target_nodeset_size: Some(config.target_nodeset_size.as_u32()),
+        },
+    }
+}
+
+/// Converts a [`PartitionReplication`] into its [`PartitionReplicationView`].
+///
+/// `copies` is populated only for the `Limit` variant.
+pub(super) fn partition_replication_view(
+    partition_replication: &PartitionReplication,
+) -> PartitionReplicationView {
+    match partition_replication {
+        PartitionReplication::Everywhere => PartitionReplicationView {
+            mode: PartitionReplicationMode::Everywhere,
+            copies: None,
+        },
+        PartitionReplication::Limit(replication_property) => PartitionReplicationView {
+            mode: PartitionReplicationMode::Limit,
+            copies: Some(replication_property.clone()),
+        },
+    }
+}
diff --git a/crates/admin/src/rest_api/internal_cluster_config.rs b/crates/admin/src/rest_api/internal_cluster_config.rs
new file mode 100644
index 0000000000..32c17dcc1e
--- /dev/null
+++ b/crates/admin/src/rest_api/internal_cluster_config.rs
@@ -0,0 +1,41 @@
+// Copyright (c) 2023 - 2026 Restate Software, Inc.,
Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use axum::Json;
+use serde::Serialize;
+
+use restate_core::Metadata;
+
+use super::internal_cluster_common::{
+    LogsProviderView, PartitionReplicationView, logs_provider_view, partition_replication_view,
+};
+
+#[derive(Debug, Clone, Serialize)]
+pub struct InternalClusterConfigResponse {
+    /// Total number of partitions configured for the cluster.
+    pub num_partitions: u32,
+    /// Partition replication strategy.
+    pub partition_replication: PartitionReplicationView,
+    /// Default provider used when creating new logs.
+    pub logs_provider: LogsProviderView,
+}
+
+pub async fn get_internal_cluster_config() -> Json<InternalClusterConfigResponse> {
+    Json(Metadata::with_current(|metadata| {
+        let logs = metadata.logs_ref();
+        let partition_table = metadata.partition_table_ref();
+
+        InternalClusterConfigResponse {
+            num_partitions: u32::from(partition_table.num_partitions()),
+            partition_replication: partition_replication_view(partition_table.replication()),
+            logs_provider: logs_provider_view(logs.configuration().default_provider.clone()),
+        }
+    }))
+}
diff --git a/crates/admin/src/rest_api/internal_cluster_status.rs b/crates/admin/src/rest_api/internal_cluster_status.rs
new file mode 100644
index 0000000000..53912d68db
--- /dev/null
+++ b/crates/admin/src/rest_api/internal_cluster_status.rs
@@ -0,0 +1,1076 @@
+// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::{
+    collections::{HashMap, HashSet},
+    time::Duration,
+};
+
+use axum::Json;
+use axum::extract::Query;
+use axum::http::StatusCode;
+use futures::future::join_all;
+use itertools::Itertools;
+use serde::{Deserialize, Serialize};
+use tokio::time::timeout;
+use tonic::Code;
+
+use restate_core::Metadata;
+use restate_core::network::net_util::{DNSResolution, create_tonic_channel};
+use restate_core::protobuf::cluster_ctrl_svc::{ClusterStateRequest, new_cluster_ctrl_client};
+use restate_core::protobuf::node_ctl_svc::{IdentResponse, new_node_ctl_client};
+use restate_metadata_server_grpc::grpc::{
+    StatusResponse as MetadataServerStatusResponse, new_metadata_server_client,
+};
+use restate_types::config::{Configuration, NetworkingOptions};
+use restate_types::logs::metadata::{Logs, ProviderKind};
+use restate_types::nodes_config::{NodesConfiguration, Role};
+use restate_types::protobuf::cluster::{
+    AliveNode, ClusterState, ReplayStatus, RunMode, node_state,
+};
+use restate_types::protobuf::common::MetadataServerStatus;
+use restate_types::replicated_loglet::ReplicatedLogletParams;
+use restate_types::replication::ReplicationProperty;
+use restate_types::{GenerationalNodeId, PlainNodeId, Versioned};
+
+use super::internal_cluster_common::{LogsProviderView, logs_provider_view};
+use crate::rest_api::error::GenericRestError;
+
+/// Query parameters accepted by [`get_internal_cluster_status`].
+#[derive(Debug, Default, Deserialize)]
+pub struct InternalClusterStatusQuery {
+    /// Include expanded sections for nodes/logs/partitions/metadata-servers.
+    #[serde(default)]
+    extra: bool,
+}
+
+/// Upper bound for each per-node `GetIdent` probe issued when `extra=true`.
+const OPTIONAL_PROBE_TIMEOUT: Duration = Duration::from_secs(1);
+
+/// Per-node outcome of the `GetIdent` probe; errors are kept as plain messages.
+type NodeRuntimeInfoMap = HashMap<PlainNodeId, Result<IdentResponse, String>>;
+/// Per-node outcome of the metadata-server `status()` RPC.
+type MetadataServerStatusMap =
+    HashMap<PlainNodeId, Result<MetadataServerStatusResponse, tonic::Status>>;
+
+#[derive(Debug, Clone, Serialize)]
+pub struct InternalClusterStatusResponse {
+    /// Version of the nodes configuration used to build this response.
+    pub node_configuration_version: u32,
+    /// Cluster nodes. Extra-only fields are included when `extra=true`.
+    pub nodes: Vec<NodeStatus>,
+    /// Expanded logs metadata information. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub logs: Option<LogsSection>,
+    /// Expanded partition-processor information. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub partitions: Option<PartitionsSection>,
+    /// Expanded metadata-role node status. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub metadata_servers: Option<MetadataServersSection>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum NodeLiveness {
+    Alive,
+    Dead,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct NodeStatus {
+    /// Node identifier including generation.
+    pub node_id: String,
+    /// Human-readable node name from nodes configuration.
+    pub name: String,
+    /// Enabled roles for this node.
+    pub roles: Vec<String>,
+    /// Static and cluster-state derived state for this node.
+    pub state: NodeStateView,
+    /// Probe-derived and role-specific statuses for this node.
+    pub status: NodeStatusView,
+    /// Compact partition leadership counters for this node.
+    pub partitions: NodePartitionSummary,
+    /// Compact log placement counters for this node.
+    pub logs: NodeLogSummary,
+    /// Uptime in seconds, if reported by cluster state.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub uptime_seconds: Option<u64>,
+    /// Seconds since process start according to the direct node status probe. Present only when
+    /// `extra=true` and the probe succeeded.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub process_age_seconds: Option<u64>,
+    /// Node generation from nodes configuration. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub generation: Option<u32>,
+    /// Advertised address used by cluster internals. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub address: Option<String>,
+    /// Node locality label. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub location: Option<String>,
+    /// Metadata versions observed by this node. Present only when `extra=true` and the direct node
+    /// status probe succeeded.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub versions: Option<NodeObservedVersions>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct NodeStateView {
+    /// Whether this node is currently alive in cluster state.
+    pub liveness: NodeLiveness,
+    /// Log-server storage state from nodes configuration. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub storage: Option<String>,
+    /// Worker placement state from nodes configuration. Present only when `extra=true`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub worker: Option<String>,
+}
+
+#[derive(Debug, Clone, Default, Serialize)]
+pub struct NodeStatusView {
+    /// Node-level metadata status. Without `extra` this comes from the metadata status probe;
+    /// with `extra` it comes from the direct node status probe to match `restatectl nodes
+    /// list --extra`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub metadata: Option<String>,
+    /// High-level node RPC status from the direct node status probe.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub node: Option<String>,
+    /// Admin role status from the direct node status probe.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub admin: Option<String>,
+    /// Worker role status from the direct node status probe.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub worker: Option<String>,
+    /// Log-server role status from the direct node status probe.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub log_server: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct NodePartitionSummary {
+    /// Number of partitions where this node is currently leader.
+    pub leader: u16,
+    /// Number of partitions where this node is currently follower.
+    pub follower: u16,
+    /// Number of partitions transitioning follower -> leader.
+    pub upgrading: u16,
+    /// Number of partitions transitioning leader -> follower.
+    pub downgrading: u16,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct NodeLogSummary {
+    /// Number of writable log tails where this node is in the nodeset.
+    pub nodeset_member_count: usize,
+    /// Number of writable log tails where this node is the sequencer.
+    pub sequencer_count: usize,
+    /// Whether sequencer placement appears optimal for this node.
+    pub sequencer_placement_optimal: bool,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct NodeObservedVersions {
+    /// Nodes configuration version observed by this node.
+    pub nodes_config: u32,
+    /// Logs metadata version observed by this node.
+    pub logs: u32,
+    /// Schema version observed by this node.
+    pub schema: u32,
+    /// Partition-table version observed by this node.
+    pub partition_table: u32,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct LogsSection {
+    /// Version of logs metadata.
+    pub version: u32,
+    /// Default provider used when creating new logs.
+    pub default_provider: LogsProviderView,
+    /// One entry per log id.
+    pub entries: Vec<LogEntry>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct LogEntry {
+    /// Log identifier.
+    pub log_id: u32,
+    /// Number of segments in this log chain.
+    pub segment_count: usize,
+    /// Base LSN of the tail segment.
+    pub from_lsn: u64,
+    /// Tail segment provider kind.
+    pub provider_kind: String,
+    /// Replicated loglet id (for replicated tails).
+    pub loglet_id: Option<String>,
+    /// Replication property (for replicated tails).
+    pub replication_property: Option<ReplicationProperty>,
+    /// Sequencer node id (for replicated tails).
+    pub sequencer: Option<String>,
+    /// Nodeset members (for replicated tails).
+    pub nodeset: Option<Vec<String>>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct PartitionsSection {
+    /// Nodes-config version attached to cluster state.
+    pub node_configuration_version: u32,
+    /// Partition-table version attached to cluster state.
+    pub partition_table_version: u32,
+    /// Partition-processor entries across alive nodes.
+    pub processors: Vec<PartitionProcessorEntry>,
+    /// Nodes currently marked dead.
+    pub dead_nodes: Vec<DeadNodeRow>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct PartitionProcessorEntry {
+    /// Partition identifier.
+    pub partition_id: u32,
+    /// Host node for this processor instance (with generation).
+    pub host_node: String,
+    /// Planned run mode.
+    pub planned_mode: String,
+    /// Effective run mode.
+    pub effective_mode: String,
+    /// Replay status.
+    pub replay_status: String,
+    /// Last observed leader epoch.
+    pub observed_leader_epoch: Option<u64>,
+    /// Last observed leader node (plain id + generation if present).
+    pub observed_leader_node: Option<String>,
+    /// Last applied log LSN.
+    pub last_applied_lsn: Option<u64>,
+    /// Last durable LSN.
+    pub durable_lsn: Option<u64>,
+    /// Last archived LSN.
+    pub last_archived_lsn: Option<u64>,
+    /// Target tail LSN if catching up.
+    pub target_tail_lsn: Option<u64>,
+    /// Computed lag `(target_tail - 1) - last_applied`.
+    pub lsn_lag: Option<u64>,
+    /// Last processor status update timestamp in unix milliseconds.
+    pub updated_at_unix_millis: Option<i64>,
+    /// Whether the processor currently sees itself as leader.
+    pub sees_itself_as_leader: bool,
+    /// Whether this processor's observed epoch is older than peers for the same partition.
+    pub leadership_epoch_outdated: bool,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct DeadNodeRow {
+    /// Plain node id.
+    pub node_id: String,
+    /// Last time this node was seen alive in unix milliseconds.
+    pub last_seen_alive_unix_millis: Option<i64>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct MetadataServersSection {
+    /// Reachable metadata-role nodes with status from metadata-server `status()`.
+    pub servers: Vec<MetadataServerEntry>,
+    /// Metadata-role nodes for which the metadata-server status RPC failed.
+    pub unreachable_nodes: Vec<UnreachableNodeRow>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct MetadataServerEntry {
+    /// Plain node id.
+    pub node_id: String,
+    /// Metadata server state.
+    pub status: String,
+    /// Metadata cluster configuration version on this node.
+    pub configuration_version: Option<u32>,
+    /// Current metadata leader plain node id.
+    pub leader: Option<String>,
+    /// Metadata cluster members.
+    pub members: Vec<String>,
+    /// Last raft log index applied.
+    pub raft_applied: u64,
+    /// Last raft log index committed.
+    pub raft_committed: u64,
+    /// Current raft term.
+    pub raft_term: u64,
+    /// Raft log length computed as `(last_index + 1) - first_index`.
+    pub raft_log_length: u64,
+    /// Last snapshot index.
+    pub snapshot_index: u64,
+    /// Snapshot size in bytes.
+    pub snapshot_size_bytes: u64,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct UnreachableNodeRow {
+    /// Plain node id.
+    pub node_id: String,
+    /// Error message returned when querying metadata server status.
+    pub error: String,
+}
+
+/// Builds the internal cluster status response.
+///
+/// Cluster state and metadata-server status are always fetched; the per-node `GetIdent` probes
+/// and the expanded logs/partitions/metadata-servers sections are produced only for `extra`.
+pub async fn get_internal_cluster_status(
+    Query(query): Query<InternalClusterStatusQuery>,
+) -> Result<Json<InternalClusterStatusResponse>, GenericRestError> {
+    let (networking, nodes_config, logs, my_node_id) = gather_local_snapshots()?;
+    let cluster_state = fetch_cluster_state(&networking, &nodes_config, my_node_id).await?;
+    let alive_nodes = alive_nodes(&cluster_state);
+
+    let (metadata_server_statuses, node_runtime_info) = if query.extra {
+        tokio::join!(
+            fetch_metadata_server_status(&networking, &nodes_config, &alive_nodes, true),
+            fetch_node_runtime_info(&networking, &nodes_config),
+        )
+    } else {
+        (
+            fetch_metadata_server_status(&networking, &nodes_config, &alive_nodes, false).await,
+            NodeRuntimeInfoMap::new(),
+        )
+    };
+
+    Ok(Json(InternalClusterStatusResponse {
+        node_configuration_version: nodes_config.version().into(),
+        nodes: build_nodes(
+            &nodes_config,
+            &logs,
+            &cluster_state,
+            &metadata_server_statuses,
+            &node_runtime_info,
+            query.extra,
+        ),
+        logs: query.extra.then(|| build_logs_section(&logs)),
+        partitions: query
+            .extra
+            .then(|| build_partitions_section(&cluster_state)),
+        metadata_servers: query
+            .extra
+            .then(|| build_metadata_servers_section(&metadata_server_statuses)),
+    }))
+}
+
+/// Captures the networking options, nodes configuration, logs metadata and this node's id.
+///
+/// Fails with `503 Service Unavailable` when `my_node_id_opt()` returns `None`.
+fn gather_local_snapshots() -> Result<
+    (
+        NetworkingOptions,
+        std::sync::Arc<NodesConfiguration>,
+        std::sync::Arc<Logs>,
+        GenerationalNodeId,
+    ),
+    GenericRestError,
+> {
+    let networking = Configuration::pinned().networking.clone();
+
+    let nodes_config = Metadata::with_current(|m| m.nodes_config_snapshot());
+    let logs = Metadata::with_current(|m| m.logs_snapshot());
+    let my_node_id = Metadata::with_current(|m| m.my_node_id_opt()).ok_or_else(|| {
+        GenericRestError::new(
+            StatusCode::SERVICE_UNAVAILABLE,
+            "The cluster does not seem to be provisioned yet. Try again later.",
+        )
+    })?;
+
+    Ok((networking, nodes_config, logs, my_node_id))
+}
+
+/// Fetches the cluster state through the cluster controller service at this node's own
+/// control address.
+async fn fetch_cluster_state(
+    networking: &NetworkingOptions,
+    nodes_config: &NodesConfiguration,
+    my_node_id: GenerationalNodeId,
+) -> Result<ClusterState, GenericRestError> {
+    let node = nodes_config.find_node_by_id(my_node_id).map_err(|_| {
+        GenericRestError::new(
+            StatusCode::SERVICE_UNAVAILABLE,
+            "Could not resolve this node from cluster metadata.",
+        )
+    })?;
+
+    let channel = create_tonic_channel(
+        node.ctrl_address().into_owned(),
+        networking,
+        DNSResolution::Gai,
+    );
+    let mut client = new_cluster_ctrl_client(channel, networking);
+
+    let cluster_state = client
+        .get_cluster_state(ClusterStateRequest::default())
+        .await
+        .map_err(|err| {
+            GenericRestError::new(
+                StatusCode::INTERNAL_SERVER_ERROR,
+                format!("Failed to fetch cluster state: {}", err.message()),
+            )
+        })?
+        .into_inner()
+        .cluster_state
+        .ok_or_else(|| {
+            GenericRestError::new(
+                StatusCode::INTERNAL_SERVER_ERROR,
+                "Cluster state response did not contain a payload.",
+            )
+        })?;
+
+    Ok(cluster_state)
+}
+
+/// Returns the plain ids of all nodes reported as alive in `cluster_state`.
+fn alive_nodes(cluster_state: &ClusterState) -> HashSet<PlainNodeId> {
+    cluster_state
+        .nodes
+        .iter()
+        .filter_map(|(node_id, node_state)| {
+            matches!(node_state.state.as_ref(), Some(node_state::State::Alive(_)))
+                .then_some(PlainNodeId::from(*node_id))
+        })
+        .collect()
+}
+
+/// Calls `GetIdent` on every configured node concurrently, bounding each call by
+/// [`OPTIONAL_PROBE_TIMEOUT`]. Failures and timeouts are kept as per-node error strings.
+async fn fetch_node_runtime_info(
+    networking: &NetworkingOptions,
+    nodes_config: &NodesConfiguration,
+) -> NodeRuntimeInfoMap {
+    let futures = nodes_config.iter().map(|(plain_node_id, node)| {
+        let address = node.ctrl_address().into_owned();
+        let networking = networking.clone();
+        async move {
+            let result = timeout(OPTIONAL_PROBE_TIMEOUT, async move {
+                let channel = create_tonic_channel(address, &networking, DNSResolution::Gai);
+                new_node_ctl_client(channel, &networking)
+                    .get_ident(())
+                    .await
+                    .map(|response| response.into_inner())
+                    .map_err(|err| err.message().to_owned())
+            })
+            .await
+            .unwrap_or_else(|_| {
+                Err(format!(
+                    "Timed out calling GetIdent after {}s",
+                    OPTIONAL_PROBE_TIMEOUT.as_secs()
+                ))
+            });
+
+            (plain_node_id, result)
+        }
+    });
+
+    join_all(futures)
+        .await
+        .into_iter()
+        .collect::<NodeRuntimeInfoMap>()
+}
+
+/// Calls the metadata-server `status()` RPC on every node with the metadata-server role.
+///
+/// Nodes not contained in `alive_nodes` are skipped unless `include_dead_nodes` is set.
+async fn fetch_metadata_server_status(
+    networking: &NetworkingOptions,
+    nodes_config: &NodesConfiguration,
+    alive_nodes: &HashSet<PlainNodeId>,
+    include_dead_nodes: bool,
+) -> MetadataServerStatusMap {
+    let futures = nodes_config
+        .iter_role(Role::MetadataServer)
+        .filter(|(plain_node_id, _)| include_dead_nodes || alive_nodes.contains(plain_node_id))
+        .map(|(plain_node_id, node)| {
+            let address = node.address.clone();
+            let networking = networking.clone();
+            async move {
+                let channel = create_tonic_channel(address, &networking, DNSResolution::Gai);
+                let result = new_metadata_server_client(channel, &networking)
+                    .status(())
+                    .await
+                    .map(|response| response.into_inner());
+
+                (plain_node_id, result)
+            }
+        });
+
+    join_all(futures)
+        .await
+        .into_iter()
+        .collect::<MetadataServerStatusMap>()
+}
+
+/// Builds one [`NodeStatus`] per configured node, ordered by node id.
+///
+/// Nodes that are missing from `cluster_state`, or whose state is unset, are omitted.
+fn build_nodes(
+    nodes_config: &NodesConfiguration,
+    logs: &Logs,
+    cluster_state: &ClusterState,
+    metadata_statuses: &MetadataServerStatusMap,
+    node_runtime_info: &NodeRuntimeInfoMap,
+    include_extra: bool,
+) -> Vec<NodeStatus> {
+    nodes_config
+        .iter()
+        .sorted_by_key(|(node_id, _)| *node_id)
+        .filter_map(|(plain_node_id, node_config)| {
+            let roles = node_config
+                .roles
+                .iter()
+                .map(|r| r.to_string())
+                .sorted()
+                .collect::<Vec<_>>();
+
+            let node_state = cluster_state.nodes.get(&u32::from(plain_node_id))?;
+
+            match node_state.state.as_ref() {
+                Some(node_state::State::Alive(alive)) => {
+                    let node_id = alive
+                        .generational_node_id
+                        .map(GenerationalNodeId::from)
+                        .unwrap_or(node_config.current_generation);
+
+                    let (
+                        leader_partitions,
+                        follower_partitions,
+                        upgrading_partitions,
+                        downgrading_partitions,
+                    ) = partition_counters(alive);
+                    let (nodeset_member_count, sequencer_count, sequencer_placement_optimal) =
+                        log_counters(logs, alive, node_id);
+
+                    let (process_age_seconds, mut status, versions) =
+                        node_runtime_details(node_runtime_info.get(&plain_node_id), include_extra);
+                    if !include_extra && node_config.has_role(Role::MetadataServer) {
+                        status.metadata = Some(compact_metadata_status(
+                            metadata_statuses.get(&plain_node_id),
+                        ));
+                    }
+
+                    Some(NodeStatus {
+                        node_id: node_id.to_string(),
+                        name: node_config.name.clone(),
+                        roles,
+                        state: NodeStateView {
+                            liveness: NodeLiveness::Alive,
+                            storage: include_extra.then(|| {
+                                normalize_display_enum(node_config.log_server_config.storage_state)
+                            }),
+                            worker: include_extra.then(|| {
+                                normalize_display_enum(node_config.worker_config.worker_state)
+                            }),
+                        },
+                        status,
+                        partitions: NodePartitionSummary {
+                            leader: leader_partitions,
+                            follower: follower_partitions,
+                            upgrading: upgrading_partitions,
+                            downgrading: downgrading_partitions,
+                        },
+                        logs: NodeLogSummary {
+                            nodeset_member_count,
+                            sequencer_count,
+                            sequencer_placement_optimal,
+                        },
+                        uptime_seconds: Some(alive.uptime_s),
+                        process_age_seconds,
+                        generation: include_extra.then_some(node_id.generation()),
+                        address: include_extra.then_some(node_config.address.to_string()),
+                        location: include_extra
+                            .then(|| string_or_none(node_config.location.to_string()))
+                            .flatten(),
+                        versions,
+                    })
+                }
+                Some(node_state::State::Dead(_)) => {
+                    let (process_age_seconds, status, versions) =
+                        node_runtime_details(node_runtime_info.get(&plain_node_id), include_extra);
+                    Some(NodeStatus {
+                        node_id: plain_node_id.to_string(),
+                        name: node_config.name.clone(),
+                        roles,
+                        state: NodeStateView {
+                            liveness: NodeLiveness::Dead,
+                            storage: include_extra.then(|| {
+                                normalize_display_enum(node_config.log_server_config.storage_state)
+                            }),
+                            worker: include_extra.then(|| {
+                                normalize_display_enum(node_config.worker_config.worker_state)
+                            }),
+                        },
+                        status,
+                        partitions: NodePartitionSummary {
+                            leader: 0,
+                            follower: 0,
+                            upgrading: 0,
+                            downgrading: 0,
+                        },
+                        logs: NodeLogSummary {
+                            nodeset_member_count: 0,
+                            sequencer_count: 0,
+                            sequencer_placement_optimal: false,
+                        },
+                        uptime_seconds: None,
+                        process_age_seconds,
+                        generation: include_extra
+                            .then_some(node_config.current_generation.generation()),
+                        address: include_extra.then_some(node_config.address.to_string()),
+                        location: include_extra
+                            .then(|| string_or_none(node_config.location.to_string()))
+                            .flatten(),
+                        versions,
+                    })
+                }
+                None => None,
+            }
+        })
+        .collect()
+}
+
+/// Counts this node's partitions as `(leader, follower, upgrading, downgrading)`.
+fn partition_counters(alive: &AliveNode) -> (u16, u16, u16, u16) {
+    let mut leaders = 0;
+    let mut followers = 0;
+    let mut upgrading = 0;
+    let mut downgrading = 0;
+
+    for status in alive.partitions.values() {
+        let planned = RunMode::try_from(status.planned_mode).unwrap_or(RunMode::Unknown);
+        let effective = RunMode::try_from(status.effective_mode).unwrap_or(RunMode::Unknown);
+        match (effective, planned) {
+            (RunMode::Leader, RunMode::Leader) => leaders += 1,
+            (RunMode::Follower, RunMode::Follower) => followers += 1,
+            (RunMode::Follower, RunMode::Leader) => upgrading += 1,
+            (RunMode::Leader, RunMode::Follower) => downgrading += 1,
+            _ => {}
+        }
+    }
+
+    (leaders, followers, upgrading, downgrading)
+}
+
+/// Summarizes replicated tail segments for `node_id` as
+/// `(nodeset memberships, sequencer count, sequencer placement optimal)`.
+///
+/// Placement is reported as non-optimal when this node sequences a log but the partition with
+/// the same numeric id is missing from `alive` or is neither planned nor running as leader.
+fn log_counters(
+    logs: &Logs,
+    alive: &AliveNode,
+    node_id: GenerationalNodeId,
+) -> (usize, usize, bool) {
+    let mut nodesets = 0usize;
+    let mut sequencers = 0usize;
+    let mut optimal = true;
+
+    for (log_id, chain) in logs.iter() {
+        let tail = chain.tail();
+        if tail.config.kind != ProviderKind::Replicated {
+            continue;
+        }
+
+        let Ok(params) = ReplicatedLogletParams::deserialize_from(tail.config.params.as_bytes())
+        else {
+            continue;
+        };
+
+        if params.nodeset.contains(node_id) {
+            nodesets += 1;
+        }
+
+        if params.sequencer != node_id {
+            continue;
+        }
+
+        sequencers += 1;
+
+        let Some(partition_status) = alive.partitions.get(&u32::from(*log_id)) else {
+            optimal = false;
+            continue;
+        };
+        let planned = RunMode::try_from(partition_status.planned_mode).unwrap_or(RunMode::Unknown);
+        let effective =
+            RunMode::try_from(partition_status.effective_mode).unwrap_or(RunMode::Unknown);
+        if planned != RunMode::Leader && effective != RunMode::Leader {
+            optimal = false;
+        }
+    }
+
+    (nodesets, sequencers, optimal)
+}
+
+/// Extracts `(process age, role statuses, observed versions)` from a `GetIdent` probe result.
+///
+/// Everything is empty unless `include_extra` is set and the probe succeeded.
+fn node_runtime_details(
+    node_runtime_info: Option<&Result<IdentResponse, String>>,
+    include_extra: bool,
+) -> (Option<u64>, NodeStatusView, Option<NodeObservedVersions>) {
+    if !include_extra {
+        return (None, NodeStatusView::default(), None);
+    }
+
+    match node_runtime_info {
+        Some(Ok(node_runtime_info)) => (
+            Some(node_runtime_info.age_s),
+            NodeStatusView {
+                metadata: optional_proto_enum(
+                    node_runtime_info.metadata_server_status().as_str_name(),
+                ),
+                node: optional_proto_enum(node_runtime_info.status().as_str_name()),
+                admin: optional_proto_enum(node_runtime_info.admin_status().as_str_name()),
+                worker: optional_proto_enum(node_runtime_info.worker_status().as_str_name()),
+                log_server: optional_proto_enum(
+                    node_runtime_info.log_server_status().as_str_name(),
+                ),
+            },
+            Some(NodeObservedVersions {
+                nodes_config: node_runtime_info.nodes_config_version,
+                logs: node_runtime_info.logs_version,
+                schema: node_runtime_info.schema_version,
+                partition_table: node_runtime_info.partition_table_version,
+            }),
+        ),
+        Some(Err(_)) | None => (None, NodeStatusView::default(), None),
+    }
+}
+
+/// Builds the expanded logs section with one entry per log, ordered by log id.
+fn build_logs_section(logs: &Logs) -> LogsSection {
+    let entries = logs
+        .iter()
+        .sorted_by_key(|(log_id, _)| *log_id)
+        .map(|(log_id, chain)| {
+            let tail = chain.tail();
+            if tail.config.kind == ProviderKind::Replicated
+                && let Ok(params) =
+                    ReplicatedLogletParams::deserialize_from(tail.config.params.as_bytes())
+            {
+                let mut nodeset = params
+                    .nodeset
+                    .iter()
+                    .map(|n| n.to_string())
+                    .collect::<Vec<_>>();
+                nodeset.sort();
+                return LogEntry {
+                    log_id: (*log_id).into(),
+                    segment_count: chain.num_segments(),
+                    from_lsn: tail.base_lsn.into(),
+                    provider_kind: tail.config.kind.to_string(),
+                    loglet_id: Some(params.loglet_id.to_string()),
+                    replication_property: Some(params.replication.clone()),
+                    sequencer: Some(params.sequencer.to_string()),
+                    nodeset: Some(nodeset),
+                };
+            }
+
+            LogEntry {
+                log_id: (*log_id).into(),
+                segment_count: chain.num_segments(),
+                from_lsn: tail.base_lsn.into(),
+                provider_kind: tail.config.kind.to_string(),
+                loglet_id: None,
+                replication_property: None,
+                sequencer: None,
+                nodeset: None,
+            }
+        })
+        .collect();
+
+    LogsSection {
+        version: logs.version().into(),
+        default_provider: logs_provider_view(logs.configuration().default_provider.clone()),
+        entries,
+    }
+}
+
+/// Builds the expanded partitions section: processors ordered by partition id and host node,
+/// plus dead nodes ordered by node id.
+fn build_partitions_section(cluster_state: &ClusterState) -> PartitionsSection {
+    #[derive(Clone)]
+    struct TempRow {
+        partition_id: u32,
+        host_node: GenerationalNodeId,
+        status: restate_types::protobuf::cluster::PartitionProcessorStatus,
+    }
+
+    let mut rows = Vec::<TempRow>::new();
+    let mut dead_nodes = Vec::<(PlainNodeId, DeadNodeRow)>::new();
+    let mut max_epoch_per_partition = HashMap::<u32, u64>::new();
+
+    for (node_id, node_state) in &cluster_state.nodes {
+        match node_state.state.as_ref() {
+            Some(node_state::State::Alive(alive)) => {
+                let host_node = alive
+                    .generational_node_id
+                    .map(GenerationalNodeId::from)
+                    .unwrap_or_else(|| PlainNodeId::from(*node_id).with_generation(0));
+
+                for (partition_id, status) in &alive.partitions {
+                    rows.push(TempRow {
+                        partition_id: *partition_id,
+                        host_node,
+                        status: *status,
+                    });
+                    let epoch = status
+                        .last_observed_leader_epoch
+                        .map(|e| e.value)
+                        .unwrap_or(0);
+                    max_epoch_per_partition
+                        .entry(*partition_id)
+                        .and_modify(|current| *current = (*current).max(epoch))
+                        .or_insert(epoch);
+                }
+            }
+            Some(node_state::State::Dead(dead)) => {
+                let plain_node_id = PlainNodeId::from(*node_id);
+                dead_nodes.push((
+                    plain_node_id,
+                    DeadNodeRow {
+                        node_id: plain_node_id.to_string(),
+                        last_seen_alive_unix_millis: dead
+                            .last_seen_alive
+                            .as_ref()
+                            .and_then(|ts| timestamp_to_unix_millis(ts.seconds, ts.nanos)),
+                    },
+                ));
+            }
+            None => {}
+        }
+    }
+
+    dead_nodes.sort_by_key(|(node_id, _)| *node_id);
+
+    let processors = rows
+        .into_iter()
+        .sorted_by(|a, b| {
+            a.partition_id
+                .cmp(&b.partition_id)
+                .then_with(|| a.host_node.cmp(&b.host_node))
+        })
+        .map(|row| {
+            let planned = RunMode::try_from(row.status.planned_mode).unwrap_or(RunMode::Unknown);
+            let effective =
+                RunMode::try_from(row.status.effective_mode).unwrap_or(RunMode::Unknown);
+            let replay =
+                ReplayStatus::try_from(row.status.replay_status).unwrap_or(ReplayStatus::Unknown);
+
+            let observed_leader_epoch = row
+                .status
+                .last_observed_leader_epoch
+                .as_ref()
+                .map(|e| e.value);
+            let observed_leader_node = row.status.last_observed_leader_node.as_ref().map(|node| {
+                let generation = node.generation.unwrap_or_default();
+                PlainNodeId::from(node.id)
+                    .with_generation(generation)
+                    .to_string()
+            });
+
+            let sees_itself_as_leader = row
+                .status
+                .last_observed_leader_node
+                .as_ref()
+                .and_then(|n| {
+                    n.generation
+                        .map(|g| PlainNodeId::from(n.id).with_generation(g) == row.host_node)
+                })
+                .unwrap_or(false);
+
+            let leadership_epoch = observed_leader_epoch.unwrap_or(0);
+            let leadership_epoch_outdated = leadership_epoch
+                < max_epoch_per_partition
+                    .get(&row.partition_id)
+                    .copied()
+                    .unwrap_or(0);
+
+            let last_applied_lsn = row
+                .status
+                .last_applied_log_lsn
+                .as_ref()
+                .map(|lsn| lsn.value);
+            let target_tail_lsn = row.status.target_tail_lsn.as_ref().map(|lsn| lsn.value);
+            let lsn_lag = target_tail_lsn
+                .zip(last_applied_lsn)
+                .map(|(tail, applied)| tail.saturating_sub(applied.saturating_add(1)));
+
+            PartitionProcessorEntry {
+                partition_id: row.partition_id,
+                host_node: row.host_node.to_string(),
+                planned_mode: normalize_proto_enum(planned.as_str_name()),
+                effective_mode: normalize_proto_enum(effective.as_str_name()),
+                replay_status: normalize_proto_enum(replay.as_str_name()),
+                observed_leader_epoch,
+                observed_leader_node,
+                last_applied_lsn,
+                durable_lsn: row.status.durable_lsn.as_ref().map(|lsn| lsn.value),
+                last_archived_lsn: row
+                    .status
+                    .last_archived_log_lsn
+                    .as_ref()
+                    .map(|lsn| lsn.value),
+                target_tail_lsn,
+                lsn_lag,
+                updated_at_unix_millis: row
+                    .status
+                    .updated_at
+                    .as_ref()
+                    .and_then(|ts| timestamp_to_unix_millis(ts.seconds, ts.nanos)),
+                sees_itself_as_leader,
+                leadership_epoch_outdated,
+            }
+        })
+        .collect();
+
+    PartitionsSection {
+        node_configuration_version: cluster_state
+            .nodes_config_version
+            .map(|v| v.value)
+            .unwrap_or_default(),
+        partition_table_version: cluster_state
+            .partition_table_version
+            .map(|v| v.value)
+            .unwrap_or_default(),
+        processors,
+        dead_nodes: dead_nodes.into_iter().map(|(_, row)| row).collect(),
+    }
+}
+
+/// Splits the metadata-server probe results into reachable servers and unreachable nodes,
+/// both ordered by node id.
+fn build_metadata_servers_section(
+    metadata_statuses: &MetadataServerStatusMap,
+) -> MetadataServersSection {
+    let mut servers = Vec::new();
+    let mut unreachable_nodes = Vec::new();
+
+    for (node_id, status) in metadata_statuses
+        .iter()
+        .sorted_by_key(|(node_id, _)| *node_id)
+    {
+        match status {
+            Ok(status) => servers.push(MetadataServerEntry {
+                node_id: node_id.to_string(),
+                status: normalize_proto_enum(status.status().as_str_name()),
+                configuration_version: status
+                    .configuration
+                    .as_ref()
+                    .and_then(|config| config.version.map(|version| version.value)),
+                leader: status
+                    .leader
+                    .map(|leader| PlainNodeId::new(leader).to_string()),
+                members: status
+                    .configuration
+                    .as_ref()
+                    .map(|config| {
+                        config
+                            .members
+                            .keys()
+                            .copied()
+                            .map(PlainNodeId::from)
+                            .sorted()
+                            .map(|node_id| node_id.to_string())
+                            .collect()
+                    })
+                    .unwrap_or_default(),
+                raft_applied: status
+                    .raft
+                    .as_ref()
+                    .map(|raft| raft.applied)
+                    .unwrap_or_default(),
+                raft_committed: status
+                    .raft
+                    .as_ref()
+                    .map(|raft| raft.committed)
+                    .unwrap_or_default(),
+                raft_term: status
+                    .raft
+                    .as_ref()
+                    .map(|raft| raft.term)
+                    .unwrap_or_default(),
+                raft_log_length: status
+                    .raft
+                    .as_ref()
+                    .map(|raft| {
+                        raft.last_index
+                            .saturating_add(1)
+                            .saturating_sub(raft.first_index)
+                    })
+                    .unwrap_or_default(),
+                snapshot_index: status
+                    .snapshot
+                    .as_ref()
+                    .map(|snapshot| snapshot.index)
+                    .unwrap_or_default(),
+                snapshot_size_bytes: status
+                    .snapshot
+                    .as_ref()
+                    .map(|snapshot| snapshot.size)
+                    .unwrap_or_default(),
+            }),
+            Err(reason) => unreachable_nodes.push(UnreachableNodeRow {
+                node_id: node_id.to_string(),
+                error: reason.to_string(),
+            }),
+        }
+    }
+
+    MetadataServersSection {
+        servers,
+        unreachable_nodes,
+    }
+}
+
+/// Maps a metadata-server probe result to a compact status string.
+///
+/// An `Unimplemented` RPC error is reported as `local`, any other error as `unreachable`.
+fn compact_metadata_status(
+    metadata_status: Option<&Result<MetadataServerStatusResponse, tonic::Status>>,
+) -> String {
+    match metadata_status {
+        Some(Ok(status)) => normalize_proto_enum(status.status().as_str_name()),
+        Some(Err(status)) if status.code() == Code::Unimplemented => "local".to_owned(),
+        Some(Err(_)) => "unreachable".to_owned(),
+        None => normalize_proto_enum(MetadataServerStatus::Unknown.as_str_name()),
+    }
+}
+
+/// Strips everything up to the first `_` from a protobuf enum value name and lowercases it.
+fn normalize_proto_enum(name: &str) -> String {
+    // The protobuf `as_str_name()` values passed here follow the `EnumPrefix_VARIANT` pattern.
+    name.split_once('_')
+        .map(|(_, suffix)| suffix)
+        .unwrap_or(name)
+        .to_ascii_lowercase()
+}
+
+/// Like [`normalize_proto_enum`], but maps the `unknown` variant to `None`.
+fn optional_proto_enum(name: &str) -> Option<String> {
+    let value = normalize_proto_enum(name);
+    (value != "unknown").then_some(value)
+}
+
+/// Renders `value` via `ToString` and replaces `-` with `_`.
+fn normalize_display_enum(value: impl ToString) -> String {
+    value.to_string().replace('-', "_")
+}
+
+/// Returns `None` for an empty string and `Some(value)` otherwise.
+fn string_or_none(value: String) -> Option<String> {
+    (!value.is_empty()).then_some(value)
+}
+
+/// Converts a seconds/nanoseconds timestamp to unix milliseconds, truncating sub-millisecond
+/// precision. Returns `None` on `i64` overflow.
+fn timestamp_to_unix_millis(seconds: i64, nanos: i32) -> Option<i64> {
+    let seconds_millis = seconds.checked_mul(1_000)?;
+    let nanos_millis = i64::from(nanos) / 1_000_000;
+    seconds_millis.checked_add(nanos_millis)
+}
diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs
index 8601437692..95c259d650 100644
--- a/crates/admin/src/rest_api/mod.rs
+++ b/crates/admin/src/rest_api/mod.rs
@@ -15,6 +15,9 @@ mod deployments;
 mod error;
 mod handlers;
 mod health;
+mod internal_cluster_common;
+mod internal_cluster_config;
+mod internal_cluster_status;
 mod
invocations; mod query; mod services; @@ -128,6 +131,14 @@ where axum::routing::put(deployments::update_deployment), ) // Internal batch operation routes (for UI only, not documented in OpenAPI) + .route( + "/internal/cluster/config", + axum::routing::get(internal_cluster_config::get_internal_cluster_config), + ) + .route( + "/internal/cluster/status", + axum::routing::get(internal_cluster_status::get_internal_cluster_status), + ) .route( "/internal/invocations_batch_operations/kill", axum::routing::post(invocations::batch_kill_invocations), diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs index 81203d3f42..32291e9f52 100644 --- a/tools/restatectl/src/commands/partition/list.rs +++ b/tools/restatectl/src/commands/partition/list.rs @@ -209,7 +209,9 @@ pub async fn list_partitions( .zip(processor.status.last_applied_log_lsn) .map(|(tail, applied)| { // (tail - 1) - applied_lsn = tail - (applied_lsn + 1) - tail.value.saturating_sub(applied.value + 1).to_string() + tail.value + .saturating_sub(applied.value.saturating_add(1)) + .to_string() }) .unwrap_or("-".to_owned()), ),